Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Change way replay is handled, to allow cohabitation between replay and "classic"...
[simgrid.git] / src / smpi / smpi_comm.c
1 /* Copyright (c) 2010-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <stdlib.h>
8
9 #include "private.h"
10 #include "smpi_mpi_dt_private.h"
11 #include "limits.h"
12 #include "simix/smx_private.h"
13 #include "colls/colls.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_comm, smpi,
17                                 "Logging specific to SMPI (comm)");
18
19
20
21 /* Support for cartesian topology was added, but there are 2 other types of
22  * topology, graph et dist graph. In order to support them, we have to add a
23  * field MPIR_Topo_type, and replace the MPI_Topology field by an union. */
24
25 typedef struct s_smpi_mpi_communicator {
26   MPI_Group group;
27   MPIR_Topo_type topoType; 
28   MPI_Topology topo; // to be replaced by an union
29   int refcount;
30   MPI_Comm leaders_comm;//inter-node communicator
31   MPI_Comm intra_comm;//intra-node communicator . For MPI_COMM_WORLD this can't be used, as var is global.
32   //use an intracomm stored in the process data instead
33   int* leaders_map; //who is the leader of each process
34   int is_uniform;
35   int* non_uniform_map; //set if smp nodes have a different number of processes allocated
36   int is_blocked;// are ranks allocated on the same smp node contiguous ?
37 } s_smpi_mpi_communicator_t;
38
39 static int smpi_compare_rankmap(const void *a, const void *b)
40 {
41   const int* x = (const int*)a;
42   const int* y = (const int*)b;
43
44   if (x[1] < y[1]) {
45     return -1;
46   }
47   if (x[1] == y[1]) {
48     if (x[0] < y[0]) {
49       return -1;
50     }
51     if (x[0] == y[0]) {
52       return 0;
53     }
54     return 1;
55   }
56   return 1;
57 }
58
59 MPI_Comm smpi_comm_new(MPI_Group group, MPI_Topology topo)
60 {
61   MPI_Comm comm;
62
63   comm = xbt_new(s_smpi_mpi_communicator_t, 1);
64   comm->group = group;
65   smpi_group_use(comm->group);
66   comm->refcount=1;
67   comm->topoType = -1;
68   comm->topo = topo;
69   comm->intra_comm = MPI_COMM_NULL;
70   comm->leaders_comm = MPI_COMM_NULL;
71   comm->is_uniform=1;
72   comm->non_uniform_map = NULL;
73   comm->leaders_map = NULL;
74   comm->is_blocked=0;
75   return comm;
76 }
77
78 void smpi_comm_destroy(MPI_Comm comm)
79 {
80   if (comm == MPI_COMM_UNINITIALIZED)
81     comm = smpi_process_comm_world();
82   smpi_group_unuse(comm->group);
83   smpi_topo_destroy(comm->topo); // there's no use count on topos
84   smpi_comm_unuse(comm);
85 }
86
87 MPI_Group smpi_comm_group(MPI_Comm comm)
88 {
89   if (comm == MPI_COMM_UNINITIALIZED)
90     comm = smpi_process_comm_world();
91
92   return comm->group;
93 }
94
95 MPI_Topology smpi_comm_topo(MPI_Comm comm) {
96   if (comm != MPI_COMM_NULL)
97     return comm->topo;
98   return NULL;
99 }
100
101 int smpi_comm_size(MPI_Comm comm)
102 {
103   if (comm == MPI_COMM_UNINITIALIZED)
104     comm = smpi_process_comm_world();
105
106   return smpi_group_size(smpi_comm_group(comm));
107 }
108
109 int smpi_comm_rank(MPI_Comm comm)
110 {
111   if (comm == MPI_COMM_UNINITIALIZED)
112     comm = smpi_process_comm_world();
113   return smpi_group_rank(smpi_comm_group(comm), smpi_process_index());
114 }
115
116 void smpi_comm_get_name (MPI_Comm comm, char* name, int* len)
117 {
118   if (comm == MPI_COMM_UNINITIALIZED)
119     comm = smpi_process_comm_world();
120   if(comm == MPI_COMM_WORLD) {
121     strcpy(name, "WORLD");
122     *len = 5;
123   } else {
124     *len = snprintf(name, MPI_MAX_NAME_STRING, "%p", comm);
125   }
126 }
127
128 void smpi_comm_set_leaders_comm(MPI_Comm comm, MPI_Comm leaders){
129   if (comm == MPI_COMM_UNINITIALIZED)
130     comm = smpi_process_comm_world();
131   comm->leaders_comm=leaders;
132 }
133
134 void smpi_comm_set_intra_comm(MPI_Comm comm, MPI_Comm leaders){
135   comm->intra_comm=leaders;
136 }
137
138 int* smpi_comm_get_non_uniform_map(MPI_Comm comm){
139   if (comm == MPI_COMM_UNINITIALIZED)
140     comm = smpi_process_comm_world();
141   return comm->non_uniform_map;
142 }
143
144 int* smpi_comm_get_leaders_map(MPI_Comm comm){
145   if (comm == MPI_COMM_UNINITIALIZED)
146     comm = smpi_process_comm_world();
147   return comm->leaders_map;
148 }
149
150 MPI_Comm smpi_comm_get_leaders_comm(MPI_Comm comm){
151   if (comm == MPI_COMM_UNINITIALIZED)
152     comm = smpi_process_comm_world();
153   return comm->leaders_comm;
154 }
155
156 MPI_Comm smpi_comm_get_intra_comm(MPI_Comm comm){
157   if (comm == MPI_COMM_UNINITIALIZED || comm==MPI_COMM_WORLD) 
158     return smpi_process_get_comm_intra();
159   else return comm->intra_comm;
160 }
161
162 int smpi_comm_is_uniform(MPI_Comm comm){
163   if (comm == MPI_COMM_UNINITIALIZED)
164     comm = smpi_process_comm_world();
165   return comm->is_uniform;
166 }
167
168 int smpi_comm_is_blocked(MPI_Comm comm){
169   if (comm == MPI_COMM_UNINITIALIZED)
170     comm = smpi_process_comm_world();
171   return comm->is_blocked;
172 }
173
174 MPI_Comm smpi_comm_split(MPI_Comm comm, int color, int key)
175 {
176   if (comm == MPI_COMM_UNINITIALIZED)
177     comm = smpi_process_comm_world();
178   int system_tag = 123;
179   int index, rank, size, i, j, count, reqs;
180   int* sendbuf;
181   int* recvbuf;
182   int* rankmap;
183   MPI_Group group, group_root, group_out;
184   MPI_Request* requests;
185
186   group_root = group_out = NULL;
187   group = smpi_comm_group(comm);
188   rank = smpi_comm_rank(comm);
189   size = smpi_comm_size(comm);
190   /* Gather all colors and keys on rank 0 */
191   sendbuf = xbt_new(int, 2);
192   sendbuf[0] = color;
193   sendbuf[1] = key;
194   if(rank == 0) {
195     recvbuf = xbt_new(int, 2 * size);
196   } else {
197     recvbuf = NULL;
198   }
199   smpi_mpi_gather(sendbuf, 2, MPI_INT, recvbuf, 2, MPI_INT, 0, comm);
200   xbt_free(sendbuf);
201   /* Do the actual job */
202   if(rank == 0) {
203     rankmap = xbt_new(int, 2 * size);
204     for(i = 0; i < size; i++) {
205       if(recvbuf[2 * i] == MPI_UNDEFINED) {
206         continue;
207       }
208       count = 0;
209       for(j = i + 1; j < size; j++)  {
210         if(recvbuf[2 * i] == recvbuf[2 * j]) {
211           recvbuf[2 * j] = MPI_UNDEFINED;
212           rankmap[2 * count] = j;
213           rankmap[2 * count + 1] = recvbuf[2 * j + 1];
214           count++;
215         }
216       }
217       /* Add self in the group */
218       recvbuf[2 * i] = MPI_UNDEFINED;
219       rankmap[2 * count] = i;
220       rankmap[2 * count + 1] = recvbuf[2 * i + 1];
221       count++;
222       qsort(rankmap, count, 2 * sizeof(int), &smpi_compare_rankmap);
223       group_out = smpi_group_new(count);
224       if(i == 0) {
225         group_root = group_out; /* Save root's group */
226       }
227       for(j = 0; j < count; j++) {
228         //increment refcounter in order to avoid freeing the group too quick before copy
229         index = smpi_group_index(group, rankmap[2 * j]);
230         smpi_group_set_mapping(group_out, index, j);
231       }
232       requests = xbt_new(MPI_Request, count);
233       reqs = 0;
234       for(j = 0; j < count; j++) {
235         if(rankmap[2 * j] != 0) {
236           requests[reqs] = smpi_isend_init(&group_out, 1, MPI_PTR, rankmap[2 * j], system_tag, comm);
237           reqs++;
238         }
239       }
240       smpi_mpi_startall(reqs, requests);
241       smpi_mpi_waitall(reqs, requests, MPI_STATUS_IGNORE);
242       xbt_free(requests);
243     }
244     xbt_free(recvbuf);
245     group_out = group_root; /* exit with root's group */
246   } else {
247     if(color != MPI_UNDEFINED) {
248       smpi_mpi_recv(&group_out, 1, MPI_PTR, 0, system_tag, comm, MPI_STATUS_IGNORE);
249       if(group_out){
250         group_out=smpi_group_copy(group_out);
251       }
252     } /* otherwise, exit with group_out == NULL */
253   }
254   return group_out ? smpi_comm_new(group_out, NULL) : MPI_COMM_NULL;
255 }
256
257 void smpi_comm_use(MPI_Comm comm){
258   if (comm == MPI_COMM_UNINITIALIZED)
259     comm = smpi_process_comm_world();
260   comm->refcount++;
261 }
262
263 void smpi_comm_unuse(MPI_Comm comm){
264   if (comm == MPI_COMM_UNINITIALIZED)
265     comm = smpi_process_comm_world();
266   comm->refcount--;
267   if(comm->refcount==0){
268     if(comm->intra_comm != MPI_COMM_NULL)
269       smpi_comm_unuse(comm->intra_comm);
270     if(comm->leaders_comm != MPI_COMM_NULL)
271       smpi_comm_unuse(comm->leaders_comm);
272     if(comm->non_uniform_map !=NULL)
273       xbt_free(comm->non_uniform_map);
274     if(comm->leaders_map !=NULL)
275       xbt_free(comm->leaders_map);
276     xbt_free(comm);
277   }
278 }
279
280 static int
281 compare_ints (const void *a, const void *b)
282 {
283   const int *da = (const int *) a;
284   const int *db = (const int *) b;
285
286   return (*da > *db) - (*da < *db);
287 }
288
289 void smpi_comm_init_smp(MPI_Comm comm){
290   int leader = -1;
291
292   if (comm == MPI_COMM_UNINITIALIZED)
293     comm = smpi_process_comm_world();
294
295   int comm_size =smpi_comm_size(comm);
296   
297   // If we are in replay - perform an ugly hack  
298   // say to SimGrid that we are not in replay for a while, because we need 
299   // the buffers to be copied for the following calls
300   int replaying = 0; //cache data to set it back again after
301   if(smpi_process_get_replaying()){
302    replaying=1;
303    smpi_process_set_replaying(0);
304   }
305
306   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
307      smpi_switch_data_segment(smpi_process_index());
308    }
309   //identify neighbours in comm
310   //get the indexes of all processes sharing the same simix host
311   xbt_swag_t process_list = simcall_host_get_process_list(SIMIX_host_self());
312   int intra_comm_size = 0;
313   //only one process/node, disable SMP support and return
314 //  if(intra_comm_size==1){
315 //      smpi_comm_set_intra_comm(comm, MPI_COMM_SELF);
316 //      //smpi_comm_set_leaders_comm(comm, comm);
317 //      smpi_process_set_comm_intra(MPI_COMM_SELF);
318 //      return;
319 //  }
320
321
322   int i =0;
323   int min_index=INT_MAX;//the minimum index will be the leader
324   msg_process_t process = NULL;
325   xbt_swag_foreach(process, process_list) {
326     //is_in_comm=0;
327     int index = SIMIX_process_get_PID(process) -1;
328
329     if(smpi_group_rank(smpi_comm_group(comm),  index)!=MPI_UNDEFINED){
330         intra_comm_size++;
331       //the process is in the comm
332       if(index < min_index)
333         min_index=index;
334       i++;
335     }
336   }
337   XBT_DEBUG("number of processes deployed on my node : %d", intra_comm_size);
338   MPI_Group group_intra = smpi_group_new(intra_comm_size);
339   i=0;
340   process = NULL;
341   xbt_swag_foreach(process, process_list) {
342     //is_in_comm=0;
343     int index = SIMIX_process_get_PID(process) -1;
344     if(smpi_group_rank(smpi_comm_group(comm),  index)!=MPI_UNDEFINED){
345       smpi_group_set_mapping(group_intra, index, i);
346       i++;
347     }
348   }
349
350
351   MPI_Comm comm_intra = smpi_comm_new(group_intra, NULL);
352   //MPI_Comm shmem_comm = smpi_process_comm_intra();
353   //int intra_rank = smpi_comm_rank(shmem_comm);
354
355
356   //if(smpi_process_index()==min_index)
357   leader=min_index;
358
359   int * leaders_map= (int*)xbt_malloc0(sizeof(int)*comm_size);
360   int * leader_list= (int*)xbt_malloc0(sizeof(int)*comm_size);
361   for(i=0; i<comm_size; i++){
362       leader_list[i]=-1;
363   }
364
365   smpi_coll_tuned_allgather_mpich(&leader, 1, MPI_INT , leaders_map, 1, MPI_INT, comm);
366
367   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
368      smpi_switch_data_segment(smpi_process_index());
369    }
370    
371   if(!comm->leaders_map){
372     comm->leaders_map= leaders_map;
373   }else{
374     xbt_free(leaders_map);
375   }
376   int j=0;
377   int leader_group_size = 0;
378   for(i=0; i<comm_size; i++){
379       int already_done=0;
380       for(j=0;j<leader_group_size; j++){
381         if(comm->leaders_map[i]==leader_list[j]){
382             already_done=1;
383         }
384       }
385       if(!already_done){
386         leader_list[leader_group_size]=comm->leaders_map[i];
387         leader_group_size++;
388       }
389   }
390   qsort(leader_list, leader_group_size, sizeof(int),compare_ints);
391
392   MPI_Group leaders_group = smpi_group_new(leader_group_size);
393
394
395   MPI_Comm leader_comm = MPI_COMM_NULL;
396   if(MPI_COMM_WORLD!=MPI_COMM_UNINITIALIZED && comm!=MPI_COMM_WORLD){
397     //create leader_communicator
398     for (i=0; i< leader_group_size;i++)
399       smpi_group_set_mapping(leaders_group, leader_list[i], i);
400     leader_comm = smpi_comm_new(leaders_group, NULL);
401     smpi_comm_set_leaders_comm(comm, leader_comm);
402     smpi_comm_set_intra_comm(comm, comm_intra);
403
404     //create intracommunicator
405    // smpi_comm_set_intra_comm(comm, smpi_comm_split(comm, *(int*)SIMIX_host_self(), comm_rank));
406   }else{
407     for (i=0; i< leader_group_size;i++)
408       smpi_group_set_mapping(leaders_group, leader_list[i], i);
409
410     leader_comm = smpi_comm_new(leaders_group, NULL);
411     if(smpi_comm_get_leaders_comm(comm)==MPI_COMM_NULL)
412       smpi_comm_set_leaders_comm(comm, leader_comm);
413     smpi_process_set_comm_intra(comm_intra);
414   }
415
416   int is_uniform = 1;
417
418   // Are the nodes uniform ? = same number of process/node
419   int my_local_size=smpi_comm_size(comm_intra);
420   if(smpi_comm_rank(comm_intra)==0) {
421     int* non_uniform_map = xbt_malloc0(sizeof(int)*leader_group_size);
422     smpi_coll_tuned_allgather_mpich(&my_local_size, 1, MPI_INT,
423         non_uniform_map, 1, MPI_INT, leader_comm);
424     for(i=0; i < leader_group_size; i++) {
425       if(non_uniform_map[0] != non_uniform_map[i]) {
426         is_uniform = 0;
427         break;
428       }
429     }
430     if(!is_uniform && smpi_comm_is_uniform(comm)){
431         comm->non_uniform_map= non_uniform_map;
432     }else{
433         xbt_free(non_uniform_map);
434     }
435     comm->is_uniform=is_uniform;
436   }
437   smpi_coll_tuned_bcast_mpich(&(comm->is_uniform),1, MPI_INT, 0, comm_intra );
438
439   if(smpi_privatize_global_variables){ //we need to switch here, as the called function may silently touch global variables
440      smpi_switch_data_segment(smpi_process_index());
441    }
442   // Are the ranks blocked ? = allocated contiguously on the SMP nodes
443   int is_blocked=1;
444   int prev=smpi_group_rank(smpi_comm_group(comm), smpi_group_index(smpi_comm_group(comm_intra), 0));
445     for (i=1; i<my_local_size; i++){
446       int this=smpi_group_rank(smpi_comm_group(comm),smpi_group_index(smpi_comm_group(comm_intra), i));
447       if(this!=prev+1){
448         is_blocked=0;
449         break;
450       }
451       prev = this;
452   }
453
454   int global_blocked;
455   smpi_mpi_allreduce(&is_blocked, &(global_blocked), 1,
456             MPI_INT, MPI_LAND, comm);
457
458   if(MPI_COMM_WORLD==SMPI_UNINITIALIZED || comm==MPI_COMM_WORLD){
459     if(smpi_comm_rank(comm)==0){
460         comm->is_blocked=global_blocked;
461     }
462   }else{
463     comm->is_blocked=global_blocked;
464   }
465   xbt_free(leader_list);
466   
467   if(replaying==1)
468     smpi_process_set_replaying(1); 
469 }
470