Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
this writeActions stuff was never used
[simgrid.git] / src / smpi / smpi_comm.cpp
1 /* Copyright (c) 2010-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/s4u/Host.hpp"
7 #include <climits>
8
9 #include "src/simix/smx_private.h"
10 #include "src/smpi/private.h"
11 #include "src/smpi/private.hpp"
12 #include "src/smpi/smpi_comm.hpp"
13 #include "src/smpi/smpi_coll.hpp"
14 #include "src/smpi/smpi_datatype.hpp"
15 #include "src/smpi/smpi_process.hpp"
16 #include "src/smpi/smpi_request.hpp"
17 #include "src/smpi/smpi_status.hpp"
18 #include "src/smpi/smpi_win.hpp"
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
21
22  simgrid::smpi::Comm mpi_MPI_COMM_UNINITIALIZED;
23 MPI_Comm MPI_COMM_UNINITIALIZED=&mpi_MPI_COMM_UNINITIALIZED;
24
25 /* Support for cartesian topology was added, but there are 2 other types of topology, graph et dist graph. In order to
26  * support them, we have to add a field SMPI_Topo_type, and replace the MPI_Topology field by an union. */
27
28 static int smpi_compare_rankmap(const void *a, const void *b)
29 {
30   const int* x = static_cast<const int*>(a);
31   const int* y = static_cast<const int*>(b);
32
33   if (x[1] < y[1]) {
34     return -1;
35   }
36   if (x[1] == y[1]) {
37     if (x[0] < y[0]) {
38       return -1;
39     }
40     if (x[0] == y[0]) {
41       return 0;
42     }
43     return 1;
44   }
45   return 1;
46 }
47
48 namespace simgrid{
49 namespace smpi{
50
51 std::unordered_map<int, smpi_key_elem> Comm::keyvals_;
52 int Comm::keyval_id_=0;
53
54 Comm::Comm(MPI_Group group, MPI_Topology topo) : group_(group), topo_(topo)
55 {
56   refcount_=1;
57   topoType_ = MPI_INVALID_TOPO;
58   intra_comm_ = MPI_COMM_NULL;
59   leaders_comm_ = MPI_COMM_NULL;
60   is_uniform_=1;
61   non_uniform_map_ = nullptr;
62   leaders_map_ = nullptr;
63   is_blocked_=0;
64 }
65
66 void Comm::destroy(Comm* comm)
67 {
68   if (comm == MPI_COMM_UNINITIALIZED){
69     Comm::destroy(smpi_process()->comm_world());
70     return;
71   }
72   delete comm->topo_; // there's no use count on topos
73   Comm::unref(comm);
74 }
75
76 int Comm::dup(MPI_Comm* newcomm){
77   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
78      smpi_switch_data_segment(smpi_process()->index());
79    }
80   MPI_Group cp = new  Group(this->group());
81   (*newcomm) = new  Comm(cp, this->topo());
82   int ret = MPI_SUCCESS;
83
84   if (not attributes()->empty()) {
85     int flag;
86     void* value_out;
87     for(auto it : *attributes()){
88       smpi_key_elem elem = keyvals_.at(it.first);
89       if (elem != nullptr && elem->copy_fn.comm_copy_fn != MPI_NULL_COPY_FN) {
90         ret = elem->copy_fn.comm_copy_fn(this, it.first, nullptr, it.second, &value_out, &flag);
91         if (ret != MPI_SUCCESS) {
92           Comm::destroy(*newcomm);
93           *newcomm = MPI_COMM_NULL;
94           return ret;
95         }
96         if (flag){
97           elem->refcount++;
98           (*newcomm)->attributes()->insert({it.first, value_out});
99         }
100       }
101       }
102     }
103   return ret;
104 }
105
106 MPI_Group Comm::group()
107 {
108   if (this == MPI_COMM_UNINITIALIZED)
109     return smpi_process()->comm_world()->group();
110   return group_;
111 }
112
113 MPI_Topology Comm::topo() {
114   return topo_;
115 }
116
117 int Comm::size()
118 {
119   if (this == MPI_COMM_UNINITIALIZED)
120     return smpi_process()->comm_world()->size();
121   return group_->size();
122 }
123
124 int Comm::rank()
125 {
126   if (this == MPI_COMM_UNINITIALIZED)
127     return smpi_process()->comm_world()->rank();
128   return group_->rank(smpi_process()->index());
129 }
130
131 void Comm::get_name (char* name, int* len)
132 {
133   if (this == MPI_COMM_UNINITIALIZED){
134     smpi_process()->comm_world()->get_name(name, len);
135     return;
136   }
137   if(this == MPI_COMM_WORLD) {
138     strncpy(name, "WORLD",5);
139     *len = 5;
140   } else {
141     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", this);
142   }
143 }
144
145 void Comm::set_leaders_comm(MPI_Comm leaders){
146   if (this == MPI_COMM_UNINITIALIZED){
147     smpi_process()->comm_world()->set_leaders_comm(leaders);
148     return;
149   }
150   leaders_comm_=leaders;
151 }
152
153 void Comm::set_intra_comm(MPI_Comm leaders){
154   intra_comm_=leaders;
155 }
156
157 int* Comm::get_non_uniform_map(){
158   if (this == MPI_COMM_UNINITIALIZED)
159     return smpi_process()->comm_world()->get_non_uniform_map();
160   return non_uniform_map_;
161 }
162
163 int* Comm::get_leaders_map(){
164   if (this == MPI_COMM_UNINITIALIZED)
165     return smpi_process()->comm_world()->get_leaders_map();
166   return leaders_map_;
167 }
168
169 MPI_Comm Comm::get_leaders_comm(){
170   if (this == MPI_COMM_UNINITIALIZED)
171     return smpi_process()->comm_world()->get_leaders_comm();
172   return leaders_comm_;
173 }
174
175 MPI_Comm Comm::get_intra_comm(){
176   if (this == MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD)
177     return smpi_process()->comm_intra();
178   else return intra_comm_;
179 }
180
181 int Comm::is_uniform(){
182   if (this == MPI_COMM_UNINITIALIZED)
183     return smpi_process()->comm_world()->is_uniform();
184   return is_uniform_;
185 }
186
187 int Comm::is_blocked(){
188   if (this == MPI_COMM_UNINITIALIZED)
189     return smpi_process()->comm_world()->is_blocked();
190   return is_blocked_;
191 }
192
193 MPI_Comm Comm::split(int color, int key)
194 {
195   if (this == MPI_COMM_UNINITIALIZED)
196     return smpi_process()->comm_world()->split(color, key);
197   int system_tag = 123;
198   int* recvbuf;
199
200   MPI_Group group_root = nullptr;
201   MPI_Group group_out  = nullptr;
202   MPI_Group group      = this->group();
203   int rank             = this->rank();
204   int size             = this->size();
205   /* Gather all colors and keys on rank 0 */
206   int* sendbuf = xbt_new(int, 2);
207   sendbuf[0] = color;
208   sendbuf[1] = key;
209   if(rank == 0) {
210     recvbuf = xbt_new(int, 2 * size);
211   } else {
212     recvbuf = nullptr;
213   }
214   Coll_gather_default::gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
215   xbt_free(sendbuf);
216   /* Do the actual job */
217   if(rank == 0) {
218     MPI_Group* group_snd = xbt_new(MPI_Group, size);
219     int* rankmap         = xbt_new(int, 2 * size);
220     for (int i = 0; i < size; i++) {
221       if (recvbuf[2 * i] != MPI_UNDEFINED) {
222         int count = 0;
223         for (int j = i + 1; j < size; j++) {
224           if(recvbuf[2 * i] == recvbuf[2 * j]) {
225             recvbuf[2 * j] = MPI_UNDEFINED;
226             rankmap[2 * count] = j;
227             rankmap[2 * count + 1] = recvbuf[2 * j + 1];
228             count++;
229           }
230         }
231         /* Add self in the group */
232         recvbuf[2 * i] = MPI_UNDEFINED;
233         rankmap[2 * count] = i;
234         rankmap[2 * count + 1] = recvbuf[2 * i + 1];
235         count++;
236         qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
237         group_out = new  Group(count);
238         if (i == 0) {
239           group_root = group_out; /* Save root's group */
240         }
241         for (int j = 0; j < count; j++) {
242           int index = group->index(rankmap[2 * j]);
243           group_out->set_mapping(index, j);
244         }
245         MPI_Request* requests = xbt_new(MPI_Request, count);
246         int reqs              = 0;
247         for (int j = 0; j < count; j++) {
248           if(rankmap[2 * j] != 0) {
249             group_snd[reqs]=new  Group(group_out);
250             requests[reqs] = Request::isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
251             reqs++;
252           }
253         }
254         if(i != 0 && group_out != MPI_COMM_WORLD->group() && group_out != MPI_GROUP_EMPTY)
255           Group::unref(group_out);
256
257         Request::waitall(reqs, requests, MPI_STATUS_IGNORE);
258         xbt_free(requests);
259       }
260     }
261     xbt_free(recvbuf);
262     xbt_free(rankmap);
263     xbt_free(group_snd);
264     group_out = group_root; /* exit with root's group */
265   } else {
266     if(color != MPI_UNDEFINED) {
267       Request::recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
268     } /* otherwise, exit with group_out == nullptr */
269   }
270   return group_out!=nullptr ? new  Comm(group_out, nullptr) : MPI_COMM_NULL;
271 }
272
273 void Comm::ref(){
274   if (this == MPI_COMM_UNINITIALIZED){
275     smpi_process()->comm_world()->ref();
276     return;
277   }
278   group_->ref();
279   refcount_++;
280 }
281
282 void Comm::cleanup_smp(){
283   if (intra_comm_ != MPI_COMM_NULL)
284     Comm::unref(intra_comm_);
285   if (leaders_comm_ != MPI_COMM_NULL)
286     Comm::unref(leaders_comm_);
287   if (non_uniform_map_ != nullptr)
288     xbt_free(non_uniform_map_);
289   if (leaders_map_ != nullptr)
290     xbt_free(leaders_map_);
291 }
292
293 void Comm::unref(Comm* comm){
294   if (comm == MPI_COMM_UNINITIALIZED){
295     Comm::unref(smpi_process()->comm_world());
296     return;
297   }
298   comm->refcount_--;
299   Group::unref(comm->group_);
300
301   if(comm->refcount_==0){
302     comm->cleanup_smp();
303     comm->cleanup_attr<Comm>();
304     delete comm;
305   }
306 }
307
308 static int compare_ints (const void *a, const void *b)
309 {
310   const int *da = static_cast<const int *>(a);
311   const int *db = static_cast<const int *>(b);
312
313   return static_cast<int>(*da > *db) - static_cast<int>(*da < *db);
314 }
315
316 void Comm::init_smp(){
317   int leader = -1;
318
319   if (this == MPI_COMM_UNINITIALIZED)
320     smpi_process()->comm_world()->init_smp();
321
322   int comm_size = this->size();
323
324   // If we are in replay - perform an ugly hack
325   // tell SimGrid we are not in replay for a while, because we need the buffers to be copied for the following calls
326   bool replaying = false; //cache data to set it back again after
327   if(smpi_process()->replaying()){
328    replaying=true;
329    smpi_process()->set_replaying(false);
330   }
331
332   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
333      smpi_switch_data_segment(smpi_process()->index());
334    }
335   //identify neighbours in comm
336   //get the indexes of all processes sharing the same simix host
337    xbt_swag_t process_list = sg_host_self()->extension<simgrid::simix::Host>()->process_list;
338    int intra_comm_size     = 0;
339    int min_index           = INT_MAX; // the minimum index will be the leader
340    smx_actor_t actor       = nullptr;
341    xbt_swag_foreach(actor, process_list)
342    {
343      int index = actor->pid - 1;
344
345      if (this->group()->rank(index) != MPI_UNDEFINED) {
346        intra_comm_size++;
347        // the process is in the comm
348        if (index < min_index)
349          min_index = index;
350      }
351   }
352   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
353   MPI_Group group_intra = new  Group(intra_comm_size);
354   int i = 0;
355   actor = nullptr;
356   xbt_swag_foreach(actor, process_list) {
357     int index = actor->pid -1;
358     if(this->group()->rank(index)!=MPI_UNDEFINED){
359       group_intra->set_mapping(index, i);
360       i++;
361     }
362   }
363
364   MPI_Comm comm_intra = new  Comm(group_intra, nullptr);
365   leader=min_index;
366
367   int * leaders_map= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
368   int * leader_list= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
369   for(i=0; i<comm_size; i++){
370       leader_list[i]=-1;
371   }
372
373   Coll_allgather_mpich::allgather(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
374
375   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
376      smpi_switch_data_segment(smpi_process()->index());
377    }
378
379   if(leaders_map_==nullptr){
380     leaders_map_= leaders_map;
381   }else{
382     xbt_free(leaders_map);
383   }
384   int j=0;
385   int leader_group_size = 0;
386   for(i=0; i<comm_size; i++){
387       int already_done=0;
388       for(j=0;j<leader_group_size; j++){
389         if(leaders_map_[i]==leader_list[j]){
390             already_done=1;
391         }
392       }
393       if(already_done==0){
394         leader_list[leader_group_size]=leaders_map_[i];
395         leader_group_size++;
396       }
397   }
398   qsort(leader_list, leader_group_size, sizeof(int),compare_ints);
399
400   MPI_Group leaders_group = new  Group(leader_group_size);
401
402   MPI_Comm leader_comm = MPI_COMM_NULL;
403   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && this!=MPI_COMM_WORLD){
404     //create leader_communicator
405     for (i=0; i< leader_group_size;i++)
406       leaders_group->set_mapping(leader_list[i], i);
407     leader_comm = new  Comm(leaders_group, nullptr);
408     this->set_leaders_comm(leader_comm);
409     this->set_intra_comm(comm_intra);
410
411    //create intracommunicator
412   }else{
413     for (i=0; i< leader_group_size;i++)
414       leaders_group->set_mapping(leader_list[i], i);
415
416     if(this->get_leaders_comm()==MPI_COMM_NULL){
417       leader_comm = new  Comm(leaders_group, nullptr);
418       this->set_leaders_comm(leader_comm);
419     }else{
420       leader_comm=this->get_leaders_comm();
421       Group::unref(leaders_group);
422     }
423     smpi_process()->set_comm_intra(comm_intra);
424   }
425
426   int is_uniform = 1;
427
428   // Are the nodes uniform ? = same number of process/node
429   int my_local_size=comm_intra->size();
430   if(comm_intra->rank()==0) {
431     int* non_uniform_map = xbt_new0(int,leader_group_size);
432     Coll_allgather_mpich::allgather(&my_local_size, 1, MPI_INT,
433         non_uniform_map, 1, MPI_INT, leader_comm);
434     for(i=0; i < leader_group_size; i++) {
435       if(non_uniform_map[0] != non_uniform_map[i]) {
436         is_uniform = 0;
437         break;
438       }
439     }
440     if(is_uniform==0 && this->is_uniform()!=0){
441         non_uniform_map_= non_uniform_map;
442     }else{
443         xbt_free(non_uniform_map);
444     }
445     is_uniform_=is_uniform;
446   }
447   Coll_bcast_mpich::bcast(&(is_uniform_),1, MPI_INT, 0, comm_intra );
448
449   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
450      smpi_switch_data_segment(smpi_process()->index());
451    }
452   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
453   int is_blocked=1;
454   int prev=this->group()->rank(comm_intra->group()->index(0));
455     for (i=1; i<my_local_size; i++){
456       int that=this->group()->rank(comm_intra->group()->index(i));
457       if(that!=prev+1){
458         is_blocked=0;
459         break;
460       }
461       prev = that;
462   }
463
464   int global_blocked;
465   Coll_allreduce_default::allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
466
467   if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD){
468     if(this->rank()==0){
469         is_blocked_=global_blocked;
470     }
471   }else{
472     is_blocked_=global_blocked;
473   }
474   xbt_free(leader_list);
475
476   if(replaying)
477     smpi_process()->set_replaying(true);
478 }
479
480 MPI_Comm Comm::f2c(int id) {
481   if(id == -2) {
482     return MPI_COMM_SELF;
483   } else if(id==0){
484     return MPI_COMM_WORLD;
485   } else if(F2C::f2c_lookup() != nullptr && id >= 0) {
486       char key[KEY_SIZE];
487       MPI_Comm tmp =  static_cast<MPI_Comm>(xbt_dict_get_or_null(F2C::f2c_lookup(),get_key_id(key, id)));
488       return tmp != nullptr ? tmp : MPI_COMM_NULL ;
489   } else {
490     return MPI_COMM_NULL;
491   }
492 }
493
494 void Comm::free_f(int id) {
495   char key[KEY_SIZE];
496   xbt_dict_remove(F2C::f2c_lookup(), id==0? get_key(key, id) : get_key_id(key, id));
497 }
498
499 int Comm::add_f() {
500   if(F2C::f2c_lookup()==nullptr){
501     F2C::set_f2c_lookup(xbt_dict_new_homogeneous(nullptr));
502   }
503   char key[KEY_SIZE];
504   xbt_dict_set(F2C::f2c_lookup(), this==MPI_COMM_WORLD? get_key(key, F2C::f2c_id()) : get_key_id(key,F2C::f2c_id()), this, nullptr);
505   f2c_id_increment();
506   return F2C::f2c_id()-1;
507 }
508
509
510 void Comm::add_rma_win(MPI_Win win){
511   rma_wins_.push_back(win);
512 }
513
514 void Comm::remove_rma_win(MPI_Win win){
515   rma_wins_.remove(win);
516 }
517
518 void Comm::finish_rma_calls(){
519   for(auto it : rma_wins_){
520     if(it->rank()==this->rank()){//is it ours (for MPI_COMM_WORLD)?
521       int finished = it->finish_comms();
522       XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls",this->rank(), finished);
523     }
524   }
525 }
526
527
528 }
529 }
530
531