Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use std::sort instead of qsort in C++ files (easy part).
[simgrid.git] / src / smpi / mpi / smpi_comm.cpp
1 /* Copyright (c) 2010-2017. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "smpi_comm.hpp"
7 #include "private.h"
8 #include "private.hpp"
9 #include "simgrid/s4u/Host.hpp"
10 #include "smpi_coll.hpp"
11 #include "smpi_datatype.hpp"
12 #include "smpi_process.hpp"
13 #include "smpi_request.hpp"
14 #include "smpi_status.hpp"
15 #include "smpi_win.hpp"
16 #include "src/simix/smx_private.h"
17 #include <algorithm>
18 #include <climits>
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi, "Logging specific to SMPI (comm)");
21
22  simgrid::smpi::Comm mpi_MPI_COMM_UNINITIALIZED;
23 MPI_Comm MPI_COMM_UNINITIALIZED=&mpi_MPI_COMM_UNINITIALIZED;
24
25 /* Support for cartesian topology was added, but there are 2 other types of topology, graph et dist graph. In order to
26  * support them, we have to add a field SMPI_Topo_type, and replace the MPI_Topology field by an union. */
27
28 static int smpi_compare_rankmap(const void *a, const void *b)
29 {
30   const int* x = static_cast<const int*>(a);
31   const int* y = static_cast<const int*>(b);
32
33   if (x[1] < y[1]) {
34     return -1;
35   }
36   if (x[1] == y[1]) {
37     if (x[0] < y[0]) {
38       return -1;
39     }
40     if (x[0] == y[0]) {
41       return 0;
42     }
43     return 1;
44   }
45   return 1;
46 }
47
48 namespace simgrid{
49 namespace smpi{
50
51 std::unordered_map<int, smpi_key_elem> Comm::keyvals_;
52 int Comm::keyval_id_=0;
53
54 Comm::Comm(MPI_Group group, MPI_Topology topo) : group_(group), topo_(topo)
55 {
56   refcount_=1;
57   topoType_ = MPI_INVALID_TOPO;
58   intra_comm_ = MPI_COMM_NULL;
59   leaders_comm_ = MPI_COMM_NULL;
60   is_uniform_=1;
61   non_uniform_map_ = nullptr;
62   leaders_map_ = nullptr;
63   is_blocked_=0;
64 }
65
66 void Comm::destroy(Comm* comm)
67 {
68   if (comm == MPI_COMM_UNINITIALIZED){
69     Comm::destroy(smpi_process()->comm_world());
70     return;
71   }
72   delete comm->topo_; // there's no use count on topos
73   Comm::unref(comm);
74 }
75
76 int Comm::dup(MPI_Comm* newcomm){
77   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
78      smpi_switch_data_segment(smpi_process()->index());
79    }
80   MPI_Group cp = new  Group(this->group());
81   (*newcomm) = new  Comm(cp, this->topo());
82   int ret = MPI_SUCCESS;
83
84   if (not attributes()->empty()) {
85     int flag;
86     void* value_out;
87     for(auto it : *attributes()){
88       smpi_key_elem elem = keyvals_.at(it.first);
89       if (elem != nullptr && elem->copy_fn.comm_copy_fn != MPI_NULL_COPY_FN) {
90         ret = elem->copy_fn.comm_copy_fn(this, it.first, nullptr, it.second, &value_out, &flag);
91         if (ret != MPI_SUCCESS) {
92           Comm::destroy(*newcomm);
93           *newcomm = MPI_COMM_NULL;
94           return ret;
95         }
96         if (flag){
97           elem->refcount++;
98           (*newcomm)->attributes()->insert({it.first, value_out});
99         }
100       }
101       }
102     }
103   return ret;
104 }
105
106 MPI_Group Comm::group()
107 {
108   if (this == MPI_COMM_UNINITIALIZED)
109     return smpi_process()->comm_world()->group();
110   return group_;
111 }
112
113 MPI_Topology Comm::topo() {
114   return topo_;
115 }
116
117 int Comm::size()
118 {
119   if (this == MPI_COMM_UNINITIALIZED)
120     return smpi_process()->comm_world()->size();
121   return group_->size();
122 }
123
124 int Comm::rank()
125 {
126   if (this == MPI_COMM_UNINITIALIZED)
127     return smpi_process()->comm_world()->rank();
128   return group_->rank(smpi_process()->index());
129 }
130
131 void Comm::get_name (char* name, int* len)
132 {
133   if (this == MPI_COMM_UNINITIALIZED){
134     smpi_process()->comm_world()->get_name(name, len);
135     return;
136   }
137   if(this == MPI_COMM_WORLD) {
138     strncpy(name, "WORLD",5);
139     *len = 5;
140   } else {
141     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", this);
142   }
143 }
144
145 void Comm::set_leaders_comm(MPI_Comm leaders){
146   if (this == MPI_COMM_UNINITIALIZED){
147     smpi_process()->comm_world()->set_leaders_comm(leaders);
148     return;
149   }
150   leaders_comm_=leaders;
151 }
152
153 void Comm::set_intra_comm(MPI_Comm leaders){
154   intra_comm_=leaders;
155 }
156
157 int* Comm::get_non_uniform_map(){
158   if (this == MPI_COMM_UNINITIALIZED)
159     return smpi_process()->comm_world()->get_non_uniform_map();
160   return non_uniform_map_;
161 }
162
163 int* Comm::get_leaders_map(){
164   if (this == MPI_COMM_UNINITIALIZED)
165     return smpi_process()->comm_world()->get_leaders_map();
166   return leaders_map_;
167 }
168
169 MPI_Comm Comm::get_leaders_comm(){
170   if (this == MPI_COMM_UNINITIALIZED)
171     return smpi_process()->comm_world()->get_leaders_comm();
172   return leaders_comm_;
173 }
174
175 MPI_Comm Comm::get_intra_comm(){
176   if (this == MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD)
177     return smpi_process()->comm_intra();
178   else return intra_comm_;
179 }
180
181 int Comm::is_uniform(){
182   if (this == MPI_COMM_UNINITIALIZED)
183     return smpi_process()->comm_world()->is_uniform();
184   return is_uniform_;
185 }
186
187 int Comm::is_blocked(){
188   if (this == MPI_COMM_UNINITIALIZED)
189     return smpi_process()->comm_world()->is_blocked();
190   return is_blocked_;
191 }
192
193 MPI_Comm Comm::split(int color, int key)
194 {
195   if (this == MPI_COMM_UNINITIALIZED)
196     return smpi_process()->comm_world()->split(color, key);
197   int system_tag = 123;
198   int* recvbuf;
199
200   MPI_Group group_root = nullptr;
201   MPI_Group group_out  = nullptr;
202   MPI_Group group      = this->group();
203   int rank             = this->rank();
204   int size             = this->size();
205   /* Gather all colors and keys on rank 0 */
206   int* sendbuf = xbt_new(int, 2);
207   sendbuf[0] = color;
208   sendbuf[1] = key;
209   if(rank == 0) {
210     recvbuf = xbt_new(int, 2 * size);
211   } else {
212     recvbuf = nullptr;
213   }
214   Coll_gather_default::gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, this);
215   xbt_free(sendbuf);
216   /* Do the actual job */
217   if(rank == 0) {
218     MPI_Group* group_snd = xbt_new(MPI_Group, size);
219     int* rankmap         = xbt_new(int, 2 * size);
220     for (int i = 0; i < size; i++) {
221       if (recvbuf[2 * i] != MPI_UNDEFINED) {
222         int count = 0;
223         for (int j = i + 1; j < size; j++) {
224           if(recvbuf[2 * i] == recvbuf[2 * j]) {
225             recvbuf[2 * j] = MPI_UNDEFINED;
226             rankmap[2 * count] = j;
227             rankmap[2 * count + 1] = recvbuf[2 * j + 1];
228             count++;
229           }
230         }
231         /* Add self in the group */
232         recvbuf[2 * i] = MPI_UNDEFINED;
233         rankmap[2 * count] = i;
234         rankmap[2 * count + 1] = recvbuf[2 * i + 1];
235         count++;
236         qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
237         group_out = new  Group(count);
238         if (i == 0) {
239           group_root = group_out; /* Save root's group */
240         }
241         for (int j = 0; j < count; j++) {
242           int index = group->index(rankmap[2 * j]);
243           group_out->set_mapping(index, j);
244         }
245         MPI_Request* requests = xbt_new(MPI_Request, count);
246         int reqs              = 0;
247         for (int j = 0; j < count; j++) {
248           if(rankmap[2 * j] != 0) {
249             group_snd[reqs]=new  Group(group_out);
250             requests[reqs] = Request::isend(&(group_snd[reqs]), 1, MPI_PTR, rankmap[2 * j], system_tag, this);
251             reqs++;
252           }
253         }
254         if(i != 0 && group_out != MPI_COMM_WORLD->group() && group_out != MPI_GROUP_EMPTY)
255           Group::unref(group_out);
256
257         Request::waitall(reqs, requests, MPI_STATUS_IGNORE);
258         xbt_free(requests);
259       }
260     }
261     xbt_free(recvbuf);
262     xbt_free(rankmap);
263     xbt_free(group_snd);
264     group_out = group_root; /* exit with root's group */
265   } else {
266     if(color != MPI_UNDEFINED) {
267       Request::recv(&group_out, 1, MPI_PTR, 0, system_tag, this, MPI_STATUS_IGNORE);
268     } /* otherwise, exit with group_out == nullptr */
269   }
270   return group_out!=nullptr ? new  Comm(group_out, nullptr) : MPI_COMM_NULL;
271 }
272
273 void Comm::ref(){
274   if (this == MPI_COMM_UNINITIALIZED){
275     smpi_process()->comm_world()->ref();
276     return;
277   }
278   group_->ref();
279   refcount_++;
280 }
281
282 void Comm::cleanup_smp(){
283   if (intra_comm_ != MPI_COMM_NULL)
284     Comm::unref(intra_comm_);
285   if (leaders_comm_ != MPI_COMM_NULL)
286     Comm::unref(leaders_comm_);
287   if (non_uniform_map_ != nullptr)
288     xbt_free(non_uniform_map_);
289   if (leaders_map_ != nullptr)
290     xbt_free(leaders_map_);
291 }
292
293 void Comm::unref(Comm* comm){
294   if (comm == MPI_COMM_UNINITIALIZED){
295     Comm::unref(smpi_process()->comm_world());
296     return;
297   }
298   comm->refcount_--;
299   Group::unref(comm->group_);
300
301   if(comm->refcount_==0){
302     comm->cleanup_smp();
303     comm->cleanup_attr<Comm>();
304     delete comm;
305   }
306 }
307
308 void Comm::init_smp(){
309   int leader = -1;
310
311   if (this == MPI_COMM_UNINITIALIZED)
312     smpi_process()->comm_world()->init_smp();
313
314   int comm_size = this->size();
315
316   // If we are in replay - perform an ugly hack
317   // tell SimGrid we are not in replay for a while, because we need the buffers to be copied for the following calls
318   bool replaying = false; //cache data to set it back again after
319   if(smpi_process()->replaying()){
320    replaying=true;
321    smpi_process()->set_replaying(false);
322   }
323
324   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
325      smpi_switch_data_segment(smpi_process()->index());
326    }
327   //identify neighbours in comm
328   //get the indexes of all processes sharing the same simix host
329    xbt_swag_t process_list = sg_host_self()->extension<simgrid::simix::Host>()->process_list;
330    int intra_comm_size     = 0;
331    int min_index           = INT_MAX; // the minimum index will be the leader
332    smx_actor_t actor       = nullptr;
333    xbt_swag_foreach(actor, process_list)
334    {
335      int index = actor->pid - 1;
336
337      if (this->group()->rank(index) != MPI_UNDEFINED) {
338        intra_comm_size++;
339        // the process is in the comm
340        if (index < min_index)
341          min_index = index;
342      }
343   }
344   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
345   MPI_Group group_intra = new  Group(intra_comm_size);
346   int i = 0;
347   actor = nullptr;
348   xbt_swag_foreach(actor, process_list) {
349     int index = actor->pid -1;
350     if(this->group()->rank(index)!=MPI_UNDEFINED){
351       group_intra->set_mapping(index, i);
352       i++;
353     }
354   }
355
356   MPI_Comm comm_intra = new  Comm(group_intra, nullptr);
357   leader=min_index;
358
359   int * leaders_map= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
360   int * leader_list= static_cast<int*>(xbt_malloc0(sizeof(int)*comm_size));
361   for(i=0; i<comm_size; i++){
362       leader_list[i]=-1;
363   }
364
365   Coll_allgather_mpich::allgather(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, this);
366
367   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
368      smpi_switch_data_segment(smpi_process()->index());
369    }
370
371   if(leaders_map_==nullptr){
372     leaders_map_= leaders_map;
373   }else{
374     xbt_free(leaders_map);
375   }
376   int j=0;
377   int leader_group_size = 0;
378   for(i=0; i<comm_size; i++){
379       int already_done=0;
380       for(j=0;j<leader_group_size; j++){
381         if(leaders_map_[i]==leader_list[j]){
382             already_done=1;
383         }
384       }
385       if(already_done==0){
386         leader_list[leader_group_size]=leaders_map_[i];
387         leader_group_size++;
388       }
389   }
390   std::sort(leader_list, leader_list + leader_group_size);
391
392   MPI_Group leaders_group = new  Group(leader_group_size);
393
394   MPI_Comm leader_comm = MPI_COMM_NULL;
395   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && this!=MPI_COMM_WORLD){
396     //create leader_communicator
397     for (i=0; i< leader_group_size;i++)
398       leaders_group->set_mapping(leader_list[i], i);
399     leader_comm = new  Comm(leaders_group, nullptr);
400     this->set_leaders_comm(leader_comm);
401     this->set_intra_comm(comm_intra);
402
403    //create intracommunicator
404   }else{
405     for (i=0; i< leader_group_size;i++)
406       leaders_group->set_mapping(leader_list[i], i);
407
408     if(this->get_leaders_comm()==MPI_COMM_NULL){
409       leader_comm = new  Comm(leaders_group, nullptr);
410       this->set_leaders_comm(leader_comm);
411     }else{
412       leader_comm=this->get_leaders_comm();
413       Group::unref(leaders_group);
414     }
415     smpi_process()->set_comm_intra(comm_intra);
416   }
417
418   // Are the nodes uniform ? = same number of process/node
419   int my_local_size=comm_intra->size();
420   if(comm_intra->rank()==0) {
421     int is_uniform       = 1;
422     int* non_uniform_map = xbt_new0(int,leader_group_size);
423     Coll_allgather_mpich::allgather(&my_local_size, 1, MPI_INT,
424         non_uniform_map, 1, MPI_INT, leader_comm);
425     for(i=0; i < leader_group_size; i++) {
426       if(non_uniform_map[0] != non_uniform_map[i]) {
427         is_uniform = 0;
428         break;
429       }
430     }
431     if(is_uniform==0 && this->is_uniform()!=0){
432         non_uniform_map_= non_uniform_map;
433     }else{
434         xbt_free(non_uniform_map);
435     }
436     is_uniform_=is_uniform;
437   }
438   Coll_bcast_mpich::bcast(&(is_uniform_),1, MPI_INT, 0, comm_intra );
439
440   if(smpi_privatize_global_variables == SMPI_PRIVATIZE_MMAP){ //we need to switch as the called function may silently touch global variables
441      smpi_switch_data_segment(smpi_process()->index());
442    }
443   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
444   int is_blocked=1;
445   int prev=this->group()->rank(comm_intra->group()->index(0));
446     for (i=1; i<my_local_size; i++){
447       int that=this->group()->rank(comm_intra->group()->index(i));
448       if(that!=prev+1){
449         is_blocked=0;
450         break;
451       }
452       prev = that;
453   }
454
455   int global_blocked;
456   Coll_allreduce_default::allreduce(&is_blocked, &(global_blocked), 1, MPI_INT, MPI_LAND, this);
457
458   if(MPI_COMM_WORLD==MPI_COMM_UNINITIALIZED || this==MPI_COMM_WORLD){
459     if(this->rank()==0){
460         is_blocked_=global_blocked;
461     }
462   }else{
463     is_blocked_=global_blocked;
464   }
465   xbt_free(leader_list);
466
467   if(replaying)
468     smpi_process()->set_replaying(true);
469 }
470
471 MPI_Comm Comm::f2c(int id) {
472   if(id == -2) {
473     return MPI_COMM_SELF;
474   } else if(id==0){
475     return MPI_COMM_WORLD;
476   } else if(F2C::f2c_lookup() != nullptr && id >= 0) {
477       char key[KEY_SIZE];
478       MPI_Comm tmp =  static_cast<MPI_Comm>(xbt_dict_get_or_null(F2C::f2c_lookup(),get_key_id(key, id)));
479       return tmp != nullptr ? tmp : MPI_COMM_NULL ;
480   } else {
481     return MPI_COMM_NULL;
482   }
483 }
484
485 void Comm::free_f(int id) {
486   char key[KEY_SIZE];
487   xbt_dict_remove(F2C::f2c_lookup(), id==0? get_key(key, id) : get_key_id(key, id));
488 }
489
490 int Comm::add_f() {
491   if(F2C::f2c_lookup()==nullptr){
492     F2C::set_f2c_lookup(xbt_dict_new_homogeneous(nullptr));
493   }
494   char key[KEY_SIZE];
495   xbt_dict_set(F2C::f2c_lookup(), this==MPI_COMM_WORLD? get_key(key, F2C::f2c_id()) : get_key_id(key,F2C::f2c_id()), this, nullptr);
496   f2c_id_increment();
497   return F2C::f2c_id()-1;
498 }
499
500
501 void Comm::add_rma_win(MPI_Win win){
502   rma_wins_.push_back(win);
503 }
504
505 void Comm::remove_rma_win(MPI_Win win){
506   rma_wins_.remove(win);
507 }
508
509 void Comm::finish_rma_calls(){
510   for(auto it : rma_wins_){
511     if(it->rank()==this->rank()){//is it ours (for MPI_COMM_WORLD)?
512       int finished = it->finish_comms();
513       XBT_DEBUG("Barrier for rank %d - Finished %d RMA calls",this->rank(), finished);
514     }
515   }
516 }
517
518
519 }
520 }
521
522