Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
7ed1a3253c9978dd38e625ccc0448fc8b8b066c8
[simgrid.git] / src / smpi / smpi_comm.c
1 /* Copyright (c) 2010-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdlib.h>
8
9 #include "private.h"
10 #include "xbt/dict.h"
11 #include "smpi_mpi_dt_private.h"
12 #include "limits.h"
13 #include "simix/smx_private.h"
14 #include "colls/colls.h"
15 #include "xbt/ex.h"
16
17
18 extern xbt_dict_t smpi_keyvals;
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi,
20                                 "Logging specific to SMPI (comm)");
21
22
23
24 /* Support for cartesian topology was added, but there are 2 other types of
25  * topology, graph et dist graph. In order to support them, we have to add a
26  * field MPIR_Topo_type, and replace the MPI_Topology field by an union. */
27
28 typedef struct s_smpi_mpi_communicator {
29   MPI_Group group;
30   MPIR_Topo_type topoType; 
31   MPI_Topology topo; // to be replaced by an union
32   int refcount;
33   MPI_Comm leaders_comm;//inter-node communicator
34   MPI_Comm intra_comm;//intra-node communicator . For MPI_COMM_WORLD this can't be used, as var is global.
35   //use an intracomm stored in the process data instead
36   int* leaders_map; //who is the leader of each process
37   int is_uniform;
38   int* non_uniform_map; //set if smp nodes have a different number of processes allocated
39   int is_blocked;// are ranks allocated on the same smp node contiguous ?
40   xbt_dict_t attributes;
41 } s_smpi_mpi_communicator_t;
42
43 static int smpi_compare_rankmap(const void *a, const void *b)
44 {
45   const int* x = (const int*)a;
46   const int* y = (const int*)b;
47
48   if (x[1] < y[1]) {
49     return -1;
50   }
51   if (x[1] == y[1]) {
52     if (x[0] < y[0]) {
53       return -1;
54     }
55     if (x[0] == y[0]) {
56       return 0;
57     }
58     return 1;
59   }
60   return 1;
61 }
62
63 MPI_Comm smpi_comm_new(MPI_Group group, MPI_Topology topo)
64 {
65   MPI_Comm comm;
66
67   comm = xbt_new(s_smpi_mpi_communicator_t, 1);
68   comm->group = group;
69   smpi_group_use(comm->group);
70   comm->refcount=1;
71   comm->topoType = -1;
72   comm->topo = topo;
73   comm->intra_comm = MPI_COMM_NULL;
74   comm->leaders_comm = MPI_COMM_NULL;
75   comm->is_uniform=1;
76   comm->non_uniform_map = NULL;
77   comm->leaders_map = NULL;
78   comm->is_blocked=0;
79   comm->attributes=NULL;
80   return comm;
81 }
82
83 void smpi_comm_destroy(MPI_Comm comm)
84 {
85   if (comm == MPI_COMM_UNINITIALIZED)
86     comm = smpi_process_comm_world();
87   smpi_group_unuse(comm->group);
88   smpi_topo_destroy(comm->topo); // there's no use count on topos
89   smpi_comm_unuse(comm);
90 }
91
92 MPI_Comm smpi_comm_dup(MPI_Comm comm){
93   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
94      smpi_switch_data_segment(smpi_process_index());
95    }
96   MPI_Comm newcomm = smpi_comm_new(smpi_comm_group(comm), smpi_comm_topo(comm));
97
98   if(comm->attributes !=NULL){
99       newcomm->attributes=xbt_dict_new();
100       xbt_dict_cursor_t cursor = NULL;
101       int *key;
102       int flag;
103       void* value_in;
104       void* value_out;
105       xbt_dict_foreach(comm->attributes, cursor, key, value_in){
106         smpi_key_elem elem = xbt_dict_get_or_null(smpi_keyvals, (const char*)key);
107         if(elem && elem->copy_fn!=MPI_NULL_COPY_FN){
108           elem->copy_fn(comm, *key, NULL, value_in, &value_out, &flag );
109           if(flag)
110             xbt_dict_set(newcomm->attributes, (const char*)key,value_out, NULL);
111         }
112       }
113     }
114   return newcomm;
115 }
116
117
118 MPI_Group smpi_comm_group(MPI_Comm comm)
119 {
120   if (comm == MPI_COMM_UNINITIALIZED)
121     comm = smpi_process_comm_world();
122
123   return comm->group;
124 }
125
126 MPI_Topology smpi_comm_topo(MPI_Comm comm) {
127   if (comm != MPI_COMM_NULL)
128     return comm->topo;
129   return NULL;
130 }
131
132 int smpi_comm_size(MPI_Comm comm)
133 {
134   if (comm == MPI_COMM_UNINITIALIZED)
135     comm = smpi_process_comm_world();
136
137   return smpi_group_size(smpi_comm_group(comm));
138 }
139
140 int smpi_comm_rank(MPI_Comm comm)
141 {
142   if (comm == MPI_COMM_UNINITIALIZED)
143     comm = smpi_process_comm_world();
144   return smpi_group_rank(smpi_comm_group(comm), smpi_process_index());
145 }
146
147 void smpi_comm_get_name (MPI_Comm comm, char* name, int* len)
148 {
149   if (comm == MPI_COMM_UNINITIALIZED)
150     comm = smpi_process_comm_world();
151   if(comm == MPI_COMM_WORLD) {
152     strcpy(name, "WORLD");
153     *len = 5;
154   } else {
155     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", comm);
156   }
157 }
158
159 void smpi_comm_set_leaders_comm(MPI_Comm comm, MPI_Comm leaders){
160   if (comm == MPI_COMM_UNINITIALIZED)
161     comm = smpi_process_comm_world();
162   comm->leaders_comm=leaders;
163 }
164
165 void smpi_comm_set_intra_comm(MPI_Comm comm, MPI_Comm leaders){
166   comm->intra_comm=leaders;
167 }
168
169 int* smpi_comm_get_non_uniform_map(MPI_Comm comm){
170   if (comm == MPI_COMM_UNINITIALIZED)
171     comm = smpi_process_comm_world();
172   return comm->non_uniform_map;
173 }
174
175 int* smpi_comm_get_leaders_map(MPI_Comm comm){
176   if (comm == MPI_COMM_UNINITIALIZED)
177     comm = smpi_process_comm_world();
178   return comm->leaders_map;
179 }
180
181 MPI_Comm smpi_comm_get_leaders_comm(MPI_Comm comm){
182   if (comm == MPI_COMM_UNINITIALIZED)
183     comm = smpi_process_comm_world();
184   return comm->leaders_comm;
185 }
186
187 MPI_Comm smpi_comm_get_intra_comm(MPI_Comm comm){
188   if (comm == MPI_COMM_UNINITIALIZED || comm==MPI_COMM_WORLD) 
189     return smpi_process_get_comm_intra();
190   else return comm->intra_comm;
191 }
192
193 int smpi_comm_is_uniform(MPI_Comm comm){
194   if (comm == MPI_COMM_UNINITIALIZED)
195     comm = smpi_process_comm_world();
196   return comm->is_uniform;
197 }
198
199 int smpi_comm_is_blocked(MPI_Comm comm){
200   if (comm == MPI_COMM_UNINITIALIZED)
201     comm = smpi_process_comm_world();
202   return comm->is_blocked;
203 }
204
205 MPI_Comm smpi_comm_split(MPI_Comm comm, int color, int key)
206 {
207   if (comm == MPI_COMM_UNINITIALIZED)
208     comm = smpi_process_comm_world();
209   int system_tag = 123;
210   int index, rank, size, i, j, count, reqs;
211   int* sendbuf;
212   int* recvbuf;
213   int* rankmap;
214   MPI_Group group, group_root, group_out;
215   MPI_Request* requests;
216
217   group_root = group_out = NULL;
218   group = smpi_comm_group(comm);
219   rank = smpi_comm_rank(comm);
220   size = smpi_comm_size(comm);
221   /* Gather all colors and keys on rank 0 */
222   sendbuf = xbt_new(int, 2);
223   sendbuf[0] = color;
224   sendbuf[1] = key;
225   if(rank == 0) {
226     recvbuf = xbt_new(int, 2 * size);
227   } else {
228     recvbuf = NULL;
229   }
230   smpi_mpi_gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, comm);
231   xbt_free(sendbuf);
232   /* Do the actual job */
233   if(rank == 0) {
234     rankmap = xbt_new(int, 2 * size);
235     for(i = 0; i < size; i++) {
236       if(recvbuf[2 * i] == MPI_UNDEFINED) {
237         continue;
238       }
239       count = 0;
240       for(j = i + 1; j < size; j++)  {
241         if(recvbuf[2 * i] == recvbuf[2 * j]) {
242           recvbuf[2 * j] = MPI_UNDEFINED;
243           rankmap[2 * count] = j;
244           rankmap[2 * count + 1] = recvbuf[2 * j + 1];
245           count++;
246         }
247       }
248       /* Add self in the group */
249       recvbuf[2 * i] = MPI_UNDEFINED;
250       rankmap[2 * count] = i;
251       rankmap[2 * count + 1] = recvbuf[2 * i + 1];
252       count++;
253       qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
254       group_out = smpi_group_new(count);
255       if(i == 0) {
256         group_root = group_out; /* Save root's group */
257       }
258       for(j = 0; j < count; j++) {
259         //increment refcounter in order to avoid freeing the group too quick before copy
260         index = smpi_group_index(group, rankmap[2 * j]);
261         smpi_group_set_mapping(group_out, index, j);
262       }
263       requests = xbt_new(MPI_Request, count);
264       reqs = 0;
265       for(j = 0; j < count; j++) {
266         if(rankmap[2 * j] != 0) {
267           requests[reqs] = smpi_isend_init(&group_out, 1, MPI_PTR, rankmap[2 * j], system_tag, comm);
268           reqs++;
269         }
270       }
271       smpi_mpi_startall(reqs, requests);
272       smpi_mpi_waitall(reqs, requests, MPI_STATUS_IGNORE);
273       xbt_free(requests);
274     }
275     xbt_free(recvbuf);
276     group_out = group_root; /* exit with root's group */
277   } else {
278     if(color != MPI_UNDEFINED) {
279       smpi_mpi_recv(&group_out, 1, MPI_PTR, 0, system_tag, comm, MPI_STATUS_IGNORE);
280       if(group_out){
281         group_out=smpi_group_copy(group_out);
282       }
283     } /* otherwise, exit with group_out == NULL */
284   }
285   return group_out ? smpi_comm_new(group_out, NULL) : MPI_COMM_NULL;
286 }
287
288 void smpi_comm_use(MPI_Comm comm){
289   if (comm == MPI_COMM_UNINITIALIZED)
290     comm = smpi_process_comm_world();
291   comm->refcount++;
292 }
293
294 void smpi_comm_unuse(MPI_Comm comm){
295   if (comm == MPI_COMM_UNINITIALIZED)
296     comm = smpi_process_comm_world();
297   comm->refcount--;
298   if(comm->refcount==0){
299     if(comm->intra_comm != MPI_COMM_NULL)
300       smpi_comm_unuse(comm->intra_comm);
301     if(comm->leaders_comm != MPI_COMM_NULL)
302       smpi_comm_unuse(comm->leaders_comm);
303     if(comm->non_uniform_map !=NULL)
304       xbt_free(comm->non_uniform_map);
305     if(comm->leaders_map !=NULL)
306       xbt_free(comm->leaders_map);
307     if(comm->attributes !=NULL){
308       xbt_dict_cursor_t cursor = NULL;
309       int* key;
310       smpi_key_elem elem;
311       void * value;
312       int flag;
313       xbt_dict_foreach(comm->attributes, cursor, key, elem){
314         if(smpi_attr_get(comm, *key, &value, &flag)==MPI_SUCCESS)
315           elem->delete_fn(comm, *key, &value, &flag);
316       }
317     }
318     xbt_free(comm);
319   }
320 }
321
322 static int
323 compare_ints (const void *a, const void *b)
324 {
325   const int *da = (const int *) a;
326   const int *db = (const int *) b;
327
328   return (*da > *db) - (*da < *db);
329 }
330
331 void smpi_comm_init_smp(MPI_Comm comm){
332   int leader = -1;
333
334   if (comm == MPI_COMM_UNINITIALIZED)
335     comm = smpi_process_comm_world();
336
337   int comm_size =smpi_comm_size(comm);
338   
339   // If we are in replay - perform an ugly hack  
340   // say to SimGrid that we are not in replay for a while, because we need 
341   // the buffers to be copied for the following calls
342   int replaying = 0; //cache data to set it back again after
343   if(smpi_process_get_replaying()){
344    replaying=1;
345    smpi_process_set_replaying(0);
346   }
347
348   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
349      smpi_switch_data_segment(smpi_process_index());
350    }
351   //identify neighbours in comm
352   //get the indexes of all processes sharing the same simix host
353   xbt_swag_t process_list = simcall_host_get_process_list(SIMIX_host_self());
354   int intra_comm_size = 0;
355   //only one process/node, disable SMP support and return
356 //  if(intra_comm_size==1){
357 //      smpi_comm_set_intra_comm(comm, MPI_COMM_SELF);
358 //      //smpi_comm_set_leaders_comm(comm, comm);
359 //      smpi_process_set_comm_intra(MPI_COMM_SELF);
360 //      return;
361 //  }
362
363
364   int i =0;
365   int min_index=INT_MAX;//the minimum index will be the leader
366   msg_process_t process = NULL;
367   xbt_swag_foreach(process, process_list) {
368     //is_in_comm=0;
369     int index = SIMIX_process_get_PID(process) -1;
370
371     if(smpi_group_rank(smpi_comm_group(comm),  index)!=MPI_UNDEFINED){
372         intra_comm_size++;
373       //the process is in the comm
374       if(index < min_index)
375         min_index=index;
376       i++;
377     }
378   }
379   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
380   MPI_Group group_intra = smpi_group_new(intra_comm_size);
381   i=0;
382   process = NULL;
383   xbt_swag_foreach(process, process_list) {
384     //is_in_comm=0;
385     int index = SIMIX_process_get_PID(process) -1;
386     if(smpi_group_rank(smpi_comm_group(comm),  index)!=MPI_UNDEFINED){
387       smpi_group_set_mapping(group_intra, index, i);
388       i++;
389     }
390   }
391
392
393   MPI_Comm comm_intra = smpi_comm_new(group_intra, NULL);
394   //MPI_Comm shmem_comm = smpi_process_comm_intra();
395   //int intra_rank = smpi_comm_rank(shmem_comm);
396
397
398   //if(smpi_process_index()==min_index)
399   leader=min_index;
400
401   int * leaders_map= (int*)xbt_malloc0(sizeof(int)*comm_size);
402   int * leader_list= (int*)xbt_malloc0(sizeof(int)*comm_size);
403   for(i=0; i<comm_size; i++){
404       leader_list[i]=-1;
405   }
406
407   smpi_coll_tuned_allgather_mpich(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, comm);
408
409   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
410      smpi_switch_data_segment(smpi_process_index());
411    }
412    
413   if(!comm->leaders_map){
414     comm->leaders_map= leaders_map;
415   }else{
416     xbt_free(leaders_map);
417   }
418   int j=0;
419   int leader_group_size = 0;
420   for(i=0; i<comm_size; i++){
421       int already_done=0;
422       for(j=0;j<leader_group_size; j++){
423         if(comm->leaders_map[i]==leader_list[j]){
424             already_done=1;
425         }
426       }
427       if(!already_done){
428         leader_list[leader_group_size]=comm->leaders_map[i];
429         leader_group_size++;
430       }
431   }
432   qsort(leader_list, leader_group_size, sizeof(int),compare_ints);
433
434   MPI_Group leaders_group = smpi_group_new(leader_group_size);
435
436
437   MPI_Comm leader_comm = MPI_COMM_NULL;
438   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && comm!=MPI_COMM_WORLD){
439     //create leader_communicator
440     for (i=0; i< leader_group_size;i++)
441       smpi_group_set_mapping(leaders_group, leader_list[i], i);
442     leader_comm = smpi_comm_new(leaders_group, NULL);
443     smpi_comm_set_leaders_comm(comm, leader_comm);
444     smpi_comm_set_intra_comm(comm, comm_intra);
445
446     //create intracommunicator
447    // smpi_comm_set_intra_comm(comm, smpi_comm_split(comm, *(int*)SIMIX_host_self(), comm_rank));
448   }else{
449     for (i=0; i< leader_group_size;i++)
450       smpi_group_set_mapping(leaders_group, leader_list[i], i);
451
452     leader_comm = smpi_comm_new(leaders_group, NULL);
453     if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL)
454       smpi_comm_set_leaders_comm(comm, leader_comm);
455     smpi_process_set_comm_intra(comm_intra);
456   }
457
458   int is_uniform = 1;
459
460   // Are the nodes uniform ? = same number of process/node
461   int my_local_size=smpi_comm_size(comm_intra);
462   if(smpi_comm_rank(comm_intra)==0) {
463     int* non_uniform_map = xbt_malloc0(sizeof(int)*leader_group_size);
464     smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
465         non_uniform_map, 1, MPI_INT, leader_comm);
466     for(i=0; i < leader_group_size; i++) {
467       if(non_uniform_map[0] != non_uniform_map[i]) {
468         is_uniform = 0;
469         break;
470       }
471     }
472     if(!is_uniform && smpi_comm_is_uniform(comm)){
473         comm->non_uniform_map= non_uniform_map;
474     }else{
475         xbt_free(non_uniform_map);
476     }
477     comm->is_uniform=is_uniform;
478   }
479   smpi_coll_tuned_bcast_mpich(&(comm->is_uniform),1, MPI_INT, 0, comm_intra );
480
481   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
482      smpi_switch_data_segment(smpi_process_index());
483    }
484   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
485   int is_blocked=1;
486   int prev=smpi_group_rank(smpi_comm_group(comm), smpi_group_index(smpi_comm_group(comm_intra), 0));
487     for (i=1; i<my_local_size; i++){
488       int this=smpi_group_rank(smpi_comm_group(comm),smpi_group_index(smpi_comm_group(comm_intra), i));
489       if(this!=prev+1){
490         is_blocked=0;
491         break;
492       }
493       prev = this;
494   }
495
496   int global_blocked;
497   smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1,
498             MPI_INT, MPI_LAND, comm);
499
500   if(MPI_COMM_WORLD==SMPI_UNINITIALIZED || comm==MPI_COMM_WORLD){
501     if(smpi_comm_rank(comm)==0){
502         comm->is_blocked=global_blocked;
503     }
504   }else{
505     comm->is_blocked=global_blocked;
506   }
507   xbt_free(leader_list);
508   
509   if(replaying==1)
510     smpi_process_set_replaying(1); 
511 }
512
513
514 int smpi_comm_attr_delete(MPI_Comm comm, int keyval){
515   if(comm->attributes==NULL)
516     return MPI_ERR_ARG;
517     
518   xbt_dict_remove(comm->attributes, (const char*)&keyval);
519   return MPI_SUCCESS;
520 }
521 int smpi_comm_attr_get(MPI_Comm comm, int keyval, void* attr_value, int* flag){
522   xbt_ex_t ex;
523   if(comm->attributes==NULL){
524     *flag=0;
525     return MPI_SUCCESS;
526   }
527   TRY {
528     *(void**)attr_value = xbt_dict_get(comm->attributes, (const char*)&keyval);
529     *flag=1;
530   }
531   CATCH(ex) {
532     *flag=0;
533     xbt_ex_free(ex);
534   }
535   
536   return MPI_SUCCESS;
537 }
538
539 int smpi_comm_attr_put(MPI_Comm comm, int keyval, void* attr_value){
540   if(comm->attributes==NULL)
541     comm->attributes=xbt_dict_new();
542
543   xbt_dict_set(comm->attributes, (const char*)&keyval, attr_value, NULL);
544   return MPI_SUCCESS;
545 }
546