Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2e192a05aec82b7da82982d164aac0b90ac4c355
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 #define EAGER_LIMIT 65536
23 #define RDV_TAG     (-10)
24
25 void smpi_process_init(int* argc, char*** argv) {
26   int index;
27   smpi_process_data_t data;
28   smx_process_t proc;
29
30   proc = SIMIX_process_self();
31   index = atoi((*argv)[1]);
32   data = smpi_process_remote_data(index);
33   SIMIX_process_set_data(proc, data);
34   if (*argc > 2) {
35     free((*argv)[1]);
36     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
37     (*argv)[(*argc) - 1] = NULL;
38   }
39   (*argc)--;
40   DEBUG2("<%d> New process in the game: %p", index, proc);
41 }
42
43 void smpi_process_destroy(void) {
44   int index = smpi_process_index();
45
46   DEBUG1("<%d> Process left the game", index);
47 }
48
49 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
50   MPI_Request request;
51
52   request = xbt_new(s_smpi_mpi_request_t, 1);
53   request->buf = buf;
54   request->size = smpi_datatype_size(datatype) * count;
55   request->src = src;
56   request->dst = dst;
57   request->tag = tag;
58   request->comm = comm;
59   request->rdv = NULL;
60   request->pair = NULL;
61   request->complete = 0;
62   request->match = MPI_REQUEST_NULL;
63   request->flags = flags;
64   if(request->size < EAGER_LIMIT) {
65     request->ack = MPI_REQUEST_NULL;
66   } else {
67     request->ack = xbt_new(s_smpi_mpi_request_t, 1);
68     request->ack->buf = NULL;
69     request->ack->size = 0;
70     request->ack->src = dst;
71     request->ack->dst = src;
72     request->ack->tag = RDV_TAG;
73     request->ack->comm = comm;
74     request->ack->rdv = NULL;
75     request->ack->pair = NULL;
76     request->ack->complete = 0;
77     request->ack->match = MPI_REQUEST_NULL;
78     request->ack->flags = NON_PERSISTENT | ((request->flags & RECV) == RECV ? SEND : RECV);
79     smpi_mpi_start(request->ack);
80   }
81   return request;
82 }
83
84 /* MPI Low level calls */
85 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
86   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
87
88   return request;
89 }
90
91 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
92   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
93
94   return request;
95 }
96
97 void smpi_mpi_start(MPI_Request request) {
98   xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
99   if(request->size >= EAGER_LIMIT) {
100     print_request("RDV ack", request->ack);
101     smpi_mpi_wait(&request->ack, MPI_STATUS_IGNORE);
102   }
103   if((request->flags & RECV) == RECV) {
104     smpi_process_post_recv(request);
105     print_request("New recv", request);
106     request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
107   } else {
108     smpi_process_post_send(request->comm, request); // FIXME
109     print_request("New send", request);
110     request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL);
111   }
112 }
113
114 void smpi_mpi_startall(int count, MPI_Request* requests) {
115   int i;
116
117   for(i = 0; i < count; i++) {
118     smpi_mpi_start(requests[i]);
119   }
120 }
121
122 void smpi_mpi_request_free(MPI_Request* request) {
123   xbt_free(*request);
124   *request = MPI_REQUEST_NULL;
125 }
126
127 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
128   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
129
130   return request;
131 }
132
133 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
134   MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
135
136   smpi_mpi_start(request);
137   return request;
138 }
139
140 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
141   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
142
143   return request;
144 }
145
146 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
147   MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
148
149   smpi_mpi_start(request);
150   return request;
151 }
152
153 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
154   MPI_Request request;
155
156   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
157   smpi_mpi_wait(&request, status);
158 }
159
160 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
161   MPI_Request request;
162
163   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
164   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
165 }
166
167 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
168   MPI_Request requests[2];
169   MPI_Status stats[2];
170
171   requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
172   requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
173   smpi_mpi_startall(2, requests);
174   smpi_mpi_waitall(2, requests, stats);
175   if(status != MPI_STATUS_IGNORE) {
176     // Copy receive status
177     memcpy(status, &stats[1], sizeof(MPI_Status));
178   }
179 }
180
181 static void finish_wait(MPI_Request* request, MPI_Status* status) {
182   if(status != MPI_STATUS_IGNORE) {
183     status->MPI_SOURCE = (*request)->src;
184     status->MPI_TAG = (*request)->tag;
185     status->MPI_ERROR = MPI_SUCCESS;
186   }
187   print_request("finishing wait", *request);
188   if((*request)->complete == 1) {
189     SIMIX_rdv_destroy((*request)->rdv);
190   } else {
191     (*request)->match->complete = 1;
192     (*request)->match->match = MPI_REQUEST_NULL;
193   }
194   if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
195     smpi_mpi_request_free(request);
196   } else {
197     (*request)->rdv = NULL;
198     (*request)->pair = NULL;
199   }
200 }
201
202 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
203   int flag = (*request)->complete;
204
205   if(flag) {
206     SIMIX_communication_destroy((*request)->pair);
207     finish_wait(request, status);
208   }
209   return flag;
210 }
211
212 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
213   int i, flag;
214
215   *index = MPI_UNDEFINED;
216   flag = 0;
217   for(i = 0; i < count; i++) {
218     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
219       SIMIX_communication_destroy(requests[i]->pair);
220       finish_wait(&requests[i], status);
221       *index = i;
222       flag = 1;
223       break;
224     }
225   }
226   return flag;
227 }
228
229 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
230   print_request("wait", *request);
231   // data is null if receiver waits before sender enters the rdv
232   if((*request)->complete) {
233     SIMIX_communication_destroy((*request)->pair);
234   } else {
235     SIMIX_network_wait((*request)->pair, -1.0);
236   }
237   finish_wait(request, status);
238 }
239
240 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
241   xbt_dynar_t comms;
242   int i, size, index;
243   int* map;
244
245   index = MPI_UNDEFINED;
246   if(count > 0) {
247     // First check for already completed requests
248     for(i = 0; i < count; i++) {
249       if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
250         index = i;
251         SIMIX_communication_destroy(requests[index]->pair); // always succeeds (but cleans the simix layer)
252         break;
253       }
254     }
255     if(index == MPI_UNDEFINED) {
256       // Otherwise, wait for a request to complete
257       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
258       map = xbt_new(int, count);
259       size = 0;
260       DEBUG0("Wait for one of");
261       for(i = 0; i < count; i++) {
262         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
263           print_request("   ", requests[i]);
264           xbt_dynar_push(comms, &requests[i]->pair);
265           map[size] = i;
266           size++;
267         }
268       }
269       if(size > 0) {
270         index = SIMIX_network_waitany(comms);
271         index = map[index];
272       }
273       xbt_free(map);
274       xbt_dynar_free(&comms);
275     }
276     if(index != MPI_UNDEFINED) {
277       finish_wait(&requests[index], status);
278     }
279   }
280   return index;
281 }
282
283 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
284   int index;
285   MPI_Status stat;
286
287   while(count > 0) {
288     index = smpi_mpi_waitany(count, requests, &stat);
289     if(index == MPI_UNDEFINED) {
290       break;
291     }
292     if(status != MPI_STATUS_IGNORE) {
293       memcpy(&status[index], &stat, sizeof(stat));
294     }
295     // FIXME: check this -v
296     // Move the last request to the found position
297     requests[index] = requests[count - 1];
298     requests[count - 1] = MPI_REQUEST_NULL;
299     count--;
300   }
301 }
302
303 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
304   int i, count;
305
306   count = 0;
307   for(i = 0; i < incount; i++) {
308     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
309       SIMIX_communication_destroy(requests[i]->pair);
310       finish_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
311       indices[count] = i;
312       count++;
313     }
314   }
315   return count;
316 }
317
318 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
319   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
320   nary_tree_bcast(buf, count, datatype, root, comm, 4);
321 }
322
323 void smpi_mpi_barrier(MPI_Comm comm) {
324   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
325   nary_tree_barrier(comm, 4);
326 }
327
328 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
329   int system_tag = 666;
330   int rank, size, src, index, sendsize, recvsize;
331   MPI_Request* requests;
332
333   rank = smpi_comm_rank(comm);
334   size = smpi_comm_size(comm);
335   if(rank != root) {
336     // Send buffer to root
337     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
338   } else {
339     sendsize = smpi_datatype_size(sendtype);
340     recvsize = smpi_datatype_size(recvtype);
341     // Local copy from root
342     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
343     // Receive buffers from senders
344     requests = xbt_new(MPI_Request, size - 1);
345     index = 0;
346     for(src = 0; src < size; src++) {
347       if(src != root) {
348         requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
349         index++;
350       }
351     }
352     // Wait for completion of irecv's.
353     smpi_mpi_startall(size - 1, requests);
354     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
355     xbt_free(requests);
356   }
357 }
358
359 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
360   int system_tag = 666;
361   int rank, size, src, index, sendsize;
362   MPI_Request* requests;
363
364   rank = smpi_comm_rank(comm);
365   size = smpi_comm_size(comm);
366   if(rank != root) {
367     // Send buffer to root
368     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
369   } else {
370     sendsize = smpi_datatype_size(sendtype);
371     // Local copy from root
372     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
373     // Receive buffers from senders
374     requests = xbt_new(MPI_Request, size - 1);
375     index = 0;
376     for(src = 0; src < size; src++) {
377       if(src != root) {
378         requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
379         index++;
380       }
381     }
382     // Wait for completion of irecv's.
383     smpi_mpi_startall(size - 1, requests);
384     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
385     xbt_free(requests);
386   }
387 }
388
389 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
390   int system_tag = 666;
391   int rank, size, other, index, sendsize, recvsize;
392   MPI_Request* requests;
393
394   rank = smpi_comm_rank(comm);
395   size = smpi_comm_size(comm);
396   sendsize = smpi_datatype_size(sendtype);
397   recvsize = smpi_datatype_size(recvtype);
398   // Local copy from self
399   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
400   // Send/Recv buffers to/from others;
401   requests = xbt_new(MPI_Request, 2 * (size - 1));
402   index = 0;
403   for(other = 0; other < size; other++) {
404     if(other != rank) {
405       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
406       index++;
407       requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
408       index++;
409     }
410   }
411   // Wait for completion of all comms.
412   smpi_mpi_startall(2 * (size - 1), requests);
413   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
414   xbt_free(requests);
415 }
416
417 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
418   int system_tag = 666;
419   int rank, size, other, index, sendsize, recvsize;
420   MPI_Request* requests;
421
422   rank = smpi_comm_rank(comm);
423   size = smpi_comm_size(comm);
424   sendsize = smpi_datatype_size(sendtype);
425   recvsize = smpi_datatype_size(recvtype);
426   // Local copy from self
427   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
428   // Send buffers to others;
429   requests = xbt_new(MPI_Request, 2 * (size - 1));
430   index = 0;
431   for(other = 0; other < size; other++) {
432     if(other != rank) {
433       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
434       index++;
435       requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
436       index++;
437     }
438   }
439   // Wait for completion of all comms.
440   smpi_mpi_startall(2 * (size - 1), requests);
441   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
442   xbt_free(requests);
443 }
444
445 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
446   int system_tag = 666;
447   int rank, size, dst, index, sendsize, recvsize;
448   MPI_Request* requests;
449
450   rank = smpi_comm_rank(comm);
451   size = smpi_comm_size(comm);
452   if(rank != root) {
453     // Recv buffer from root
454     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
455   } else {
456     sendsize = smpi_datatype_size(sendtype);
457     recvsize = smpi_datatype_size(recvtype);
458     // Local copy from root
459     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
460     // Send buffers to receivers
461     requests = xbt_new(MPI_Request, size - 1);
462     index = 0;
463     for(dst = 0; dst < size; dst++) {
464       if(dst != root) {
465         requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
466         index++;
467       }
468     }
469     // Wait for completion of isend's.
470     smpi_mpi_startall(size - 1, requests);
471     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
472     xbt_free(requests);
473   }
474 }
475
476 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
477   int system_tag = 666;
478   int rank, size, dst, index, sendsize, recvsize;
479   MPI_Request* requests;
480
481   rank = smpi_comm_rank(comm);
482   size = smpi_comm_size(comm);
483   if(rank != root) {
484     // Recv buffer from root
485     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
486   } else {
487     sendsize = smpi_datatype_size(sendtype);
488     recvsize = smpi_datatype_size(recvtype);
489     // Local copy from root
490     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
491     // Send buffers to receivers
492     requests = xbt_new(MPI_Request, size - 1);
493     index = 0;
494     for(dst = 0; dst < size; dst++) {
495       if(dst != root) {
496         requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
497         index++;
498       }
499     }
500     // Wait for completion of isend's.
501     smpi_mpi_startall(size - 1, requests);
502     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
503     xbt_free(requests);
504   }
505 }
506
507 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
508   int system_tag = 666;
509   int rank, size, src, index, datasize;
510   MPI_Request* requests;
511   void** tmpbufs;
512
513   rank = smpi_comm_rank(comm);
514   size = smpi_comm_size(comm);
515   if(rank != root) {
516     // Send buffer to root
517     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
518   } else {
519     datasize = smpi_datatype_size(datatype);
520     // Local copy from root
521     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
522     // Receive buffers from senders
523     //TODO: make a MPI_barrier here ?
524     requests = xbt_new(MPI_Request, size - 1);
525     tmpbufs = xbt_new(void*, size - 1);
526     index = 0;
527     for(src = 0; src < size; src++) {
528       if(src != root) {
529         tmpbufs[index] = xbt_malloc(count * datasize);
530         requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
531         index++;
532       }
533     }
534     // Wait for completion of irecv's.
535     smpi_mpi_startall(size - 1, requests);
536     for(src = 0; src < size - 1; src++) {
537       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
538       if(index == MPI_UNDEFINED) {
539         break;
540       }
541       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
542     }
543     for(index = 0; index < size - 1; index++) {
544       xbt_free(tmpbufs[index]);
545     }
546     xbt_free(tmpbufs);
547     xbt_free(requests);
548   }
549 }
550
551 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
552   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
553   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
554
555 /*
556 FIXME: buggy implementation
557
558   int system_tag = 666;
559   int rank, size, other, index, datasize;
560   MPI_Request* requests;
561   void** tmpbufs;
562
563   rank = smpi_comm_rank(comm);
564   size = smpi_comm_size(comm);
565   datasize = smpi_datatype_size(datatype);
566   // Local copy from self
567   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
568   // Send/Recv buffers to/from others;
569   //TODO: make a MPI_barrier here ?
570   requests = xbt_new(MPI_Request, 2 * (size - 1));
571   tmpbufs = xbt_new(void*, size - 1);
572   index = 0;
573   for(other = 0; other < size; other++) {
574     if(other != rank) {
575       tmpbufs[index / 2] = xbt_malloc(count * datasize);
576       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
577       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
578       index += 2;
579     }
580   }
581   // Wait for completion of all comms.
582   for(other = 0; other < 2 * (size - 1); other++) {
583     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
584     if(index == MPI_UNDEFINED) {
585       break;
586     }
587     if((index & 1) == 1) {
588       // Request is odd: it's a irecv
589       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
590     }
591   }
592   for(index = 0; index < size - 1; index++) {
593     xbt_free(tmpbufs[index]);
594   }
595   xbt_free(tmpbufs);
596   xbt_free(requests);
597 */
598 }
599
600 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
601   int system_tag = 666;
602   int rank, size, other, index, datasize;
603   int total;
604   MPI_Request* requests;
605   void** tmpbufs;
606
607   rank = smpi_comm_rank(comm);
608   size = smpi_comm_size(comm);
609   datasize = smpi_datatype_size(datatype);
610   // Local copy from self
611   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
612   // Send/Recv buffers to/from others;
613   total = rank + (size - (rank + 1));
614   requests = xbt_new(MPI_Request, total);
615   tmpbufs = xbt_new(void*, rank);
616   index = 0;
617   for(other = 0; other < rank; other++) {
618     tmpbufs[index] = xbt_malloc(count * datasize);
619     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
620     index++;
621   }
622   for(other = rank + 1; other < size; other++) {
623     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
624     index++;
625   }
626   // Wait for completion of all comms.
627   smpi_mpi_startall(size - 1, requests);
628   for(other = 0; other < total; other++) {
629     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
630     if(index == MPI_UNDEFINED) {
631       break;
632     }
633     if(index < rank) {
634       // #Request is below rank: it's a irecv
635       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
636     }
637   }
638   for(index = 0; index < size - 1; index++) {
639     xbt_free(tmpbufs[index]);
640   }
641   xbt_free(tmpbufs);
642   xbt_free(requests);
643 }