Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
d6e2514f6551d58d8320637956dd7621b7c0ad87
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 #define EAGER_LIMIT 65536
17 #define RDV_TAG     (-10)
18
19 void smpi_process_init(int* argc, char*** argv) {
20   int index;
21   smpi_process_data_t data;
22   smx_process_t proc;
23
24   proc = SIMIX_process_self();
25   index = atoi((*argv)[1]);
26   data = smpi_process_remote_data(index);
27   SIMIX_process_set_data(proc, data);
28   if (*argc > 2) {
29     free((*argv)[1]);
30     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
31     (*argv)[(*argc) - 1] = NULL;
32   }
33   (*argc)--;
34   DEBUG2("<%d> New process in the game: %p", index, proc);
35 }
36
37 void smpi_process_destroy(void) {
38   int index = smpi_process_index();
39
40   DEBUG1("<%d> Process left the game", index);
41 }
42
43 /* MPI Low level calls */
44 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
45   MPI_Request request;
46   MPI_Request rdv[2];
47   size_t size = smpi_datatype_size(datatype) * count;
48
49   if (size > EAGER_LIMIT) {
50         /* Warning: this (zero-length synchronous) call will come back here with size == 0 */
51    DEBUG1("RDV send to %d", dst);
52         rdv[0] = smpi_mpi_isend(NULL, 0, MPI_BYTE, dst, RDV_TAG, comm);
53         rdv[1] = smpi_mpi_irecv(NULL, 0, MPI_BYTE, dst, RDV_TAG, comm);
54    smpi_mpi_waitall(2, rdv, MPI_STATUS_IGNORE);
55   }
56   request = xbt_new(s_smpi_mpi_request_t, 1);
57   request->comm = comm;
58   request->src = smpi_comm_rank(comm);
59   request->dst = dst;
60   request->tag = tag;
61   request->size = size;
62   request->complete = 0;
63   request->data = request;
64   smpi_process_post_send(comm, request);
65   request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, buf, request->size, NULL);
66   return request;
67 }
68
69 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
70   MPI_Request request;
71   MPI_Request rdv[2];
72   size_t size = smpi_datatype_size(datatype) * count;
73
74   if (size > EAGER_LIMIT) {
75         /* Warning: this (zero-length synchronous) call will come back here with size == 0 */
76    DEBUG1("RDV recv from %d", src);
77         rdv[0] = smpi_mpi_irecv(NULL, 0, MPI_BYTE, src, RDV_TAG, comm);
78         rdv[1] = smpi_mpi_isend(NULL, 0, MPI_BYTE, src, RDV_TAG, comm);
79    smpi_mpi_waitall(2, rdv, MPI_STATUS_IGNORE);
80   }
81   request = xbt_new(s_smpi_mpi_request_t, 1);
82   request->comm = comm;
83   request->src = src;
84   request->dst = smpi_comm_rank(comm);
85   request->tag = tag;
86   request->size = size;
87   request->complete = 0;
88   request->data = MPI_REQUEST_NULL;
89   smpi_process_post_recv(request);
90   request->pair = SIMIX_network_irecv(request->rdv, buf, &request->size);
91   return request;
92 }
93
94 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
95   MPI_Request request;
96
97   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
98   smpi_mpi_wait(&request, status);
99 }
100
101 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
102   MPI_Request request;
103
104   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
105   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
106 }
107
108 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
109   MPI_Request requests[2];
110   MPI_Status stats[2];
111
112   requests[0] = smpi_mpi_isend(sendbuf, sendcount, sendtype, dst, sendtag, comm);
113   requests[1] = smpi_mpi_irecv(recvbuf, recvcount, recvtype, src, recvtag, comm);
114   smpi_mpi_waitall(2, requests, stats);
115   if(status != MPI_STATUS_IGNORE) {
116     // Copy receive status
117     memcpy(status, &stats[1], sizeof(MPI_Status));
118   }
119 }
120
121 static void finish_wait(MPI_Request* request, MPI_Status* status) {
122   MPI_Request data = (*request)->data;
123
124   xbt_assert0(data != MPI_REQUEST_NULL, "Erroneous situation");
125   if(status != MPI_STATUS_IGNORE) {
126     status->MPI_SOURCE = (*request)->src;
127     status->MPI_TAG = (*request)->tag;
128     status->MPI_ERROR = MPI_SUCCESS;
129     status->_count = (*request)->size; // size in bytes
130     status->_cancelled = 0;            // FIXME: cancellation of requests not handled yet
131   }
132   DEBUG3("finishing wait for %p [data = %p, complete = %d]", *request, data, data->complete);
133   // data == *request if sender is first to finish its wait
134   // data != *request if receiver is first to finish its wait
135   if(data->complete == 0) {
136     // first arrives here
137     data->complete = 1;
138     if(data != *request) {
139       // receveiver cleans its part
140       xbt_free(*request);
141     }
142   } else {
143     // second arrives here
144     if(data != *request) {
145       // receiver cleans everything
146       xbt_free(data);
147     }
148     SIMIX_rdv_destroy((*request)->rdv);
149     xbt_free(*request);
150   }
151   *request = MPI_REQUEST_NULL;
152 }
153
154 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
155   MPI_Request data = (*request)->data;
156   int flag = data && data->complete == 1;
157
158   if(flag) {
159     SIMIX_communication_destroy((*request)->pair);
160     finish_wait(request, status);
161   }
162   return flag;
163 }
164
165 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
166   MPI_Request data;
167   int i, flag;
168
169   *index = MPI_UNDEFINED;
170   flag = 0;
171   for(i = 0; i < count; i++) {
172     if(requests[i] != MPI_REQUEST_NULL) {
173       data = requests[i]->data;
174       if(data != MPI_REQUEST_NULL && data->complete == 1) {
175         SIMIX_communication_destroy(requests[i]->pair);
176         finish_wait(&requests[i], status);
177         *index = i;
178         flag = 1;
179         break;
180       }
181     }
182   }
183   return flag;
184 }
185
186
187 void smpi_mpi_get_count(MPI_Status *status, MPI_Datatype datatype, int *count) {
188  int size = smpi_datatype_size(datatype);
189           *count = (int)(status->_count / size);
190           if ( (int)((*count) * size) != status->_count )
191                     *count = MPI_UNDEFINED;
192 }
193
194
195 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
196   MPI_Request data = (*request)->data;
197
198   DEBUG6("wait for request %p (%p: %p) [src = %d, dst = %d, tag = %d]",
199          *request, (*request)->pair, data, (*request)->src, (*request)->dst, (*request)->tag);
200   // data is null if receiver waits before sender enters the rdv
201   if(data == MPI_REQUEST_NULL || data->complete == 0) {
202     SIMIX_network_wait((*request)->pair, -1.0);
203   } else {
204     SIMIX_communication_destroy((*request)->pair);
205   }
206   finish_wait(request, status);
207 }
208
209 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
210   xbt_dynar_t comms;
211   MPI_Request data;
212   int i, size, index;
213   int* map;
214
215   index = MPI_UNDEFINED;
216   if(count > 0) {
217     // First check for already completed requests
218     for(i = 0; i < count; i++) {
219       if(requests[i] != MPI_REQUEST_NULL) {
220         data = requests[i]->data;
221         if(data != MPI_REQUEST_NULL && data->complete == 1) {
222           index = i;
223           SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
224           break;
225         }
226       }
227     }
228     if(index == MPI_UNDEFINED) {
229       // Otherwise, wait for a request to complete
230       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
231       map = xbt_new(int, count);
232       size = 0;
233       DEBUG0("Wait for one of");
234       for(i = 0; i < count; i++) {
235         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
236          DEBUG4("   request %p [src = %d, dst = %d, tag = %d]",
237                 requests[i], requests[i]->src, requests[i]->dst, requests[i]->tag);
238           xbt_dynar_push(comms, &requests[i]->pair);
239           map[size] = i;
240           size++;
241         }
242       }
243       if(size > 0) {
244         index = SIMIX_network_waitany(comms);
245         index = map[index];
246       }
247       xbt_free(map);
248       xbt_dynar_free_container(&comms);
249     }
250     if(index != MPI_UNDEFINED) {
251       finish_wait(&requests[index], status);
252     }
253   }
254   return index;
255 }
256
257 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
258   int index;
259   MPI_Status stat;
260
261   while(count > 0) {
262     index = smpi_mpi_waitany(count, requests, &stat);
263     if(index == MPI_UNDEFINED) {
264       break;
265     }
266     if(status != MPI_STATUS_IGNORE) {
267       memcpy(&status[index], &stat, sizeof(stat));
268     }
269     // Move the last request to the found position
270     requests[index] = requests[count - 1];
271     requests[count - 1] = MPI_REQUEST_NULL;
272     count--;
273   }
274 }
275
276 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
277   MPI_Request data;
278   int i, count;
279
280   count = 0;
281   for(i = 0; i < incount; i++) {
282     if(requests[i] != MPI_REQUEST_NULL) {
283       data = requests[i]->data;
284       if(data != MPI_REQUEST_NULL && data->complete == 1) {
285         SIMIX_communication_destroy(requests[i]->pair);
286         finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
287         indices[count] = i;
288         count++;
289       }
290     }
291   }
292   return count;
293 }
294
295 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
296   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
297   nary_tree_bcast(buf, count, datatype, root, comm, 4);
298 }
299
300 void smpi_mpi_barrier(MPI_Comm comm) {
301   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
302   nary_tree_barrier(comm, 4);
303 }
304
305 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
306   int system_tag = 666;
307   int rank, size, src, index, sendsize, recvsize;
308   MPI_Request* requests;
309
310   rank = smpi_comm_rank(comm);
311   size = smpi_comm_size(comm);
312   if(rank != root) {
313     // Send buffer to root
314     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
315   } else {
316     sendsize = smpi_datatype_size(sendtype);
317     recvsize = smpi_datatype_size(recvtype);
318     // Local copy from root
319     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
320     // Receive buffers from senders
321     requests = xbt_new(MPI_Request, size - 1);
322     index = 0;
323     for(src = 0; src < size; src++) {
324       if(src != root) {
325         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
326         index++;
327       }
328     }
329     // Wait for completion of irecv's.
330     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
331     xbt_free(requests);
332   }
333 }
334
335 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
336   int system_tag = 666;
337   int rank, size, src, index, sendsize;
338   MPI_Request* requests;
339
340   rank = smpi_comm_rank(comm);
341   size = smpi_comm_size(comm);
342   if(rank != root) {
343     // Send buffer to root
344     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
345   } else {
346     sendsize = smpi_datatype_size(sendtype);
347     // Local copy from root
348     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
349     // Receive buffers from senders
350     requests = xbt_new(MPI_Request, size - 1);
351     index = 0;
352     for(src = 0; src < size; src++) {
353       if(src != root) {
354         requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
355         index++;
356       }
357     }
358     // Wait for completion of irecv's.
359     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
360     xbt_free(requests);
361   }
362 }
363
364 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
365   int system_tag = 666;
366   int rank, size, other, index, sendsize, recvsize;
367   MPI_Request* requests;
368
369   rank = smpi_comm_rank(comm);
370   size = smpi_comm_size(comm);
371   sendsize = smpi_datatype_size(sendtype);
372   recvsize = smpi_datatype_size(recvtype);
373   // Local copy from self
374   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
375   // Send/Recv buffers to/from others;
376   requests = xbt_new(MPI_Request, 2 * (size - 1));
377   index = 0;
378   for(other = 0; other < size; other++) {
379     if(other != rank) {
380       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
381       index++;
382       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
383       index++;
384     }
385   }
386   // Wait for completion of all comms.
387   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
388   xbt_free(requests);
389 }
390
391 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
392   int system_tag = 666;
393   int rank, size, other, index, sendsize, recvsize;
394   MPI_Request* requests;
395
396   rank = smpi_comm_rank(comm);
397   size = smpi_comm_size(comm);
398   sendsize = smpi_datatype_size(sendtype);
399   recvsize = smpi_datatype_size(recvtype);
400   // Local copy from self
401   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
402   // Send buffers to others;
403   requests = xbt_new(MPI_Request, 2 * (size - 1));
404   index = 0;
405   for(other = 0; other < size; other++) {
406     if(other != rank) {
407       requests[index] = smpi_mpi_isend(sendbuf, sendcount, sendtype, other, system_tag, comm);
408       index++;
409       requests[index] = smpi_mpi_irecv(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
410       index++;
411     }
412   }
413   // Wait for completion of all comms.
414   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
415   xbt_free(requests);
416 }
417
418 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
419   int system_tag = 666;
420   int rank, size, dst, index, sendsize, recvsize;
421   MPI_Request* requests;
422
423   rank = smpi_comm_rank(comm);
424   size = smpi_comm_size(comm);
425   if(rank != root) {
426     // Recv buffer from root
427     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
428   } else {
429     sendsize = smpi_datatype_size(sendtype);
430     recvsize = smpi_datatype_size(recvtype);
431     // Local copy from root
432     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
433     // Send buffers to receivers
434     requests = xbt_new(MPI_Request, size - 1);
435     index = 0;
436     for(dst = 0; dst < size; dst++) {
437       if(dst != root) {
438         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
439         index++;
440       }
441     }
442     // Wait for completion of isend's.
443     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
444     xbt_free(requests);
445   }
446 }
447
448 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
449   int system_tag = 666;
450   int rank, size, dst, index, sendsize, recvsize;
451   MPI_Request* requests;
452
453   rank = smpi_comm_rank(comm);
454   size = smpi_comm_size(comm);
455   if(rank != root) {
456     // Recv buffer from root
457     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
458   } else {
459     sendsize = smpi_datatype_size(sendtype);
460     recvsize = smpi_datatype_size(recvtype);
461     // Local copy from root
462     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
463     // Send buffers to receivers
464     requests = xbt_new(MPI_Request, size - 1);
465     index = 0;
466     for(dst = 0; dst < size; dst++) {
467       if(dst != root) {
468         requests[index] = smpi_mpi_isend(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
469         index++;
470       }
471     }
472     // Wait for completion of isend's.
473     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
474     xbt_free(requests);
475   }
476 }
477
478 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
479   int system_tag = 666;
480   int rank, size, src, index, datasize;
481   MPI_Request* requests;
482   void** tmpbufs;
483
484   rank = smpi_comm_rank(comm);
485   size = smpi_comm_size(comm);
486   if(rank != root) {
487     // Send buffer to root
488     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
489   } else {
490     datasize = smpi_datatype_size(datatype);
491     // Local copy from root
492     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
493     // Receive buffers from senders
494     //TODO: make a MPI_barrier here ?
495     requests = xbt_new(MPI_Request, size - 1);
496     tmpbufs = xbt_new(void*, size - 1);
497     index = 0;
498     for(src = 0; src < size; src++) {
499       if(src != root) {
500         tmpbufs[index] = xbt_malloc(count * datasize);
501         requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, src, system_tag, comm);
502         index++;
503       }
504     }
505     // Wait for completion of irecv's.
506     for(src = 0; src < size - 1; src++) {
507       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
508       if(index == MPI_UNDEFINED) {
509         break;
510       }
511       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
512     }
513     for(index = 0; index < size - 1; index++) {
514       xbt_free(tmpbufs[index]);
515     }
516     xbt_free(tmpbufs);
517     xbt_free(requests);
518   }
519 }
520
521 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
522   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
523   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
524
525 /*
526 FIXME: buggy implementation
527
528   int system_tag = 666;
529   int rank, size, other, index, datasize;
530   MPI_Request* requests;
531   void** tmpbufs;
532
533   rank = smpi_comm_rank(comm);
534   size = smpi_comm_size(comm);
535   datasize = smpi_datatype_size(datatype);
536   // Local copy from self
537   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
538   // Send/Recv buffers to/from others;
539   //TODO: make a MPI_barrier here ?
540   requests = xbt_new(MPI_Request, 2 * (size - 1));
541   tmpbufs = xbt_new(void*, size - 1);
542   index = 0;
543   for(other = 0; other < size; other++) {
544     if(other != rank) {
545       tmpbufs[index / 2] = xbt_malloc(count * datasize);
546       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
547       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
548       index += 2;
549     }
550   }
551   // Wait for completion of all comms.
552   for(other = 0; other < 2 * (size - 1); other++) {
553     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
554     if(index == MPI_UNDEFINED) {
555       break;
556     }
557     if((index & 1) == 1) {
558       // Request is odd: it's a irecv
559       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
560     }
561   }
562   for(index = 0; index < size - 1; index++) {
563     xbt_free(tmpbufs[index]);
564   }
565   xbt_free(tmpbufs);
566   xbt_free(requests);
567 */
568 }
569
570 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
571   int system_tag = 666;
572   int rank, size, other, index, datasize;
573   int total;
574   MPI_Request* requests;
575   void** tmpbufs;
576
577   rank = smpi_comm_rank(comm);
578   size = smpi_comm_size(comm);
579   datasize = smpi_datatype_size(datatype);
580   // Local copy from self
581   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
582   // Send/Recv buffers to/from others;
583   total = rank + (size - (rank + 1));
584   requests = xbt_new(MPI_Request, total);
585   tmpbufs = xbt_new(void*, rank);
586   index = 0;
587   for(other = 0; other < rank; other++) {
588     tmpbufs[index] = xbt_malloc(count * datasize);
589     requests[index] = smpi_mpi_irecv(tmpbufs[index], count, datatype, other, system_tag, comm);
590     index++;
591   }
592   for(other = rank + 1; other < size; other++) {
593     requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
594     index++;
595   }
596   // Wait for completion of all comms.
597   for(other = 0; other < total; other++) {
598     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
599     if(index == MPI_UNDEFINED) {
600       break;
601     }
602     if(index < rank) {
603       // #Request is below rank: it's a irecv
604       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
605     }
606   }
607   for(index = 0; index < size - 1; index++) {
608     xbt_free(tmpbufs[index]);
609   }
610   xbt_free(tmpbufs);
611   xbt_free(requests);
612 }