Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fixed two more unclean execution paths
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 void smpi_process_init(int* argc, char*** argv) {
17   int index;
18   smpi_process_data_t data;
19   smx_process_t proc;
20
21   proc = SIMIX_process_self();
22   index = atoi((*argv)[1]);
23   data = smpi_process_remote_data(index);
24   SIMIX_process_set_data(proc, data);
25   if (*argc > 2) {
26     free((*argv)[1]);
27     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
28     (*argv)[(*argc) - 1] = NULL;
29   }
30   (*argc)--;
31   DEBUG2("<%d> New process in the game: %p", index, proc);
32 }
33
34 void smpi_process_destroy(void) {
35   int index = smpi_process_index();
36
37   DEBUG1("<%d> Process left the game", index);
38 }
39
40 /* MPI Low level calls */
41 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
42   MPI_Request request;
43
44   request = xbt_new(s_smpi_mpi_request_t, 1);
45   request->comm = comm;
46   request->src = smpi_comm_rank(comm);
47   request->dst = dst;
48   request->tag = tag;
49   request->size = smpi_datatype_size(datatype) * count;
50   request->complete = 0;
51   smpi_process_post_send(comm, request);
52   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, request);
53   return request;
54 }
55
56 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
57   MPI_Request request;
58
59   request = xbt_new(s_smpi_mpi_request_t, 1);
60   request->comm = comm;
61   request->src = src;
62   request->dst = smpi_comm_rank(comm);
63   request->tag = tag;
64   request->size = smpi_datatype_size(datatype) * count;
65   request->complete = 0;
66   smpi_process_post_recv(request);
67   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
68   return request;
69 }
70
71 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
72   MPI_Request request;
73
74   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
75   smpi_mpi_wait(&request, status);
76 }
77
78 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
79   MPI_Request request;
80
81   request = smpi_mpi_isend(buf, count, datatype, src, tag, comm);
82   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
83 }
84
85 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
86   MPI_Request requests[2];
87   MPI_Status stats[2];
88
89   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
90   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
91   smpi_mpi_waitall(2, requests, stats);
92   if(status != MPI_STATUS_IGNORE) {
93     // Copy receive status
94     memcpy(status, &stats[1], sizeof(MPI_Status));
95   }
96 }
97
98 static void finish_wait(MPI_Request* request, MPI_Status* status) {
99   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
100
101   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
102   if(status != MPI_STATUS_IGNORE) {
103     status->MPI_SOURCE = (*request)->src;
104     status->MPI_TAG = (*request)->tag;
105     status->MPI_ERROR = MPI_SUCCESS;
106   }
107   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
108   // data == *request if sender is first to finish its wait
109   // data != *request if receiver is first to finish its wait
110   if(data->complete == 0) {
111     // first arrives here
112     data->complete = 1;
113     if(data != *request) {
114       // receveiver cleans its part
115       xbt_free(*request);
116     }
117   } else {
118     // second arrives here
119     if(data != *request) {
120       // receiver cleans everything
121       xbt_free(data);
122     }
123     xbt_free(*request);
124   }
125   *request = MPI_REQUEST_NULL;
126 }
127
128 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
129   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
130   int flag = data && data->complete == 1;
131
132   if(flag) {
133     SIMIX_communication_destroy((*request)->pair);
134     finish_wait(request, status);
135   }
136   return flag;
137 }
138
139 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
140   MPI_Request data;
141   int i, flag;
142
143   *index = MPI_UNDEFINED;
144   flag = 0;
145   for(i = 0; i < count; i++) {
146     if(requests[i] != MPI_REQUEST_NULL) {
147       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
148       if(data != MPI_REQUEST_NULL && data->complete == 1) {
149         SIMIX_communication_destroy(requests[i]->pair);
150         finish_wait(&requests[i], status);
151         *index = i;
152         flag = 1;
153         break;
154       }
155     }
156   }
157   return flag;
158 }
159
160 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
161   MPI_Request data = (MPI_Request)SIMIX_communication_get_data((*request)->pair);
162
163   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
164          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
165   // data is null if receiver waits before sender enters the rdv
166   if(data == MPI_REQUEST_NULL || data->complete == 0) {
167     SIMIX_network_wait((*request)->pair, -1.0);
168   } else {
169     SIMIX_communication_destroy((*request)->pair);
170   }
171   finish_wait(request, status);
172 }
173
174 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
175   xbt_dynar_t comms;
176   MPI_Request data;
177   int i, size, index;
178   int* map;
179
180   index = MPI_UNDEFINED;
181   if(count > 0) {
182     // First check for already completed requests
183     for(i = 0; i < count; i++) {
184       if(requests[i] != MPI_REQUEST_NULL) {
185         data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
186         if(data != MPI_REQUEST_NULL && data->complete == 1) {
187           index = i;
188           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
189           break;
190         }
191       }
192     }
193     if(index == MPI_UNDEFINED) {
194       // Otherwise, wait for a request to complete
195       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
196       map = xbt_new(int, count);
197       size = 0;
198       DEBUG0("Wait for one of");
199       for(i = 0; i < count; i++) {
200         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
201          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
202                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
203           xbt_dynar_push(comms, &requests[i]->pair);
204           map[size] = i;
205           size++;
206         }
207       }
208       if(size > 0) {
209         index = SIMIX_network_waitany(comms);
210         index = map[index];
211       }
212       xbt_free(map);
213       xbt_dynar_free_container(&comms);
214     }
215     if(index != MPI_UNDEFINED) {
216       finish_wait(&requests[index], status);
217     }
218   }
219   return index;
220 }
221
222 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
223   int index;
224   MPI_Status stat;
225
226   while(count > 0) {
227     index = smpi_mpi_waitany(count, requests, &stat);
228     if(index == MPI_UNDEFINED) {
229       break;
230     }
231     if(status != MPI_STATUS_IGNORE) {
232       memcpy(&status[index], &stat, sizeof(stat));
233     }
234     // Move the last request to the found position
235     requests[index] = requests[count - 1];
236     requests[count - 1] = MPI_REQUEST_NULL;
237     count--;
238   }
239 }
240
241 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
242   MPI_Request data;
243   int i, count;
244
245   count = 0;
246   for(i = 0; i < count; i++) {
247     if(requests[i] != MPI_REQUEST_NULL) {
248       data = (MPI_Request)SIMIX_communication_get_data(requests[i]->pair);
249       if(data != MPI_REQUEST_NULL && data->complete == 1) {
250         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
251         indices[count] = i;
252         count++;
253       }
254     }
255   }
256   return count;
257 }
258
259 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
260   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
261   nary_tree_bcast(buf, count, datatype, root, comm, 4);
262 }
263
264 void smpi_mpi_barrier(MPI_Comm comm) {
265   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
266   nary_tree_barrier(comm, 4);
267 }
268
269 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
270   int system_tag = 666;
271   int rank, size, src, index, sendsize, recvsize;
272   MPI_Request* requests;
273
274   rank = smpi_comm_rank(comm);
275   size = smpi_comm_size(comm);
276   if(rank != root) {
277     // Send buffer to root
278     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
279   } else {
280     sendsize = smpi_datatype_size(sendtype);
281     recvsize = smpi_datatype_size(recvtype);
282     // Local copy from root
283     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
284     // Receive buffers from senders
285     requests = xbt_new(MPI_Request, size - 1);
286     index = 0;
287     for(src = 0; src < size; src++) {
288       if(src != root) {
289         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
290         index++;
291       }
292     }
293     // Wait for completion of irecv's.
294     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
295     xbt_free(requests);
296   }
297 }
298
299 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
300   int system_tag = 666;
301   int rank, size, src, index, sendsize;
302   MPI_Request* requests;
303
304   rank = smpi_comm_rank(comm);
305   size = smpi_comm_size(comm);
306   if(rank != root) {
307     // Send buffer to root
308     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
309   } else {
310     sendsize = smpi_datatype_size(sendtype);
311     // Local copy from root
312     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
313     // Receive buffers from senders
314     requests = xbt_new(MPI_Request, size - 1);
315     index = 0;
316     for(src = 0; src < size; src++) {
317       if(src != root) {
318         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
319         index++;
320       }
321     }
322     // Wait for completion of irecv's.
323     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
324     xbt_free(requests);
325   }
326 }
327
328 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
329   int system_tag = 666;
330   int rank, size, other, index, sendsize, recvsize;
331   MPI_Request* requests;
332
333   rank = smpi_comm_rank(comm);
334   size = smpi_comm_size(comm);
335   sendsize = smpi_datatype_size(sendtype);
336   recvsize = smpi_datatype_size(recvtype);
337   // Local copy from self
338   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
339   // Send/Recv buffers to/from others;
340   requests = xbt_new(MPI_Request, 2 * (size - 1));
341   index = 0;
342   for(other = 0; other < size; other++) {
343     if(other != rank) {
344       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
345       index++;
346       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
347       index++;
348     }
349   }
350   // Wait for completion of all comms.
351   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
352   xbt_free(requests);
353 }
354
355 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
356   int system_tag = 666;
357   int rank, size, other, index, sendsize, recvsize;
358   MPI_Request* requests;
359
360   rank = smpi_comm_rank(comm);
361   size = smpi_comm_size(comm);
362   sendsize = smpi_datatype_size(sendtype);
363   recvsize = smpi_datatype_size(recvtype);
364   // Local copy from self
365   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
366   // Send buffers to others;
367   requests = xbt_new(MPI_Request, 2 * (size - 1));
368   index = 0;
369   for(other = 0; other < size; other++) {
370     if(other != rank) {
371       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
372       index++;
373       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
374       index++;
375     }
376   }
377   // Wait for completion of all comms.
378   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
379   xbt_free(requests);
380 }
381
382 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
383   int system_tag = 666;
384   int rank, size, dst, index, sendsize, recvsize;
385   MPI_Request* requests;
386
387   rank = smpi_comm_rank(comm);
388   size = smpi_comm_size(comm);
389   if(rank != root) {
390     // Recv buffer from root
391     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
392   } else {
393     sendsize = smpi_datatype_size(sendtype);
394     recvsize = smpi_datatype_size(recvtype);
395     // Local copy from root
396     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
397     // Send buffers to receivers
398     requests = xbt_new(MPI_Request, size - 1);
399     index = 0;
400     for(dst = 0; dst < size; dst++) {
401       if(dst != root) {
402         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
403         index++;
404       }
405     }
406     // Wait for completion of isend's.
407     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
408     xbt_free(requests);
409   }
410 }
411
412 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
413   int system_tag = 666;
414   int rank, size, dst, index, sendsize, recvsize;
415   MPI_Request* requests;
416
417   rank = smpi_comm_rank(comm);
418   size = smpi_comm_size(comm);
419   if(rank != root) {
420     // Recv buffer from root
421     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
422   } else {
423     sendsize = smpi_datatype_size(sendtype);
424     recvsize = smpi_datatype_size(recvtype);
425     // Local copy from root
426     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
427     // Send buffers to receivers
428     requests = xbt_new(MPI_Request, size - 1);
429     index = 0;
430     for(dst = 0; dst < size; dst++) {
431       if(dst != root) {
432         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
433         index++;
434       }
435     }
436     // Wait for completion of isend's.
437     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
438     xbt_free(requests);
439   }
440 }
441
442 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
443   int system_tag = 666;
444   int rank, size, src, index, datasize;
445   MPI_Request* requests;
446   void** tmpbufs;
447  
448   rank = smpi_comm_rank(comm);
449   size = smpi_comm_size(comm);
450   if(rank != root) {
451     // Send buffer to root
452     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
453   } else {
454     datasize = smpi_datatype_size(datatype);
455     // Local copy from root
456     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
457     // Receive buffers from senders
458     //TODO: make a MPI_barrier here ?
459     requests = xbt_new(MPI_Request, size - 1);
460     tmpbufs = xbt_new(void*, size - 1);
461     index = 0;
462     for(src = 0; src < size; src++) {
463       if(src != root) {
464         tmpbufs[index] = xbt_malloc(count * datasize);
465         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
466         index++;
467       }
468     }
469     // Wait for completion of irecv's.
470     for(src = 0; src < size - 1; src++) {
471       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
472       if(index == MPI_UNDEFINED) {
473         break;
474       }
475       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
476     }
477     for(index = 0; index < size - 1; index++) {
478       xbt_free(tmpbufs[index]);
479     }
480     xbt_free(tmpbufs);
481     xbt_free(requests);
482   }
483 }
484
485 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
486   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
487   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
488
489 /*
490 FIXME: buggy implementation
491
492   int system_tag = 666;
493   int rank, size, other, index, datasize;
494   MPI_Request* requests;
495   void** tmpbufs;
496  
497   rank = smpi_comm_rank(comm);
498   size = smpi_comm_size(comm);
499   datasize = smpi_datatype_size(datatype);
500   // Local copy from self
501   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char)); 
502   // Send/Recv buffers to/from others;
503   //TODO: make a MPI_barrier here ?
504   requests = xbt_new(MPI_Request, 2 * (size - 1));
505   tmpbufs = xbt_new(void*, size - 1);
506   index = 0;
507   for(other = 0; other < size; other++) {
508     if(other != rank) {
509       tmpbufs[index / 2] = xbt_malloc(count * datasize);
510       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
511       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
512       index += 2;
513     }
514   }
515   // Wait for completion of all comms.
516   for(other = 0; other < 2 * (size - 1); other++) {
517     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
518     if(index == MPI_UNDEFINED) {
519       break;
520     }
521     if((index & 1) == 1) {
522       // Request is odd: it's a irecv
523       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
524     }
525   }
526   for(index = 0; index < size - 1; index++) {
527     xbt_free(tmpbufs[index]);
528   }
529   xbt_free(tmpbufs);
530   xbt_free(requests);
531 */
532 }