Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix copyright headers
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 #define EAGER_LIMIT 65536
23 #define RDV_TAG     (-10)
24
25 void smpi_process_init(int* argc, char*** argv) {
26   int index;
27   smpi_process_data_t data;
28   smx_process_t proc;
29
30   proc = SIMIX_process_self();
31   index = atoi((*argv)[1]);
32   data = smpi_process_remote_data(index);
33   SIMIX_process_set_data(proc, data);
34   if (*argc > 2) {
35     free((*argv)[1]);
36     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
37     (*argv)[(*argc) - 1] = NULL;
38   }
39   (*argc)--;
40   DEBUG2("<%d> New process in the game: %p", index, proc);
41 }
42
43 void smpi_process_destroy(void) {
44   int index = smpi_process_index();
45
46   DEBUG1("<%d> Process left the game", index);
47 }
48
49 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
50   MPI_Request request;
51
52   request = xbt_new(s_smpi_mpi_request_t, 1);
53   request->buf = buf;
54   request->size = smpi_datatype_size(datatype) * count;
55   request->src = src;
56   request->dst = dst;
57   request->tag = tag;
58   request->comm = comm;
59   request->rdv = NULL;
60   request->pair = NULL;
61   request->complete = 0;
62   request->match = MPI_REQUEST_NULL;
63   request->flags = flags;
64   if(request->size <= EAGER_LIMIT) {
65     request->ack = MPI_REQUEST_NULL;
66   } else {
67     request->ack = xbt_new(s_smpi_mpi_request_t, 1);
68     request->ack->buf = NULL;
69     request->ack->size = 0;
70     request->ack->src = dst;
71     request->ack->dst = src;
72     request->ack->tag = RDV_TAG;
73     request->ack->comm = comm;
74     request->ack->rdv = NULL;
75     request->ack->pair = NULL;
76     request->ack->complete = 0;
77     request->ack->match = MPI_REQUEST_NULL;
78     request->ack->flags = NON_PERSISTENT | ((request->flags & RECV) == RECV ? SEND : RECV);
79     smpi_mpi_start(request->ack);
80   }
81   return request;
82 }
83
84 /* MPI Low level calls */
85 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
86   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
87
88   return request;
89 }
90
91 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
92   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
93
94   return request;
95 }
96
97 void smpi_mpi_start(MPI_Request request) {
98   xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
99   if(request->size > EAGER_LIMIT) {
100     print_request("RDV ack", request->ack);
101     smpi_mpi_wait(&request->ack, MPI_STATUS_IGNORE);
102   }
103   if((request->flags & RECV) == RECV) {
104     smpi_process_post_recv(request);
105     print_request("New recv", request);
106     request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
107   } else {
108     smpi_process_post_send(request->comm, request); // FIXME
109     print_request("New send", request);
110     request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL);
111   }
112 }
113
114 void smpi_mpi_startall(int count, MPI_Request* requests) {
115   int i;
116
117   for(i = 0; i < count; i++) {
118     smpi_mpi_start(requests[i]);
119   }
120 }
121
122 void smpi_mpi_request_free(MPI_Request* request) {
123   xbt_free(*request);
124   *request = MPI_REQUEST_NULL;
125 }
126
127 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
128   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
129
130   return request;
131 }
132
133 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
134   MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
135
136   smpi_mpi_start(request);
137   return request;
138 }
139
140 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
141   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
142
143   return request;
144 }
145
146 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
147   MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
148
149   smpi_mpi_start(request);
150   return request;
151 }
152
153 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
154   MPI_Request request;
155
156   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
157   smpi_mpi_wait(&request, status);
158 }
159
160 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
161   MPI_Request request;
162
163   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
164   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
165 }
166
167 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
168   MPI_Request requests[2];
169   MPI_Status stats[2];
170
171   requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
172   requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
173   smpi_mpi_startall(2, requests);
174   smpi_mpi_waitall(2, requests, stats);
175   if(status != MPI_STATUS_IGNORE) {
176     // Copy receive status
177     memcpy(status, &stats[1], sizeof(MPI_Status));
178   }
179 }
180
181 static void finish_wait(MPI_Request* request, MPI_Status* status) {
182   if(status != MPI_STATUS_IGNORE) {
183     status->MPI_SOURCE = (*request)->src;
184     status->MPI_TAG = (*request)->tag;
185     status->MPI_ERROR = MPI_SUCCESS;
186   }
187   print_request("finishing wait", *request);
188   if((*request)->complete == 1) {
189     SIMIX_rdv_destroy((*request)->rdv);
190   } else {
191     (*request)->match->complete = 1;
192     (*request)->match->match = MPI_REQUEST_NULL;
193   }
194   if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
195     smpi_mpi_request_free(request);
196   }
197 }
198
199 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
200   int flag = (*request)->complete;
201
202   if(flag) {
203     SIMIX_communication_destroy((*request)->pair);
204     finish_wait(request, status);
205   }
206   return flag;
207 }
208
209 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
210   int i, flag;
211
212   *index = MPI_UNDEFINED;
213   flag = 0;
214   for(i = 0; i < count; i++) {
215     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
216       SIMIX_communication_destroy(requests[i]->pair);
217       finish_wait(&requests[i], status);
218       *index = i;
219       flag = 1;
220       break;
221     }
222   }
223   return flag;
224 }
225
226 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
227   print_request("wait", *request);
228   // data is null if receiver waits before sender enters the rdv
229   if((*request)->complete) {
230     SIMIX_communication_destroy((*request)->pair);
231   } else {
232     SIMIX_network_wait((*request)->pair, -1.0);
233   }
234   finish_wait(request, status);
235 }
236
237 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
238   xbt_dynar_t comms;
239   int i, size, index;
240   int* map;
241
242   index = MPI_UNDEFINED;
243   if(count > 0) {
244     // First check for already completed requests
245     for(i = 0; i < count; i++) {
246       if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
247         index = i;
248         SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
249         break;
250       }
251     }
252     if(index == MPI_UNDEFINED) {
253       // Otherwise, wait for a request to complete
254       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
255       map = xbt_new(int, count);
256       size = 0;
257       DEBUG0("Wait for one of");
258       for(i = 0; i < count; i++) {
259         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
260           print_request("   ", requests[i]);
261           xbt_dynar_push(comms, &requests[i]->pair);
262           map[size] = i;
263           size++;
264         }
265       }
266       if(size > 0) {
267         index = SIMIX_network_waitany(comms);
268         index = map[index];
269       }
270       xbt_free(map);
271       xbt_dynar_free(&comms);
272     }
273     if(index != MPI_UNDEFINED) {
274       finish_wait(&requests[index], status);
275     }
276   }
277   return index;
278 }
279
280 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
281   int index;
282   MPI_Status stat;
283
284   while(count > 0) {
285     index = smpi_mpi_waitany(count, requests, &stat);
286     if(index == MPI_UNDEFINED) {
287       break;
288     }
289     if(status != MPI_STATUS_IGNORE) {
290       memcpy(&status[index], &stat, sizeof(stat));
291     }
292     // FIXME: check this -v
293     // Move the last request to the found position
294     requests[index] = requests[count - 1];
295     requests[count - 1] = MPI_REQUEST_NULL;
296     count--;
297   }
298 }
299
300 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
301   int i, count;
302
303   count = 0;
304   for(i = 0; i < incount; i++) {
305     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
306       SIMIX_communication_destroy(requests[i]->pair);
307       finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
308       indices[count] = i;
309       count++;
310     }
311   }
312   return count;
313 }
314
315 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
316   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
317   nary_tree_bcast(buf, count, datatype, root, comm, 4);
318 }
319
320 void smpi_mpi_barrier(MPI_Comm comm) {
321   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
322   nary_tree_barrier(comm, 4);
323 }
324
325 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
326   int system_tag = 666;
327   int rank, size, src, index, sendsize, recvsize;
328   MPI_Request* requests;
329
330   rank = smpi_comm_rank(comm);
331   size = smpi_comm_size(comm);
332   if(rank != root) {
333     // Send buffer to root
334     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
335   } else {
336     sendsize = smpi_datatype_size(sendtype);
337     recvsize = smpi_datatype_size(recvtype);
338     // Local copy from root
339     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
340     // Receive buffers from senders
341     requests = xbt_new(MPI_Request, size - 1);
342     index = 0;
343     for(src = 0; src < size; src++) {
344       if(src != root) {
345         requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
346         index++;
347       }
348     }
349     // Wait for completion of irecv's.
350     smpi_mpi_startall(size - 1, requests);
351     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
352     xbt_free(requests);
353   }
354 }
355
356 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
357   int system_tag = 666;
358   int rank, size, src, index, sendsize;
359   MPI_Request* requests;
360
361   rank = smpi_comm_rank(comm);
362   size = smpi_comm_size(comm);
363   if(rank != root) {
364     // Send buffer to root
365     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
366   } else {
367     sendsize = smpi_datatype_size(sendtype);
368     // Local copy from root
369     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
370     // Receive buffers from senders
371     requests = xbt_new(MPI_Request, size - 1);
372     index = 0;
373     for(src = 0; src < size; src++) {
374       if(src != root) {
375         requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
376         index++;
377       }
378     }
379     // Wait for completion of irecv's.
380     smpi_mpi_startall(size - 1, requests);
381     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
382     xbt_free(requests);
383   }
384 }
385
386 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
387   int system_tag = 666;
388   int rank, size, other, index, sendsize, recvsize;
389   MPI_Request* requests;
390
391   rank = smpi_comm_rank(comm);
392   size = smpi_comm_size(comm);
393   sendsize = smpi_datatype_size(sendtype);
394   recvsize = smpi_datatype_size(recvtype);
395   // Local copy from self
396   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
397   // Send/Recv buffers to/from others;
398   requests = xbt_new(MPI_Request, 2 * (size - 1));
399   index = 0;
400   for(other = 0; other < size; other++) {
401     if(other != rank) {
402       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
403       index++;
404       requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
405       index++;
406     }
407   }
408   // Wait for completion of all comms.
409   smpi_mpi_startall(2 * (size - 1), requests);
410   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
411   xbt_free(requests);
412 }
413
414 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
415   int system_tag = 666;
416   int rank, size, other, index, sendsize, recvsize;
417   MPI_Request* requests;
418
419   rank = smpi_comm_rank(comm);
420   size = smpi_comm_size(comm);
421   sendsize = smpi_datatype_size(sendtype);
422   recvsize = smpi_datatype_size(recvtype);
423   // Local copy from self
424   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
425   // Send buffers to others;
426   requests = xbt_new(MPI_Request, 2 * (size - 1));
427   index = 0;
428   for(other = 0; other < size; other++) {
429     if(other != rank) {
430       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
431       index++;
432       requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
433       index++;
434     }
435   }
436   // Wait for completion of all comms.
437   smpi_mpi_startall(2 * (size - 1), requests);
438   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
439   xbt_free(requests);
440 }
441
442 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
443   int system_tag = 666;
444   int rank, size, dst, index, sendsize, recvsize;
445   MPI_Request* requests;
446
447   rank = smpi_comm_rank(comm);
448   size = smpi_comm_size(comm);
449   if(rank != root) {
450     // Recv buffer from root
451     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
452   } else {
453     sendsize = smpi_datatype_size(sendtype);
454     recvsize = smpi_datatype_size(recvtype);
455     // Local copy from root
456     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
457     // Send buffers to receivers
458     requests = xbt_new(MPI_Request, size - 1);
459     index = 0;
460     for(dst = 0; dst < size; dst++) {
461       if(dst != root) {
462         requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
463         index++;
464       }
465     }
466     // Wait for completion of isend's.
467     smpi_mpi_startall(size - 1, requests);
468     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
469     xbt_free(requests);
470   }
471 }
472
473 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
474   int system_tag = 666;
475   int rank, size, dst, index, sendsize, recvsize;
476   MPI_Request* requests;
477
478   rank = smpi_comm_rank(comm);
479   size = smpi_comm_size(comm);
480   if(rank != root) {
481     // Recv buffer from root
482     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
483   } else {
484     sendsize = smpi_datatype_size(sendtype);
485     recvsize = smpi_datatype_size(recvtype);
486     // Local copy from root
487     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
488     // Send buffers to receivers
489     requests = xbt_new(MPI_Request, size - 1);
490     index = 0;
491     for(dst = 0; dst < size; dst++) {
492       if(dst != root) {
493         requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
494         index++;
495       }
496     }
497     // Wait for completion of isend's.
498     smpi_mpi_startall(size - 1, requests);
499     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
500     xbt_free(requests);
501   }
502 }
503
504 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
505   int system_tag = 666;
506   int rank, size, src, index, datasize;
507   MPI_Request* requests;
508   void** tmpbufs;
509
510   rank = smpi_comm_rank(comm);
511   size = smpi_comm_size(comm);
512   if(rank != root) {
513     // Send buffer to root
514     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
515   } else {
516     datasize = smpi_datatype_size(datatype);
517     // Local copy from root
518     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
519     // Receive buffers from senders
520     //TODO: make a MPI_barrier here ?
521     requests = xbt_new(MPI_Request, size - 1);
522     tmpbufs = xbt_new(void*, size - 1);
523     index = 0;
524     for(src = 0; src < size; src++) {
525       if(src != root) {
526         tmpbufs[index] = xbt_malloc(count * datasize);
527         requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
528         index++;
529       }
530     }
531     // Wait for completion of irecv's.
532     smpi_mpi_startall(size - 1, requests);
533     for(src = 0; src < size - 1; src++) {
534       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
535       if(index == MPI_UNDEFINED) {
536         break;
537       }
538       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
539     }
540     for(index = 0; index < size - 1; index++) {
541       xbt_free(tmpbufs[index]);
542     }
543     xbt_free(tmpbufs);
544     xbt_free(requests);
545   }
546 }
547
548 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
549   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
550   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
551
552 /*
553 FIXME: buggy implementation
554
555   int system_tag = 666;
556   int rank, size, other, index, datasize;
557   MPI_Request* requests;
558   void** tmpbufs;
559
560   rank = smpi_comm_rank(comm);
561   size = smpi_comm_size(comm);
562   datasize = smpi_datatype_size(datatype);
563   // Local copy from self
564   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
565   // Send/Recv buffers to/from others;
566   //TODO: make a MPI_barrier here ?
567   requests = xbt_new(MPI_Request, 2 * (size - 1));
568   tmpbufs = xbt_new(void*, size - 1);
569   index = 0;
570   for(other = 0; other < size; other++) {
571     if(other != rank) {
572       tmpbufs[index / 2] = xbt_malloc(count * datasize);
573       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
574       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
575       index += 2;
576     }
577   }
578   // Wait for completion of all comms.
579   for(other = 0; other < 2 * (size - 1); other++) {
580     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
581     if(index == MPI_UNDEFINED) {
582       break;
583     }
584     if((index & 1) == 1) {
585       // Request is odd: it's a irecv
586       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
587     }
588   }
589   for(index = 0; index < size - 1; index++) {
590     xbt_free(tmpbufs[index]);
591   }
592   xbt_free(tmpbufs);
593   xbt_free(requests);
594 */
595 }
596
597 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
598   int system_tag = 666;
599   int rank, size, other, index, datasize;
600   int total;
601   MPI_Request* requests;
602   void** tmpbufs;
603
604   rank = smpi_comm_rank(comm);
605   size = smpi_comm_size(comm);
606   datasize = smpi_datatype_size(datatype);
607   // Local copy from self
608   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
609   // Send/Recv buffers to/from others;
610   total = rank + (size - (rank + 1));
611   requests = xbt_new(MPI_Request, total);
612   tmpbufs = xbt_new(void*, rank);
613   index = 0;
614   for(other = 0; other < rank; other++) {
615     tmpbufs[index] = xbt_malloc(count * datasize);
616     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
617     index++;
618   }
619   for(other = rank + 1; other < size; other++) {
620     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
621     index++;
622   }
623   // Wait for completion of all comms.
624   smpi_mpi_startall(size - 1, requests);
625   for(other = 0; other < total; other++) {
626     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
627     if(index == MPI_UNDEFINED) {
628       break;
629     }
630     if(index < rank) {
631       // #Request is below rank: it's a irecv
632       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
633     }
634   }
635   for(index = 0; index < size - 1; index++) {
636     xbt_free(tmpbufs[index]);
637   }
638   xbt_free(tmpbufs);
639   xbt_free(requests);
640 }