Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add some accessors
[simgrid.git] / src / smpi / smpi_comm.cpp
1 /* Copyright (c) 2010-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdlib.h>
8 #include <limits.h>
9
10 #include <xbt/dict.h>
11 #include <xbt/ex.h>
12 #include <xbt/ex.hpp>
13
14 #include <simgrid/s4u/host.hpp>
15
16 #include "private.h"
17 #include "src/simix/smx_private.h"
18 #include "colls/colls.h"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
21
22 xbt_dict_t smpi_comm_keyvals = nullptr;
23 int comm_keyval_id = 0;//avoid collisions
24
25
26  Comm mpi_MPI_COMM_UNINITIALIZED;
27 MPI_Comm MPI_COMM_UNINITIALIZED=&mpi_MPI_COMM_UNINITIALIZED;
28
29 /* Support for cartesian topology was added, but there are 2 other types of topology, graph et dist graph. In order to
30  * support them, we have to add a field MPIR_Topo_type, and replace the MPI_Topology field by an union. */
31
32 static int smpi_compare_rankmap(const void *a, const void *b)
33 {
34   const int* x = static_cast<const int*>(a);
35   const int* y = static_cast<const int*>(b);
36
37   if (x[1] < y[1]) {
38     return -1;
39   }
40   if (x[1] == y[1]) {
41     if (x[0] < y[0]) {
42       return -1;
43     }
44     if (x[0] == y[0]) {
45       return 0;
46     }
47     return 1;
48   }
49   return 1;
50 }
51
52 namespace simgrid{
53 namespace smpi{
54
55 Comm::Comm(){}
56
57 Comm::Comm(MPI_Group group, MPI_Topology topo) : group_(group), topo_(topo)
58 {
59   refcount_=1;
60   topoType_ = MPI_INVALID_TOPO;
61   intra_comm_ = MPI_COMM_NULL;
62   leaders_comm_ = MPI_COMM_NULL;
63   is_uniform_=1;
64   non_uniform_map_ = nullptr;
65   leaders_map_ = nullptr;
66   is_blocked_=0;
67   attributes_=nullptr;
68 }
69
70 void Comm::destroy()
71 {
72   if (this == MPI_COMM_UNINITIALIZED){
73     smpi_process_comm_world()->destroy();
74     return;
75   }
76   delete topo_; // there's no use count on topos
77   this->unuse();
78 }
79
80 int Comm::dup(MPI_Comm* newcomm){
81   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
82      smpi_switch_data_segment(smpi_process_index());
83    }
84   MPI_Group cp = new  Group(this->group());
85   (*newcomm) = new  Comm(cp, this->topo());
86   int ret = MPI_SUCCESS;
87
88   if(attributes_ !=nullptr){
89     (*newcomm)->attributes_   = xbt_dict_new_homogeneous(nullptr);
90     xbt_dict_cursor_t cursor = nullptr;
91     char* key;
92     int flag;
93     void* value_in;
94     void* value_out;
95     xbt_dict_foreach (attributes_, cursor, key, value_in) {
96       smpi_comm_key_elem elem =
97           static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, key, sizeof(int)));
98       if (elem != nullptr && elem->copy_fn != MPI_NULL_COPY_FN) {
99         ret = elem->copy_fn(this, atoi(key), nullptr, value_in, &value_out, &flag);
100         if (ret != MPI_SUCCESS) {
101           (*newcomm)->destroy();
102           *newcomm = MPI_COMM_NULL;
103           xbt_dict_cursor_free(&cursor);
104           return ret;
105         }
106         if (flag)
107           xbt_dict_set_ext((*newcomm)->attributes_, key, sizeof(int), value_out, nullptr);
108       }
109       }
110     }
111   return ret;
112 }
113
114 MPI_Group Comm::group()
115 {
116   if (this == MPI_COMM_UNINITIALIZED)
117     return smpi_process_comm_world()->group();
118   return group_;
119 }
120
121 MPI_Topology Comm::topo() {
122   return topo_;
123 }
124
125 int Comm::size()
126 {
127   if (this == MPI_COMM_UNINITIALIZED)
128     return smpi_process_comm_world()->size();
129   return group_->size();
130 }
131
132 int Comm::rank()
133 {
134   if (this == MPI_COMM_UNINITIALIZED)
135     return smpi_process_comm_world()->rank();
136   return group_->rank(smpi_process_index());
137 }
138
139 void Comm::get_name (char* name, int* len)
140 {
141   if (this == MPI_COMM_UNINITIALIZED){
142     smpi_process_comm_world()->get_name(name, len);
143     return;
144   }
145   if(this == MPI_COMM_WORLD) {
146     strncpy(name, "WORLD",5);
147     *len = 5;
148   } else {
149     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", this);
150   }
151 }
152
153 void Comm::set_leaders_comm(MPI_Comm leaders){
154   if (this == MPI_COMM_UNINITIALIZED){
155     smpi_process_comm_world()->set_leaders_comm(leaders);
156     return;
157   }
158   leaders_comm_=leaders;
159 }
160
161 void Comm::set_intra_comm(MPI_Comm leaders){
162   intra_comm_=leaders;
163 }
164
165 int* Comm::get_non_uniform_map(){
166   if (this == MPI_COMM_UNINITIALIZED)
167     return smpi_process_comm_world()->get_non_uniform_map();
168   return non_uniform_map_;
169 }
170
171 int* Comm::get_leaders_map(){
172   if (this == MPI_COMM_UNINITIALIZED)
173     return smpi_process_comm_world()->get_leaders_map();
174   return leaders_map_;
175 }
176
177 MPI_Comm Comm::get_leaders_comm(){
178   if (this == MPI_COMM_UNINITIALIZED)
179     return smpi_process_comm_world()->get_leaders_comm();
180   return leaders_comm_;
181 }
182
183 MPI_Comm Comm::get_intra_comm(){
184   if (this == MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD) 
185     return smpi_process_get_comm_intra();
186   else return intra_comm_;
187 }
188
189 int Comm::is_uniform(){
190   if (this == MPI_COMM_UNINITIALIZED)
191     return smpi_process_comm_world()->is_uniform();
192   return is_uniform_;
193 }
194
195 int Comm::is_blocked(){
196   if (this == MPI_COMM_UNINITIALIZED)
197     return smpi_process_comm_world()->is_blocked();
198   return is_blocked_;
199 }
200
201 MPI_Comm Comm::split(int color, int key)
202 {
203   if (this == MPI_COMM_UNINITIALIZED)
204     return smpi_process_comm_world()->split(color, key);
205   int system_tag = 123;
206   int* recvbuf;
207
208   MPI_Group group_root = nullptr;
209   MPI_Group group_out  = nullptr;
210   MPI_Group group      = this->group();
211   int rank             = this->rank();
212   int size             = this->size();
213   /* Gather all colors and keys on rank 0 */
214   int* sendbuf = xbt_new(int, 2);
215   sendbuf[0] = color;
216   sendbuf[1] = key;
217   if(rank == 0) {
218     recvbuf = xbt_new(int, 2 * size);
219   } else {
220     recvbuf = nullptr;
221   }
222   smpi_mpi_gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
223   xbt_free(sendbuf);
224   /* Do the actual job */
225   if(rank == 0) {
226     MPI_Group* group_snd = xbt_new(MPI_Group, size);
227     int* rankmap         = xbt_new(int, 2 * size);
228     for (int i = 0; i < size; i++) {
229       if (recvbuf[2 * i] != MPI_UNDEFINED) {
230         int count = 0;
231         for (int j = i + 1; j < size; j++) {
232           if(recvbuf[2 * i] == recvbuf[2 * j]) {
233             recvbuf[2 * j] = MPI_UNDEFINED;
234             rankmap[2 * count] = j;
235             rankmap[2 * count + 1] = recvbuf[2 * j + 1];
236             count++;
237           }
238         }
239         /* Add self in the group */
240         recvbuf[2 * i] = MPI_UNDEFINED;
241         rankmap[2 * count] = i;
242         rankmap[2 * count + 1] = recvbuf[2 * i + 1];
243         count++;
244         qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
245         group_out = new  Group(count);
246         if (i == 0) {
247           group_root = group_out; /* Save root's group */
248         }
249         for (int j = 0; j < count; j++) {
250           int index = group->index(rankmap[2 * j]);
251           group_out->set_mapping(index, j);
252         }
253         MPI_Request* requests = xbt_new(MPI_Request, count);
254         int reqs              = 0;
255         for (int j = 0; j < count; j++) {
256           if(rankmap[2 * j] != 0) {
257             group_snd[reqs]=new  Group(group_out);
258             requests[reqs] = Request::isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
259             reqs++;
260           }
261         }
262         if(i != 0) {
263           group_out->destroy();
264         }
265         Request::waitall(reqs, requests, MPI_STATUS_IGNORE);
266         xbt_free(requests);
267       }
268     }
269     xbt_free(recvbuf);
270     xbt_free(rankmap);
271     xbt_free(group_snd);
272     group_out = group_root; /* exit with root's group */
273   } else {
274     if(color != MPI_UNDEFINED) {
275       Request::recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
276     } /* otherwise, exit with group_out == nullptr */
277   }
278   return group_out!=nullptr ? new  Comm(group_out, nullptr) : MPI_COMM_NULL;
279 }
280
281 void Comm::use(){
282   if (this == MPI_COMM_UNINITIALIZED){
283     smpi_process_comm_world()->use();
284     return;
285   }
286   group_->use();
287   refcount_++;
288 }
289
290 void Comm::cleanup_attributes(){
291   if(attributes_ !=nullptr){
292     xbt_dict_cursor_t cursor = nullptr;
293     char* key;
294     void* value;
295     int flag;
296     xbt_dict_foreach (attributes_, cursor, key, value) {
297       smpi_comm_key_elem elem = static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null(smpi_comm_keyvals, key));
298       if (elem != nullptr && elem->delete_fn != nullptr)
299         elem->delete_fn(this, atoi(key), value, &flag);
300     }
301     xbt_dict_free(&attributes_);
302   }
303 }
304
305 void Comm::cleanup_smp(){
306   if (intra_comm_ != MPI_COMM_NULL)
307     intra_comm_->unuse();
308   if (leaders_comm_ != MPI_COMM_NULL)
309     leaders_comm_->unuse();
310   if (non_uniform_map_ != nullptr)
311     xbt_free(non_uniform_map_);
312   if (leaders_map_ != nullptr)
313     xbt_free(leaders_map_);
314 }
315
316 void Comm::unuse(){
317   if (this == MPI_COMM_UNINITIALIZED){
318     smpi_process_comm_world()->unuse();
319     return;
320   }
321   refcount_--;
322   group_->unuse();
323
324   if(refcount_==0){
325     this->cleanup_smp();
326     this->cleanup_attributes();
327     delete this;
328   }
329 }
330
331 static int compare_ints (const void *a, const void *b)
332 {
333   const int *da = static_cast<const int *>(a);
334   const int *db = static_cast<const int *>(b);
335
336   return static_cast<int>(*da > *db) - static_cast<int>(*da < *db);
337 }
338
339 void Comm::init_smp(){
340   int leader = -1;
341
342   if (this == MPI_COMM_UNINITIALIZED)
343     smpi_process_comm_world()->init_smp();
344
345   int comm_size = this->size();
346   
347   // If we are in replay - perform an ugly hack  
348   // tell SimGrid we are not in replay for a while, because we need the buffers to be copied for the following calls
349   bool replaying = false; //cache data to set it back again after
350   if(smpi_process_get_replaying()){
351    replaying=true;
352    smpi_process_set_replaying(false);
353   }
354
355   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
356      smpi_switch_data_segment(smpi_process_index());
357    }
358   //identify neighbours in comm
359   //get the indexes of all processes sharing the same simix host
360   xbt_swag_t process_list = SIMIX_host_self()->processes();
361   int intra_comm_size = 0;
362   int i =0;
363   int min_index=INT_MAX;//the minimum index will be the leader
364   smx_actor_t process = nullptr;
365   xbt_swag_foreach(process, process_list) {
366     int index = process->pid -1;
367
368     if(this->group()->rank(index)!=MPI_UNDEFINED){
369         intra_comm_size++;
370       //the process is in the comm
371       if(index < min_index)
372         min_index=index;
373       i++;
374     }
375   }
376   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
377   MPI_Group group_intra = new  Group(intra_comm_size);
378   i=0;
379   process = nullptr;
380   xbt_swag_foreach(process, process_list) {
381     int index = process->pid -1;
382     if(this->group()->rank(index)!=MPI_UNDEFINED){
383       group_intra->set_mapping(index, i);
384       i++;
385     }
386   }
387
388   MPI_Comm comm_intra = new  Comm(group_intra, nullptr);
389   leader=min_index;
390
391   int * leaders_map= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
392   int * leader_list= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
393   for(i=0; i<comm_size; i++){
394       leader_list[i]=-1;
395   }
396
397   smpi_coll_tuned_allgather_mpich(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
398
399   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
400      smpi_switch_data_segment(smpi_process_index());
401    }
402
403   if(leaders_map_==nullptr){
404     leaders_map_= leaders_map;
405   }else{
406     xbt_free(leaders_map);
407   }
408   int j=0;
409   int leader_group_size = 0;
410   for(i=0; i<comm_size; i++){
411       int already_done=0;
412       for(j=0;j<leader_group_size; j++){
413         if(leaders_map_[i]==leader_list[j]){
414             already_done=1;
415         }
416       }
417       if(already_done==0){
418         leader_list[leader_group_size]=leaders_map_[i];
419         leader_group_size++;
420       }
421   }
422   qsort(leader_list, leader_group_size, sizeof(int),compare_ints);
423
424   MPI_Group leaders_group = new  Group(leader_group_size);
425
426   MPI_Comm leader_comm = MPI_COMM_NULL;
427   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && this!=MPI_COMM_WORLD){
428     //create leader_communicator
429     for (i=0; i< leader_group_size;i++)
430       leaders_group->set_mapping(leader_list[i], i);
431     leader_comm = new  Comm(leaders_group, nullptr);
432     this->set_leaders_comm(leader_comm);
433     this->set_intra_comm(comm_intra);
434
435    //create intracommunicator
436   }else{
437     for (i=0; i< leader_group_size;i++)
438       leaders_group->set_mapping(leader_list[i], i);
439
440     if(this->get_leaders_comm()==MPI_COMM_NULL){
441       leader_comm = new  Comm(leaders_group, nullptr);
442       this->set_leaders_comm(leader_comm);
443     }else{
444       leader_comm=this->get_leaders_comm();
445       leaders_group->unuse();
446     }
447     smpi_process_set_comm_intra(comm_intra);
448   }
449
450   int is_uniform = 1;
451
452   // Are the nodes uniform ? = same number of process/node
453   int my_local_size=comm_intra->size();
454   if(comm_intra->rank()==0) {
455     int* non_uniform_map = xbt_new0(int,leader_group_size);
456     smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
457         non_uniform_map, 1, MPI_INT, leader_comm);
458     for(i=0; i < leader_group_size; i++) {
459       if(non_uniform_map[0] != non_uniform_map[i]) {
460         is_uniform = 0;
461         break;
462       }
463     }
464     if(is_uniform==0 && this->is_uniform()!=0){
465         non_uniform_map_= non_uniform_map;
466     }else{
467         xbt_free(non_uniform_map);
468     }
469     is_uniform_=is_uniform;
470   }
471   smpi_coll_tuned_bcast_mpich(&(is_uniform_),1, MPI_INT, 0, comm_intra );
472
473   if(smpi_privatize_global_variables){ //we need to switch as the called function may silently touch global variables
474      smpi_switch_data_segment(smpi_process_index());
475    }
476   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
477   int is_blocked=1;
478   int prev=this->group()->rank(comm_intra->group()->index(0));
479     for (i=1; i<my_local_size; i++){
480       int that=this->group()->rank(comm_intra->group()->index(i));
481       if(that!=prev+1){
482         is_blocked=0;
483         break;
484       }
485       prev = that;
486   }
487
488   int global_blocked;
489   smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
490
491   if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD){
492     if(this->rank()==0){
493         is_blocked_=global_blocked;
494     }
495   }else{
496     is_blocked_=global_blocked;
497   }
498   xbt_free(leader_list);
499   
500   if(replaying)
501     smpi_process_set_replaying(true); 
502 }
503
504 int Comm::attr_delete(int keyval){
505   smpi_comm_key_elem elem =
506      static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(&keyval), sizeof(int)));
507   if(elem==nullptr)
508     return MPI_ERR_ARG;
509   if(elem->delete_fn!=MPI_NULL_DELETE_FN){
510     void* value = nullptr;
511     int flag;
512     if(this->attr_get(keyval, &value, &flag)==MPI_SUCCESS){
513       int ret = elem->delete_fn(this, keyval, value, &flag);
514       if(ret!=MPI_SUCCESS) 
515         return ret;
516     }
517   }
518   if(attributes_==nullptr)
519     return MPI_ERR_ARG;
520
521   xbt_dict_remove_ext(attributes_, reinterpret_cast<const char*>(&keyval), sizeof(int));
522   return MPI_SUCCESS;
523 }
524
525 int Comm::attr_get(int keyval, void* attr_value, int* flag){
526   smpi_comm_key_elem elem =
527     static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(&keyval), sizeof(int)));
528   if(elem==nullptr)
529     return MPI_ERR_ARG;
530   if(attributes_==nullptr){
531     *flag=0;
532     return MPI_SUCCESS;
533   }
534   try {
535     *static_cast<void**>(attr_value) =
536         xbt_dict_get_ext(attributes_, reinterpret_cast<const char*>(&keyval), sizeof(int));
537     *flag=1;
538   }
539   catch (xbt_ex& ex) {
540     *flag=0;
541   }
542   return MPI_SUCCESS;
543 }
544
545 int Comm::attr_put(int keyval, void* attr_value){
546   if(smpi_comm_keyvals==nullptr)
547     smpi_comm_keyvals = xbt_dict_new_homogeneous(nullptr);
548   smpi_comm_key_elem elem =
549     static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals,  reinterpret_cast<const char*>(&keyval), sizeof(int)));
550   if(elem==nullptr)
551     return MPI_ERR_ARG;
552   int flag;
553   void* value = nullptr;
554   this->attr_get(keyval, &value, &flag);
555   if(flag!=0 && elem->delete_fn!=MPI_NULL_DELETE_FN){
556     int ret = elem->delete_fn(this, keyval, value, &flag);
557     if(ret!=MPI_SUCCESS) 
558       return ret;
559   }
560   if(attributes_==nullptr)
561     attributes_ = xbt_dict_new_homogeneous(nullptr);
562
563   xbt_dict_set_ext(attributes_,  reinterpret_cast<const char*>(&keyval), sizeof(int), attr_value, nullptr);
564   return MPI_SUCCESS;
565 }
566
567 }
568 }
569
570 int smpi_comm_keyval_create(MPI_Comm_copy_attr_function* copy_fn, MPI_Comm_delete_attr_function* delete_fn, int* keyval,
571                             void* extra_state){
572   if(smpi_comm_keyvals==nullptr)
573     smpi_comm_keyvals = xbt_dict_new_homogeneous(nullptr);
574
575   smpi_comm_key_elem value = static_cast<smpi_comm_key_elem>(xbt_new0(s_smpi_mpi_comm_key_elem_t,1));
576
577   value->copy_fn=copy_fn;
578   value->delete_fn=delete_fn;
579
580   *keyval = comm_keyval_id;
581   xbt_dict_set_ext(smpi_comm_keyvals, reinterpret_cast<const char*>(keyval), sizeof(int),static_cast<void*>(value), nullptr);
582   comm_keyval_id++;
583   return MPI_SUCCESS;
584 }
585
586 int smpi_comm_keyval_free(int* keyval){
587   smpi_comm_key_elem elem =
588      static_cast<smpi_comm_key_elem>(xbt_dict_get_or_null_ext(smpi_comm_keyvals,  reinterpret_cast<const char*>(keyval), sizeof(int)));
589   if(elem==nullptr)
590     return MPI_ERR_ARG;
591   xbt_dict_remove_ext(smpi_comm_keyvals,  reinterpret_cast<const char*>(keyval), sizeof(int));
592   xbt_free(elem);
593   return MPI_SUCCESS;
594 }