Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
8ea2304215782336327b160a930f31f21b26c395
[simgrid.git] / src / smpi / smpi_comm.cpp
1 /* Copyright (c) 2010-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdlib.h>
8 #include <limits.h>
9
10 #include <xbt/dict.h>
11 #include <xbt/ex.h>
12 #include <xbt/ex.hpp>
13
14 #include <simgrid/s4u/host.hpp>
15
16 #include "private.h"
17 #include "src/simix/smx_private.h"
18
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
20
21  Comm mpi_MPI_COMM_UNINITIALIZED;
22 MPI_Comm MPI_COMM_UNINITIALIZED=&mpi_MPI_COMM_UNINITIALIZED;
23
24 /* Support for cartesian topology was added, but there are 2 other types of topology, graph et dist graph. In order to
25  * support them, we have to add a field MPIR_Topo_type, and replace the MPI_Topology field by an union. */
26
27 static int smpi_compare_rankmap(const void *a, const void *b)
28 {
29   const int* x = static_cast<const int*>(a);
30   const int* y = static_cast<const int*>(b);
31
32   if (x[1] < y[1]) {
33     return -1;
34   }
35   if (x[1] == y[1]) {
36     if (x[0] < y[0]) {
37       return -1;
38     }
39     if (x[0] == y[0]) {
40       return 0;
41     }
42     return 1;
43   }
44   return 1;
45 }
46
47 namespace simgrid{
48 namespace smpi{
49
50 xbt_dict_t Comm::keyvals_ = nullptr;
51 int Comm::keyval_id_ = 0;//avoid collisions
52
53 Comm::Comm(MPI_Group group, MPI_Topology topo) : group_(group), topo_(topo)
54 {
55   refcount_=1;
56   topoType_ = MPI_INVALID_TOPO;
57   intra_comm_ = MPI_COMM_NULL;
58   leaders_comm_ = MPI_COMM_NULL;
59   is_uniform_=1;
60   non_uniform_map_ = nullptr;
61   leaders_map_ = nullptr;
62   is_blocked_=0;
63   attributes_=nullptr;
64 }
65
66 void Comm::destroy(Comm* comm)
67 {
68   if (comm == MPI_COMM_UNINITIALIZED){
69     Comm::destroy(smpi_process_comm_world());
70     return;
71   }
72   delete comm->topo_; // there's no use count on topos
73   Comm::unref(comm);
74 }
75
76 int Comm::dup(MPI_Comm* newcomm){
77   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
78      smpi_switch_data_segment(smpi_process_index());
79    }
80   MPI_Group cp = new  Group(this->group());
81   (*newcomm) = new  Comm(cp, this->topo());
82   int ret = MPI_SUCCESS;
83
84   if(attributes_ !=nullptr){
85     (*newcomm)->attributes_   = xbt_dict_new_homogeneous(nullptr);
86     xbt_dict_cursor_t cursor = nullptr;
87     char* key;
88     int flag;
89     void* value_in;
90     void* value_out;
91     xbt_dict_foreach (attributes_, cursor, key, value_in) {
92       smpi_comm_key_elem elem =
93           static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(keyvals_, key, sizeof(int)));
94       if (elem != nullptr && elem->copy_fn != MPI_NULL_COPY_FN) {
95         ret = elem->copy_fn(this, atoi(key), nullptr, value_in, &value_out, &flag);
96         if (ret != MPI_SUCCESS) {
97           Comm::destroy(*newcomm);
98           *newcomm = MPI_COMM_NULL;
99           xbt_dict_cursor_free(&cursor);
100           return ret;
101         }
102         if (flag)
103           xbt_dict_set_ext((*newcomm)->attributes_, key, sizeof(int), value_out, nullptr);
104       }
105       }
106     }
107   return ret;
108 }
109
110 MPI_Group Comm::group()
111 {
112   if (this == MPI_COMM_UNINITIALIZED)
113     return smpi_process_comm_world()->group();
114   return group_;
115 }
116
117 MPI_Topology Comm::topo() {
118   return topo_;
119 }
120
121 int Comm::size()
122 {
123   if (this == MPI_COMM_UNINITIALIZED)
124     return smpi_process_comm_world()->size();
125   return group_->size();
126 }
127
128 int Comm::rank()
129 {
130   if (this == MPI_COMM_UNINITIALIZED)
131     return smpi_process_comm_world()->rank();
132   return group_->rank(smpi_process_index());
133 }
134
135 void Comm::get_name (char* name, int* len)
136 {
137   if (this == MPI_COMM_UNINITIALIZED){
138     smpi_process_comm_world()->get_name(name, len);
139     return;
140   }
141   if(this == MPI_COMM_WORLD) {
142     strncpy(name, "WORLD",5);
143     *len = 5;
144   } else {
145     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", this);
146   }
147 }
148
149 void Comm::set_leaders_comm(MPI_Comm leaders){
150   if (this == MPI_COMM_UNINITIALIZED){
151     smpi_process_comm_world()->set_leaders_comm(leaders);
152     return;
153   }
154   leaders_comm_=leaders;
155 }
156
157 void Comm::set_intra_comm(MPI_Comm leaders){
158   intra_comm_=leaders;
159 }
160
161 int* Comm::get_non_uniform_map(){
162   if (this == MPI_COMM_UNINITIALIZED)
163     return smpi_process_comm_world()->get_non_uniform_map();
164   return non_uniform_map_;
165 }
166
167 int* Comm::get_leaders_map(){
168   if (this == MPI_COMM_UNINITIALIZED)
169     return smpi_process_comm_world()->get_leaders_map();
170   return leaders_map_;
171 }
172
173 MPI_Comm Comm::get_leaders_comm(){
174   if (this == MPI_COMM_UNINITIALIZED)
175     return smpi_process_comm_world()->get_leaders_comm();
176   return leaders_comm_;
177 }
178
179 MPI_Comm Comm::get_intra_comm(){
180   if (this == MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD) 
181     return smpi_process_get_comm_intra();
182   else return intra_comm_;
183 }
184
185 int Comm::is_uniform(){
186   if (this == MPI_COMM_UNINITIALIZED)
187     return smpi_process_comm_world()->is_uniform();
188   return is_uniform_;
189 }
190
191 int Comm::is_blocked(){
192   if (this == MPI_COMM_UNINITIALIZED)
193     return smpi_process_comm_world()->is_blocked();
194   return is_blocked_;
195 }
196
197 MPI_Comm Comm::split(int color, int key)
198 {
199   if (this == MPI_COMM_UNINITIALIZED)
200     return smpi_process_comm_world()->split(color, key);
201   int system_tag = 123;
202   int* recvbuf;
203
204   MPI_Group group_root = nullptr;
205   MPI_Group group_out  = nullptr;
206   MPI_Group group      = this->group();
207   int rank             = this->rank();
208   int size             = this->size();
209   /* Gather all colors and keys on rank 0 */
210   int* sendbuf = xbt_new(int, 2);
211   sendbuf[0] = color;
212   sendbuf[1] = key;
213   if(rank == 0) {
214     recvbuf = xbt_new(int, 2 * size);
215   } else {
216     recvbuf = nullptr;
217   }
218   Coll_gather_default::gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
219   xbt_free(sendbuf);
220   /* Do the actual job */
221   if(rank == 0) {
222     MPI_Group* group_snd = xbt_new(MPI_Group, size);
223     int* rankmap         = xbt_new(int, 2 * size);
224     for (int i = 0; i < size; i++) {
225       if (recvbuf[2 * i] != MPI_UNDEFINED) {
226         int count = 0;
227         for (int j = i + 1; j < size; j++) {
228           if(recvbuf[2 * i] == recvbuf[2 * j]) {
229             recvbuf[2 * j] = MPI_UNDEFINED;
230             rankmap[2 * count] = j;
231             rankmap[2 * count + 1] = recvbuf[2 * j + 1];
232             count++;
233           }
234         }
235         /* Add self in the group */
236         recvbuf[2 * i] = MPI_UNDEFINED;
237         rankmap[2 * count] = i;
238         rankmap[2 * count + 1] = recvbuf[2 * i + 1];
239         count++;
240         qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
241         group_out = new  Group(count);
242         if (i == 0) {
243           group_root = group_out; /* Save root's group */
244         }
245         for (int j = 0; j < count; j++) {
246           int index = group->index(rankmap[2 * j]);
247           group_out->set_mapping(index, j);
248         }
249         MPI_Request* requests = xbt_new(MPI_Request, count);
250         int reqs              = 0;
251         for (int j = 0; j < count; j++) {
252           if(rankmap[2 * j] != 0) {
253             group_snd[reqs]=new  Group(group_out);
254             requests[reqs] = Request::isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
255             reqs++;
256           }
257         }
258         if(i != 0) {
259           if(group_out != MPI_COMM_WORLD->group() && group_out != MPI_GROUP_EMPTY)
260             Group::unref(group_out);
261         }
262         Request::waitall(reqs, requests, MPI_STATUS_IGNORE);
263         xbt_free(requests);
264       }
265     }
266     xbt_free(recvbuf);
267     xbt_free(rankmap);
268     xbt_free(group_snd);
269     group_out = group_root; /* exit with root's group */
270   } else {
271     if(color != MPI_UNDEFINED) {
272       Request::recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
273     } /* otherwise, exit with group_out == nullptr */
274   }
275   return group_out!=nullptr ? new  Comm(group_out, nullptr) : MPI_COMM_NULL;
276 }
277
278 void Comm::ref(){
279   if (this == MPI_COMM_UNINITIALIZED){
280     smpi_process_comm_world()->ref();
281     return;
282   }
283   group_->ref();
284   refcount_++;
285 }
286
287 void Comm::cleanup_attributes(){
288   if(attributes_ !=nullptr){
289     xbt_dict_cursor_t cursor = nullptr;
290     char* key;
291     void* value;
292     int flag;
293     xbt_dict_foreach (attributes_, cursor, key, value) {
294       smpi_comm_key_elem elem = static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null(keyvals_, key));
295       if (elem != nullptr && elem->delete_fn != nullptr)
296         elem->delete_fn(this, atoi(key), value, &flag);
297     }
298     xbt_dict_free(&attributes_);
299   }
300 }
301
302 void Comm::cleanup_smp(){
303   if (intra_comm_ != MPI_COMM_NULL)
304     Comm::unref(intra_comm_);
305   if (leaders_comm_ != MPI_COMM_NULL)
306     Comm::unref(leaders_comm_);
307   if (non_uniform_map_ != nullptr)
308     xbt_free(non_uniform_map_);
309   if (leaders_map_ != nullptr)
310     xbt_free(leaders_map_);
311 }
312
313 void Comm::unref(Comm* comm){
314   if (comm == MPI_COMM_UNINITIALIZED){
315     Comm::unref(smpi_process_comm_world());
316     return;
317   }
318   comm->refcount_--;
319   Group::unref(comm->group_);
320
321   if(comm->refcount_==0){
322     comm->cleanup_smp();
323     comm->cleanup_attributes();
324     delete comm;
325   }
326 }
327
328 static int compare_ints (const void *a, const void *b)
329 {
330   const int *da = static_cast<const int *>(a);
331   const int *db = static_cast<const int *>(b);
332
333   return static_cast<int>(*da > *db) - static_cast<int>(*da < *db);
334 }
335
336 void Comm::init_smp(){
337   int leader = -1;
338
339   if (this == MPI_COMM_UNINITIALIZED)
340     smpi_process_comm_world()->init_smp();
341
342   int comm_size = this->size();
343   
344   // If we are in replay - perform an ugly hack  
345   // tell SimGrid we are not in replay for a while, because we need the buffers to be copied for the following calls
346   bool replaying = false; //cache data to set it back again after
347   if(smpi_process_get_replaying()){
348    replaying=true;
349    smpi_process_set_replaying(false);
350   }
351
352   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
353      smpi_switch_data_segment(smpi_process_index());
354    }
355   //identify neighbours in comm
356   //get the indexes of all processes sharing the same simix host
357   xbt_swag_t process_list = SIMIX_host_self()->extension<simgrid::simix::Host>()->process_list;
358   int intra_comm_size     = 0;
359   int min_index           = INT_MAX;//the minimum index will be the leader
360   smx_actor_t actor       = nullptr;
361   xbt_swag_foreach(actor, process_list) {
362     int index = actor->pid -1;
363
364     if(this->group()->rank(index)!=MPI_UNDEFINED){
365       intra_comm_size++;
366       //the process is in the comm
367       if(index < min_index)
368         min_index=index;
369     }
370   }
371   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
372   MPI_Group group_intra = new  Group(intra_comm_size);
373   int i = 0;
374   actor = nullptr;
375   xbt_swag_foreach(actor, process_list) {
376     int index = actor->pid -1;
377     if(this->group()->rank(index)!=MPI_UNDEFINED){
378       group_intra->set_mapping(index, i);
379       i++;
380     }
381   }
382
383   MPI_Comm comm_intra = new  Comm(group_intra, nullptr);
384   leader=min_index;
385
386   int * leaders_map= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
387   int * leader_list= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
388   for(i=0; i<comm_size; i++){
389       leader_list[i]=-1;
390   }
391
392   Coll_allgather_mpich::allgather(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
393
394   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
395      smpi_switch_data_segment(smpi_process_index());
396    }
397
398   if(leaders_map_==nullptr){
399     leaders_map_= leaders_map;
400   }else{
401     xbt_free(leaders_map);
402   }
403   int j=0;
404   int leader_group_size = 0;
405   for(i=0; i<comm_size; i++){
406       int already_done=0;
407       for(j=0;j<leader_group_size; j++){
408         if(leaders_map_[i]==leader_list[j]){
409             already_done=1;
410         }
411       }
412       if(already_done==0){
413         leader_list[leader_group_size]=leaders_map_[i];
414         leader_group_size++;
415       }
416   }
417   qsort(leader_list, leader_group_size, sizeof(int),compare_ints);
418
419   MPI_Group leaders_group = new  Group(leader_group_size);
420
421   MPI_Comm leader_comm = MPI_COMM_NULL;
422   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && this!=MPI_COMM_WORLD){
423     //create leader_communicator
424     for (i=0; i< leader_group_size;i++)
425       leaders_group->set_mapping(leader_list[i], i);
426     leader_comm = new  Comm(leaders_group, nullptr);
427     this->set_leaders_comm(leader_comm);
428     this->set_intra_comm(comm_intra);
429
430    //create intracommunicator
431   }else{
432     for (i=0; i< leader_group_size;i++)
433       leaders_group->set_mapping(leader_list[i], i);
434
435     if(this->get_leaders_comm()==MPI_COMM_NULL){
436       leader_comm = new  Comm(leaders_group, nullptr);
437       this->set_leaders_comm(leader_comm);
438     }else{
439       leader_comm=this->get_leaders_comm();
440       Group::unref(leaders_group);
441     }
442     smpi_process_set_comm_intra(comm_intra);
443   }
444
445   int is_uniform = 1;
446
447   // Are the nodes uniform ? = same number of process/node
448   int my_local_size=comm_intra->size();
449   if(comm_intra->rank()==0) {
450     int* non_uniform_map = xbt_new0(int,leader_group_size);
451     Coll_allgather_mpich::allgather(&my_local_size, 1, MPI_INT,
452         non_uniform_map, 1, MPI_INT, leader_comm);
453     for(i=0; i < leader_group_size; i++) {
454       if(non_uniform_map[0] != non_uniform_map[i]) {
455         is_uniform = 0;
456         break;
457       }
458     }
459     if(is_uniform==0 && this->is_uniform()!=0){
460         non_uniform_map_= non_uniform_map;
461     }else{
462         xbt_free(non_uniform_map);
463     }
464     is_uniform_=is_uniform;
465   }
466   Coll_bcast_mpich::bcast(&(is_uniform_),1, MPI_INT, 0, comm_intra );
467
468   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
469      smpi_switch_data_segment(smpi_process_index());
470    }
471   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
472   int is_blocked=1;
473   int prev=this->group()->rank(comm_intra->group()->index(0));
474     for (i=1; i<my_local_size; i++){
475       int that=this->group()->rank(comm_intra->group()->index(i));
476       if(that!=prev+1){
477         is_blocked=0;
478         break;
479       }
480       prev = that;
481   }
482
483   int global_blocked;
484   Coll_allreduce_default::allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
485
486   if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD){
487     if(this->rank()==0){
488         is_blocked_=global_blocked;
489     }
490   }else{
491     is_blocked_=global_blocked;
492   }
493   xbt_free(leader_list);
494   
495   if(replaying)
496     smpi_process_set_replaying(true); 
497 }
498
499 int Comm::attr_delete(int keyval){
500   smpi_comm_key_elem elem =
501      static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(keyvals_, reinterpret_cast<const char*>(&keyval), sizeof(int)));
502   if(elem==nullptr)
503     return MPI_ERR_ARG;
504   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
505     void* value = nullptr;
506     int flag;
507     if(this->attr_get(keyval, &value, &flag)==MPI_SUCCESS){
508       int ret = elem->delete_fn(this, keyval, value, &flag);
509       if(ret!=MPI_SUCCESS) 
510         return ret;
511     }
512   }
513   if(attributes_==nullptr)
514     return MPI_ERR_ARG;
515
516   xbt_dict_remove_ext(attributes_, reinterpret_cast<const char*>(&keyval), sizeof(int));
517   return MPI_SUCCESS;
518 }
519
520 int Comm::attr_get(int keyval, void* attr_value, int* flag){
521   smpi_comm_key_elem elem =
522     static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(keyvals_, reinterpret_cast<const char*>(&keyval), sizeof(int)));
523   if(elem==nullptr)
524     return MPI_ERR_ARG;
525   if(attributes_==nullptr){
526     *flag=0;
527     return MPI_SUCCESS;
528   }
529   try {
530     *static_cast<void**>(attr_value) =
531         xbt_dict_get_ext(attributes_, reinterpret_cast<const char*>(&keyval), sizeof(int));
532     *flag=1;
533   }
534   catch (xbt_ex& ex) {
535     *flag=0;
536   }
537   return MPI_SUCCESS;
538 }
539
540 int Comm::attr_put(int keyval, void* attr_value){
541   if(keyvals_==nullptr)
542     keyvals_ = xbt_dict_new_homogeneous(nullptr);
543   smpi_comm_key_elem elem =
544     static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(keyvals_,  reinterpret_cast<const char*>(&keyval), sizeof(int)));
545   if(elem==nullptr)
546     return MPI_ERR_ARG;
547   int flag;
548   void* value = nullptr;
549   this->attr_get(keyval, &value, &flag);
550   if(flag!=0 && elem->delete_fn!=MPI_NULL_DELETE_FN){
551     int ret = elem->delete_fn(this, keyval, value, &flag);
552     if(ret!=MPI_SUCCESS) 
553       return ret;
554   }
555   if(attributes_==nullptr)
556     attributes_ = xbt_dict_new_homogeneous(nullptr);
557
558   xbt_dict_set_ext(attributes_,  reinterpret_cast<const char*>(&keyval), sizeof(int), attr_value, nullptr);
559   return MPI_SUCCESS;
560 }
561
562 MPI_Comm Comm::f2c(int id) {
563   if(id == -2) {
564     return MPI_COMM_SELF;
565   } else if(id==0){
566     return MPI_COMM_WORLD;
567   } else if(F2C::f2c_lookup_ != nullptr && id >= 0) {
568       char key[KEY_SIZE];
569       MPI_Comm tmp =  static_cast<MPI_Comm>(xbt_dict_get_or_null(F2C::f2c_lookup_,get_key_id(key, id)));
570       return tmp != nullptr ? tmp : MPI_COMM_NULL ;
571   } else {
572     return MPI_COMM_NULL;
573   }
574 }
575
576 void Comm::free_f(int id) {
577   char key[KEY_SIZE];
578   xbt_dict_remove(F2C::f2c_lookup_, id==0? get_key(key, id) : get_key_id(key, id));
579 }
580
581 int Comm::add_f() {
582   if(F2C::f2c_lookup_==nullptr){
583     F2C::f2c_lookup_=xbt_dict_new_homogeneous(nullptr);
584   }
585   char key[KEY_SIZE];
586   xbt_dict_set(F2C::f2c_lookup_, this==MPI_COMM_WORLD? get_key(key, F2C::f2c_id_) : get_key_id(key,F2C::f2c_id_), this, nullptr);
587   F2C::f2c_id_++;
588   return F2C::f2c_id_-1;
589 }
590
591 int Comm::keyval_create(MPI_Comm_copy_attr_function* copy_fn, MPI_Comm_delete_attr_function* delete_fn, int* keyval,
592                             void* extra_state){
593   if(keyvals_==nullptr)
594     keyvals_ = xbt_dict_new_homogeneous(nullptr);
595
596   smpi_comm_key_elem value = static_cast<smpi_comm_key_elem>(xbt_new0(s_smpi_mpi_comm_key_elem_t,1));
597
598   value->copy_fn=copy_fn;
599   value->delete_fn=delete_fn;
600
601   *keyval = keyval_id_;
602   xbt_dict_set_ext(keyvals_, reinterpret_cast<const char*>(keyval), sizeof(int),static_cast<void*>(value), nullptr);
603   keyval_id_++;
604   return MPI_SUCCESS;
605 }
606
607 int Comm::keyval_free(int* keyval){
608   smpi_comm_key_elem elem =
609      static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(keyvals_,  reinterpret_cast<const char*>(keyval), sizeof(int)));
610   if(elem==nullptr)
611     return MPI_ERR_ARG;
612   xbt_dict_remove_ext(keyvals_,  reinterpret_cast<const char*>(keyval), sizeof(int));
613   xbt_free(elem);
614   return MPI_SUCCESS;
615 }
616
617 void Comm::keyval_cleanup(){
618   if(Comm::keyvals_!=nullptr) 
619     xbt_dict_free(&Comm::keyvals_);
620 }
621
622
623 }
624 }
625
626