Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
new tracing mask TRACE_VOLUME to trace the msg tasks communication size and group...
[simgrid.git] / src / smpi / smpi_base.c
1 #include "private.h"
2 #include "xbt/time.h"
3
4 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
5                                 "Logging specific to SMPI (base)");
6 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
7 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
8 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
9 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
10 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
11 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
15
16 #define EAGER_LIMIT 65536
17 #define RDV_TAG     (-10)
18
19 void smpi_process_init(int* argc, char*** argv) {
20   int index;
21   smpi_process_data_t data;
22   smx_process_t proc;
23
24   proc = SIMIX_process_self();
25   index = atoi((*argv)[1]);
26   data = smpi_process_remote_data(index);
27   SIMIX_process_set_data(proc, data);
28   if (*argc > 2) {
29     free((*argv)[1]);
30     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
31     (*argv)[(*argc) - 1] = NULL;
32   }
33   (*argc)--;
34   DEBUG2("<%d> New process in the game: %p", index, proc);
35 }
36
37 void smpi_process_destroy(void) {
38   int index = smpi_process_index();
39
40   DEBUG1("<%d> Process left the game", index);
41 }
42
43 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
44   MPI_Request request;
45
46   request = xbt_new(s_smpi_mpi_request_t, 1);
47   request->buf = buf;
48   request->size = smpi_datatype_size(datatype) * count;
49   request->src = src;
50   request->dst = dst;
51   request->tag = tag;
52   request->comm = comm;
53   request->rdv = NULL;
54   request->pair = NULL;
55   request->complete = 0;
56   request->match = MPI_REQUEST_NULL;
57   request->flags = flags;
58   if(request->size <= EAGER_LIMIT) {
59     request->ack = MPI_REQUEST_NULL;
60   } else {
61     request->ack = xbt_new(s_smpi_mpi_request_t, 1);
62     request->ack->buf = NULL;
63     request->ack->size = 0;
64     request->ack->src = dst;
65     request->ack->dst = src;
66     request->ack->tag = RDV_TAG;
67     request->ack->comm = comm;
68     request->ack->rdv = NULL;
69     request->ack->pair = NULL;
70     request->ack->complete = 0;
71     request->ack->match = MPI_REQUEST_NULL;
72     request->ack->flags = NON_PERSISTENT | ((request->flags & RECV) == RECV ? SEND : RECV);
73     smpi_mpi_start(request->ack);
74   }
75   return request;
76 }
77
78 /* MPI Low level calls */
79 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
80   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
81
82   return request;
83 }
84
85 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
86   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
87
88   return request;
89 }
90
91 void smpi_mpi_start(MPI_Request request) {
92   xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
93   if(request->size > EAGER_LIMIT) {
94     print_request("RDV ack", request->ack);
95     smpi_mpi_wait(&request->ack, MPI_STATUS_IGNORE);
96   }
97   if((request->flags & RECV) == RECV) {
98     smpi_process_post_recv(request);
99     print_request("New recv", request);
100     request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
101   } else {
102     smpi_process_post_send(request->comm, request); // FIXME
103     print_request("New send", request);
104     request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL);
105   }
106 }
107
108 void smpi_mpi_startall(int count, MPI_Request* requests) {
109   int i;
110
111   for(i = 0; i < count; i++) {
112     smpi_mpi_start(requests[i]);
113   }
114 }
115
116 void smpi_mpi_request_free(MPI_Request* request) {
117   xbt_free(*request);
118   *request = MPI_REQUEST_NULL;
119 }
120
121 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
122   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
123
124   return request;
125 }
126
127 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
128   MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
129
130   smpi_mpi_start(request);
131   return request;
132 }
133
134 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
135   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
136
137   return request;
138 }
139
140 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
141   MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
142
143   smpi_mpi_start(request);
144   return request;
145 }
146
147 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
148   MPI_Request request;
149
150   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
151   smpi_mpi_wait(&request, status);
152 }
153
154 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
155   MPI_Request request;
156
157   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
158   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
159 }
160
161 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
162   MPI_Request requests[2];
163   MPI_Status stats[2];
164
165   requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
166   requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
167   smpi_mpi_startall(2, requests);
168   smpi_mpi_waitall(2, requests, stats);
169   if(status != MPI_STATUS_IGNORE) {
170     // Copy receive status
171     memcpy(status, &stats[1], sizeof(MPI_Status));
172   }
173 }
174
175 static void finish_wait(MPI_Request* request, MPI_Status* status) {
176   if(status != MPI_STATUS_IGNORE) {
177     status->MPI_SOURCE = (*request)->src;
178     status->MPI_TAG = (*request)->tag;
179     status->MPI_ERROR = MPI_SUCCESS;
180   }
181   print_request("finishing wait", *request);
182   if((*request)->complete == 1) {
183     SIMIX_rdv_destroy((*request)->rdv);
184   } else {
185     (*request)->match->complete = 1;
186     (*request)->match->match = MPI_REQUEST_NULL;
187   }
188   if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
189     smpi_mpi_request_free(request);
190   }
191 }
192
193 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
194   int flag = (*request)->complete;
195
196   if(flag) {
197     SIMIX_communication_destroy((*request)->pair);
198     finish_wait(request, status);
199   }
200   return flag;
201 }
202
203 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
204   int i, flag;
205
206   *index = MPI_UNDEFINED;
207   flag = 0;
208   for(i = 0; i < count; i++) {
209     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
210       SIMIX_communication_destroy(requests[i]->pair);
211       finish_wait(&requests[i], status);
212       *index = i;
213       flag = 1;
214       break;
215     }
216   }
217   return flag;
218 }
219
220 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
221   print_request("wait", *request);
222   // data is null if receiver waits before sender enters the rdv
223   if((*request)->complete) {
224     SIMIX_communication_destroy((*request)->pair);
225   } else {
226     SIMIX_network_wait((*request)->pair, -1.0);
227   }
228   finish_wait(request, status);
229 }
230
231 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
232   xbt_dynar_t comms;
233   int i, size, index;
234   int* map;
235
236   index = MPI_UNDEFINED;
237   if(count > 0) {
238     // First check for already completed requests
239     for(i = 0; i < count; i++) {
240       if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
241         index = i;
242         SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
243         break;
244       }
245     }
246     if(index == MPI_UNDEFINED) {
247       // Otherwise, wait for a request to complete
248       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
249       map = xbt_new(int, count);
250       size = 0;
251       DEBUG0("Wait for one of");
252       for(i = 0; i < count; i++) {
253         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
254           print_request("   ", requests[i]);
255           xbt_dynar_push(comms, &requests[i]->pair);
256           map[size] = i;
257           size++;
258         }
259       }
260       if(size > 0) {
261         index = SIMIX_network_waitany(comms);
262         index = map[index];
263       }
264       xbt_free(map);
265       xbt_dynar_free(&comms);
266     }
267     if(index != MPI_UNDEFINED) {
268       finish_wait(&requests[index], status);
269     }
270   }
271   return index;
272 }
273
274 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
275   int index;
276   MPI_Status stat;
277
278   while(count > 0) {
279     index = smpi_mpi_waitany(count, requests, &stat);
280     if(index == MPI_UNDEFINED) {
281       break;
282     }
283     if(status != MPI_STATUS_IGNORE) {
284       memcpy(&status[index], &stat, sizeof(stat));
285     }
286     // FIXME: check this -v
287     // Move the last request to the found position
288     requests[index] = requests[count - 1];
289     requests[count - 1] = MPI_REQUEST_NULL;
290     count--;
291   }
292 }
293
294 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
295   int i, count;
296
297   count = 0;
298   for(i = 0; i < incount; i++) {
299     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
300       SIMIX_communication_destroy(requests[i]->pair);
301       finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
302       indices[count] = i;
303       count++;
304     }
305   }
306   return count;
307 }
308
309 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
310   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
311   nary_tree_bcast(buf, count, datatype, root, comm, 4);
312 }
313
314 void smpi_mpi_barrier(MPI_Comm comm) {
315   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
316   nary_tree_barrier(comm, 4);
317 }
318
319 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
320   int system_tag = 666;
321   int rank, size, src, index, sendsize, recvsize;
322   MPI_Request* requests;
323
324   rank = smpi_comm_rank(comm);
325   size = smpi_comm_size(comm);
326   if(rank != root) {
327     // Send buffer to root
328     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
329   } else {
330     sendsize = smpi_datatype_size(sendtype);
331     recvsize = smpi_datatype_size(recvtype);
332     // Local copy from root
333     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
334     // Receive buffers from senders
335     requests = xbt_new(MPI_Request, size - 1);
336     index = 0;
337     for(src = 0; src < size; src++) {
338       if(src != root) {
339         requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
340         index++;
341       }
342     }
343     // Wait for completion of irecv's.
344     smpi_mpi_startall(size - 1, requests);
345     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
346     xbt_free(requests);
347   }
348 }
349
350 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
351   int system_tag = 666;
352   int rank, size, src, index, sendsize;
353   MPI_Request* requests;
354
355   rank = smpi_comm_rank(comm);
356   size = smpi_comm_size(comm);
357   if(rank != root) {
358     // Send buffer to root
359     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
360   } else {
361     sendsize = smpi_datatype_size(sendtype);
362     // Local copy from root
363     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
364     // Receive buffers from senders
365     requests = xbt_new(MPI_Request, size - 1);
366     index = 0;
367     for(src = 0; src < size; src++) {
368       if(src != root) {
369         requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
370         index++;
371       }
372     }
373     // Wait for completion of irecv's.
374     smpi_mpi_startall(size - 1, requests);
375     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
376     xbt_free(requests);
377   }
378 }
379
380 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
381   int system_tag = 666;
382   int rank, size, other, index, sendsize, recvsize;
383   MPI_Request* requests;
384
385   rank = smpi_comm_rank(comm);
386   size = smpi_comm_size(comm);
387   sendsize = smpi_datatype_size(sendtype);
388   recvsize = smpi_datatype_size(recvtype);
389   // Local copy from self
390   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
391   // Send/Recv buffers to/from others;
392   requests = xbt_new(MPI_Request, 2 * (size - 1));
393   index = 0;
394   for(other = 0; other < size; other++) {
395     if(other != rank) {
396       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
397       index++;
398       requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
399       index++;
400     }
401   }
402   // Wait for completion of all comms.
403   smpi_mpi_startall(2 * (size - 1), requests);
404   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
405   xbt_free(requests);
406 }
407
408 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
409   int system_tag = 666;
410   int rank, size, other, index, sendsize, recvsize;
411   MPI_Request* requests;
412
413   rank = smpi_comm_rank(comm);
414   size = smpi_comm_size(comm);
415   sendsize = smpi_datatype_size(sendtype);
416   recvsize = smpi_datatype_size(recvtype);
417   // Local copy from self
418   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
419   // Send buffers to others;
420   requests = xbt_new(MPI_Request, 2 * (size - 1));
421   index = 0;
422   for(other = 0; other < size; other++) {
423     if(other != rank) {
424       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
425       index++;
426       requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
427       index++;
428     }
429   }
430   // Wait for completion of all comms.
431   smpi_mpi_startall(2 * (size - 1), requests);
432   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
433   xbt_free(requests);
434 }
435
436 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
437   int system_tag = 666;
438   int rank, size, dst, index, sendsize, recvsize;
439   MPI_Request* requests;
440
441   rank = smpi_comm_rank(comm);
442   size = smpi_comm_size(comm);
443   if(rank != root) {
444     // Recv buffer from root
445     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
446   } else {
447     sendsize = smpi_datatype_size(sendtype);
448     recvsize = smpi_datatype_size(recvtype);
449     // Local copy from root
450     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
451     // Send buffers to receivers
452     requests = xbt_new(MPI_Request, size - 1);
453     index = 0;
454     for(dst = 0; dst < size; dst++) {
455       if(dst != root) {
456         requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
457         index++;
458       }
459     }
460     // Wait for completion of isend's.
461     smpi_mpi_startall(size - 1, requests);
462     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
463     xbt_free(requests);
464   }
465 }
466
467 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
468   int system_tag = 666;
469   int rank, size, dst, index, sendsize, recvsize;
470   MPI_Request* requests;
471
472   rank = smpi_comm_rank(comm);
473   size = smpi_comm_size(comm);
474   if(rank != root) {
475     // Recv buffer from root
476     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
477   } else {
478     sendsize = smpi_datatype_size(sendtype);
479     recvsize = smpi_datatype_size(recvtype);
480     // Local copy from root
481     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
482     // Send buffers to receivers
483     requests = xbt_new(MPI_Request, size - 1);
484     index = 0;
485     for(dst = 0; dst < size; dst++) {
486       if(dst != root) {
487         requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
488         index++;
489       }
490     }
491     // Wait for completion of isend's.
492     smpi_mpi_startall(size - 1, requests);
493     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
494     xbt_free(requests);
495   }
496 }
497
498 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
499   int system_tag = 666;
500   int rank, size, src, index, datasize;
501   MPI_Request* requests;
502   void** tmpbufs;
503
504   rank = smpi_comm_rank(comm);
505   size = smpi_comm_size(comm);
506   if(rank != root) {
507     // Send buffer to root
508     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
509   } else {
510     datasize = smpi_datatype_size(datatype);
511     // Local copy from root
512     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
513     // Receive buffers from senders
514     //TODO: make a MPI_barrier here ?
515     requests = xbt_new(MPI_Request, size - 1);
516     tmpbufs = xbt_new(void*, size - 1);
517     index = 0;
518     for(src = 0; src < size; src++) {
519       if(src != root) {
520         tmpbufs[index] = xbt_malloc(count * datasize);
521         requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
522         index++;
523       }
524     }
525     // Wait for completion of irecv's.
526     smpi_mpi_startall(size - 1, requests);
527     for(src = 0; src < size - 1; src++) {
528       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
529       if(index == MPI_UNDEFINED) {
530         break;
531       }
532       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
533     }
534     for(index = 0; index < size - 1; index++) {
535       xbt_free(tmpbufs[index]);
536     }
537     xbt_free(tmpbufs);
538     xbt_free(requests);
539   }
540 }
541
542 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
543   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
544   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
545
546 /*
547 FIXME: buggy implementation
548
549   int system_tag = 666;
550   int rank, size, other, index, datasize;
551   MPI_Request* requests;
552   void** tmpbufs;
553
554   rank = smpi_comm_rank(comm);
555   size = smpi_comm_size(comm);
556   datasize = smpi_datatype_size(datatype);
557   // Local copy from self
558   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
559   // Send/Recv buffers to/from others;
560   //TODO: make a MPI_barrier here ?
561   requests = xbt_new(MPI_Request, 2 * (size - 1));
562   tmpbufs = xbt_new(void*, size - 1);
563   index = 0;
564   for(other = 0; other < size; other++) {
565     if(other != rank) {
566       tmpbufs[index / 2] = xbt_malloc(count * datasize);
567       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
568       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
569       index += 2;
570     }
571   }
572   // Wait for completion of all comms.
573   for(other = 0; other < 2 * (size - 1); other++) {
574     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
575     if(index == MPI_UNDEFINED) {
576       break;
577     }
578     if((index & 1) == 1) {
579       // Request is odd: it's a irecv
580       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
581     }
582   }
583   for(index = 0; index < size - 1; index++) {
584     xbt_free(tmpbufs[index]);
585   }
586   xbt_free(tmpbufs);
587   xbt_free(requests);
588 */
589 }
590
591 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
592   int system_tag = 666;
593   int rank, size, other, index, datasize;
594   int total;
595   MPI_Request* requests;
596   void** tmpbufs;
597
598   rank = smpi_comm_rank(comm);
599   size = smpi_comm_size(comm);
600   datasize = smpi_datatype_size(datatype);
601   // Local copy from self
602   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
603   // Send/Recv buffers to/from others;
604   total = rank + (size - (rank + 1));
605   requests = xbt_new(MPI_Request, total);
606   tmpbufs = xbt_new(void*, rank);
607   index = 0;
608   for(other = 0; other < rank; other++) {
609     tmpbufs[index] = xbt_malloc(count * datasize);
610     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
611     index++;
612   }
613   for(other = rank + 1; other < size; other++) {
614     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
615     index++;
616   }
617   // Wait for completion of all comms.
618   smpi_mpi_startall(size - 1, requests);
619   for(other = 0; other < total; other++) {
620     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
621     if(index == MPI_UNDEFINED) {
622       break;
623     }
624     if(index < rank) {
625       // #Request is below rank: it's a irecv
626       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
627     }
628   }
629   for(index = 0; index < size - 1; index++) {
630     xbt_free(tmpbufs[index]);
631   }
632   xbt_free(tmpbufs);
633   xbt_free(requests);
634 }