Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Enhance the communication filtering mechanism
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13
14 static int match_recv(void* a, void* b, smx_action_t ignored) {
15    MPI_Request ref = (MPI_Request)a;
16    MPI_Request req = (MPI_Request)b;
17
18    xbt_assert(ref, "Cannot match recv against null reference");
19    xbt_assert(req, "Cannot match recv against null request");
20    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
21           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
22 }
23
24 static int match_send(void* a, void* b,smx_action_t ignored) {
25    MPI_Request ref = (MPI_Request)a;
26    MPI_Request req = (MPI_Request)b;
27
28    xbt_assert(ref, "Cannot match send against null reference");
29    xbt_assert(req, "Cannot match send against null request");
30    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
31           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
32 }
33
34 static MPI_Request build_request(void *buf, int count,
35                                  MPI_Datatype datatype, int src, int dst,
36                                  int tag, MPI_Comm comm, unsigned flags)
37 {
38   MPI_Request request;
39
40   request = xbt_new(s_smpi_mpi_request_t, 1);
41   request->buf = buf;
42   // FIXME: this will have to be changed to support non-contiguous datatypes
43   request->size = smpi_datatype_size(datatype) * count;
44   request->src = src;
45   request->dst = dst;
46   request->tag = tag;
47   request->comm = comm;
48   request->action = NULL;
49   request->flags = flags;
50 #ifdef HAVE_TRACING
51   request->send = 0;
52   request->recv = 0;
53 #endif
54   return request;
55 }
56
57 static void smpi_mpi_request_free_voidp(void* request)
58 {
59   MPI_Request req = request;
60   smpi_mpi_request_free(&req);
61 }
62
63 /* MPI Low level calls */
64 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
65                                int dst, int tag, MPI_Comm comm)
66 {
67   MPI_Request request =
68       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
69                     comm, PERSISTENT | SEND);
70
71   return request;
72 }
73
74 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
75                                int src, int tag, MPI_Comm comm)
76 {
77   MPI_Request request =
78       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
79                     comm, PERSISTENT | RECV);
80
81   return request;
82 }
83
84 void smpi_mpi_start(MPI_Request request)
85 {
86   smx_rdv_t mailbox;
87   int detached = 0;
88
89   xbt_assert(!request->action,
90               "Cannot (re)start a non-finished communication");
91   if(request->flags & RECV) {
92     print_request("New recv", request);
93     mailbox = smpi_process_mailbox();
94     // FIXME: SIMIX does not yet support non-contiguous datatypes
95     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
96   } else {
97     print_request("New send", request);
98     mailbox = smpi_process_remote_mailbox(
99                           smpi_group_index(smpi_comm_group(request->comm), request->dst));
100     // FIXME: SIMIX does not yet support non-contiguous datatypes
101
102     if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
103         void *oldbuf = request->buf;
104         detached = 1;
105         request->buf = malloc(request->size);
106         memcpy(request->buf,oldbuf,request->size);
107         XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
108     } else {
109         XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
110     }
111     request->action = 
112                 simcall_comm_isend(mailbox, request->size, -1.0,
113                                     request->buf, request->size,
114                                     &match_send,
115                                     &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
116                                     request,
117                                     // detach if msg size < eager/rdv switch limit
118                                     detached);
119
120 #ifdef HAVE_TRACING
121     /* FIXME: detached sends are not traceable (request->action == NULL) */
122     if (request->action)
123       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
124 #endif
125   }
126 }
127
128 void smpi_mpi_startall(int count, MPI_Request * requests)
129 {
130           int i;
131
132   for(i = 0; i < count; i++) {
133     smpi_mpi_start(requests[i]);
134   }
135 }
136
137 void smpi_mpi_request_free(MPI_Request * request)
138 {
139   xbt_free(*request);
140   *request = MPI_REQUEST_NULL;
141 }
142
143 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
144                             int dst, int tag, MPI_Comm comm)
145 {
146   MPI_Request request =
147       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
148                     comm, NON_PERSISTENT | SEND);
149
150   return request;
151 }
152
153 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
154                            int dst, int tag, MPI_Comm comm)
155 {
156   MPI_Request request =
157       smpi_isend_init(buf, count, datatype, dst, tag, comm);
158
159   smpi_mpi_start(request);
160   return request;
161 }
162
163 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
164                             int src, int tag, MPI_Comm comm)
165 {
166   MPI_Request request =
167       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
168                     comm, NON_PERSISTENT | RECV);
169
170   return request;
171 }
172
173 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
174                            int src, int tag, MPI_Comm comm)
175 {
176   MPI_Request request =
177       smpi_irecv_init(buf, count, datatype, src, tag, comm);
178
179   smpi_mpi_start(request);
180   return request;
181 }
182
183 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
184                    int tag, MPI_Comm comm, MPI_Status * status)
185 {
186   MPI_Request request;
187
188   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
189   smpi_mpi_wait(&request, status);
190 }
191
192 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
193                    int tag, MPI_Comm comm)
194 {
195   MPI_Request request;
196
197   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
198   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
199 }
200
201 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
202                        int dst, int sendtag, void *recvbuf, int recvcount,
203                        MPI_Datatype recvtype, int src, int recvtag,
204                        MPI_Comm comm, MPI_Status * status)
205 {
206   MPI_Request requests[2];
207   MPI_Status stats[2];
208
209   requests[0] =
210       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
211   requests[1] =
212       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
213   smpi_mpi_startall(2, requests);
214   smpi_mpi_waitall(2, requests, stats);
215   if(status != MPI_STATUS_IGNORE) {
216     // Copy receive status
217     memcpy(status, &stats[1], sizeof(MPI_Status));
218   }
219 }
220
221 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
222 {
223   return status->count / smpi_datatype_size(datatype);
224 }
225
226 static void finish_wait(MPI_Request * request, MPI_Status * status)
227 {
228   MPI_Request req = *request;
229
230   if(status != MPI_STATUS_IGNORE) {
231     status->MPI_SOURCE = req->src;
232     status->MPI_TAG = req->tag;
233     status->MPI_ERROR = MPI_SUCCESS;
234     // FIXME: really this should just contain the count of receive-type blocks,
235     // right?
236     status->count = req->size;
237   }
238   print_request("Finishing", req);
239   if(req->flags & NON_PERSISTENT) {
240     smpi_mpi_request_free(request);
241   } else {
242     req->action = NULL;
243   }
244 }
245
246 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
247 int flag;
248
249    if ((*request)->action == NULL)
250         flag = 1;
251    else 
252     flag = simcall_comm_test((*request)->action);
253    if(flag) {
254                     smpi_mpi_wait(request, status);
255           }
256           return flag;
257 }
258
259 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
260                      MPI_Status * status)
261 {
262   xbt_dynar_t comms;
263   int i, flag, size;
264   int* map;
265
266   *index = MPI_UNDEFINED;
267   flag = 0;
268   if(count > 0) {
269     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
270     map = xbt_new(int, count);
271     size = 0;
272     for(i = 0; i < count; i++) {
273       if(requests[i]->action) {
274          xbt_dynar_push(comms, &requests[i]->action);
275          map[size] = i;
276          size++;
277       }
278     }
279     if(size > 0) {
280       i = simcall_comm_testany(comms);
281       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
282       if(i != MPI_UNDEFINED) {
283         *index = map[i];
284         smpi_mpi_wait(&requests[*index], status);
285         flag = 1;
286       }
287     }
288     xbt_free(map);
289     xbt_dynar_free(&comms);
290   }
291   return flag;
292 }
293
294 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
295 {
296   print_request("Waiting", *request);
297   if ((*request)->action != NULL) { // this is not a detached send
298     simcall_comm_wait((*request)->action, -1.0);
299     finish_wait(request, status);
300   }
301   // FIXME for a detached send, finish_wait is not called:
302 }
303
304 int smpi_mpi_waitany(int count, MPI_Request requests[],
305                      MPI_Status * status)
306 {
307   xbt_dynar_t comms;
308   int i, size, index;
309   int *map;
310
311   index = MPI_UNDEFINED;
312   if(count > 0) {
313     // Wait for a request to complete
314     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
315     map = xbt_new(int, count);
316     size = 0;
317     XBT_DEBUG("Wait for one of");
318     for(i = 0; i < count; i++) {
319       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
320         print_request("   ", requests[i]);
321         xbt_dynar_push(comms, &requests[i]->action);
322         map[size] = i;
323         size++;
324       }
325     }
326     if(size > 0) {
327       i = simcall_comm_waitany(comms);
328       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
329       if (i != MPI_UNDEFINED) {
330         index = map[i];
331         finish_wait(&requests[index], status);
332       }
333     }
334     xbt_free(map);
335     xbt_dynar_free(&comms);
336   }
337   return index;
338 }
339
340 void smpi_mpi_waitall(int count, MPI_Request requests[],
341                       MPI_Status status[])
342 {
343   int index, c;
344   MPI_Status stat;
345   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
346
347   for(c = 0; c < count; c++) {
348     if(MC_IS_ENABLED) {
349       smpi_mpi_wait(&requests[c], pstat);
350       index = c;
351     } else {
352       index = smpi_mpi_waitany(count, requests, pstat);
353       if(index == MPI_UNDEFINED) {
354         break;
355       }
356     }
357     if(status != MPI_STATUS_IGNORE) {
358       memcpy(&status[index], pstat, sizeof(*pstat));
359     }
360   }
361 }
362
363 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
364                       MPI_Status status[])
365 {
366   int i, count, index;
367
368   count = 0;
369   for(i = 0; i < incount; i++) {
370      if(smpi_mpi_testany(incount, requests, &index, status)) {
371        indices[count] = index;
372        count++;
373      }
374   }
375   return count;
376 }
377
378 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
379                     MPI_Comm comm)
380 {
381   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
382   nary_tree_bcast(buf, count, datatype, root, comm, 4);
383 }
384
385 void smpi_mpi_barrier(MPI_Comm comm)
386 {
387   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
388   nary_tree_barrier(comm, 4);
389 }
390
391 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
392                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
393                      int root, MPI_Comm comm)
394 {
395   int system_tag = 666;
396   int rank, size, src, index;
397   MPI_Aint lb = 0, recvext = 0;
398   MPI_Request *requests;
399
400   rank = smpi_comm_rank(comm);
401   size = smpi_comm_size(comm);
402   if(rank != root) {
403     // Send buffer to root
404     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
405   } else {
406     // FIXME: check for errors
407     smpi_datatype_extent(recvtype, &lb, &recvext);
408     // Local copy from root
409     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
410         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
411     // Receive buffers from senders
412     requests = xbt_new(MPI_Request, size - 1);
413     index = 0;
414     for(src = 0; src < size; src++) {
415       if(src != root) {
416         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
417                                           recvcount, recvtype, 
418                                           src, system_tag, comm);
419         index++;
420       }
421     }
422     // Wait for completion of irecv's.
423     smpi_mpi_startall(size - 1, requests);
424     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
425     xbt_free(requests);
426   }
427 }
428
429 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
430                       void *recvbuf, int *recvcounts, int *displs,
431                       MPI_Datatype recvtype, int root, MPI_Comm comm)
432 {
433   int system_tag = 666;
434   int rank, size, src, index;
435   MPI_Aint lb = 0, recvext = 0;
436   MPI_Request *requests;
437
438   rank = smpi_comm_rank(comm);
439   size = smpi_comm_size(comm);
440   if(rank != root) {
441     // Send buffer to root
442     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
443   } else {
444     // FIXME: check for errors
445     smpi_datatype_extent(recvtype, &lb, &recvext);
446     // Local copy from root
447     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
448                        (char *)recvbuf + displs[root] * recvext, 
449                        recvcounts[root], recvtype);
450     // Receive buffers from senders
451     requests = xbt_new(MPI_Request, size - 1);
452     index = 0;
453     for(src = 0; src < size; src++) {
454       if(src != root) {
455         requests[index] =
456             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
457                             recvcounts[src], recvtype, src, system_tag, comm);
458         index++;
459       }
460     }
461     // Wait for completion of irecv's.
462     smpi_mpi_startall(size - 1, requests);
463     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
464     xbt_free(requests);
465   }
466 }
467
468 void smpi_mpi_allgather(void *sendbuf, int sendcount,
469                         MPI_Datatype sendtype, void *recvbuf,
470                         int recvcount, MPI_Datatype recvtype,
471                         MPI_Comm comm)
472 {
473   int system_tag = 666;
474   int rank, size, other, index;
475   MPI_Aint lb = 0, recvext = 0;
476   MPI_Request *requests;
477
478   rank = smpi_comm_rank(comm);
479   size = smpi_comm_size(comm);
480   // FIXME: check for errors
481   smpi_datatype_extent(recvtype, &lb, &recvext);
482   // Local copy from self
483   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
484                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
485                      recvtype);
486   // Send/Recv buffers to/from others;
487   requests = xbt_new(MPI_Request, 2 * (size - 1));
488   index = 0;
489   for(other = 0; other < size; other++) {
490     if(other != rank) {
491       requests[index] =
492           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
493                           comm);
494       index++;
495       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
496                                         recvcount, recvtype, other, 
497                                         system_tag, comm);
498       index++;
499     }
500   }
501   // Wait for completion of all comms.
502   smpi_mpi_startall(2 * (size - 1), requests);
503   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
504   xbt_free(requests);
505 }
506
507 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
508                          MPI_Datatype sendtype, void *recvbuf,
509                          int *recvcounts, int *displs,
510                          MPI_Datatype recvtype, MPI_Comm comm)
511 {
512   int system_tag = 666;
513   int rank, size, other, index;
514   MPI_Aint lb = 0, recvext = 0;
515   MPI_Request *requests;
516
517   rank = smpi_comm_rank(comm);
518   size = smpi_comm_size(comm);
519   // FIXME: check for errors
520   smpi_datatype_extent(recvtype, &lb, &recvext);
521   // Local copy from self
522   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
523                      (char *)recvbuf + displs[rank] * recvext, 
524                      recvcounts[rank], recvtype);
525   // Send buffers to others;
526   requests = xbt_new(MPI_Request, 2 * (size - 1));
527   index = 0;
528   for(other = 0; other < size; other++) {
529     if(other != rank) {
530       requests[index] =
531           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
532                           comm);
533       index++;
534       requests[index] =
535           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
536                           recvtype, other, system_tag, comm);
537       index++;
538     }
539   }
540   // Wait for completion of all comms.
541   smpi_mpi_startall(2 * (size - 1), requests);
542   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
543   xbt_free(requests);
544 }
545
546 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
547                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
548                       int root, MPI_Comm comm)
549 {
550   int system_tag = 666;
551   int rank, size, dst, index;
552   MPI_Aint lb = 0, sendext = 0;
553   MPI_Request *requests;
554
555   rank = smpi_comm_rank(comm);
556   size = smpi_comm_size(comm);
557   if(rank != root) {
558     // Recv buffer from root
559     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
560                   MPI_STATUS_IGNORE);
561   } else {
562     // FIXME: check for errors
563     smpi_datatype_extent(sendtype, &lb, &sendext);
564     // Local copy from root
565     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
566       sendcount, sendtype, recvbuf, recvcount, recvtype);
567     // Send buffers to receivers
568     requests = xbt_new(MPI_Request, size - 1);
569     index = 0;
570     for(dst = 0; dst < size; dst++) {
571       if(dst != root) {
572         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
573                                           sendcount, sendtype, dst,
574                                           system_tag, comm);
575         index++;
576       }
577     }
578     // Wait for completion of isend's.
579     smpi_mpi_startall(size - 1, requests);
580     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
581     xbt_free(requests);
582   }
583 }
584
585 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
586                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
587                        MPI_Datatype recvtype, int root, MPI_Comm comm)
588 {
589   int system_tag = 666;
590   int rank, size, dst, index;
591   MPI_Aint lb = 0, sendext = 0;
592   MPI_Request *requests;
593
594   rank = smpi_comm_rank(comm);
595   size = smpi_comm_size(comm);
596   if(rank != root) {
597     // Recv buffer from root
598     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
599                   MPI_STATUS_IGNORE);
600   } else {
601     // FIXME: check for errors
602     smpi_datatype_extent(sendtype, &lb, &sendext);
603     // Local copy from root
604     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
605                        sendtype, recvbuf, recvcount, recvtype);
606     // Send buffers to receivers
607     requests = xbt_new(MPI_Request, size - 1);
608     index = 0;
609     for(dst = 0; dst < size; dst++) {
610       if(dst != root) {
611         requests[index] =
612             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
613                             sendtype, dst, system_tag, comm);
614         index++;
615       }
616     }
617     // Wait for completion of isend's.
618     smpi_mpi_startall(size - 1, requests);
619     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
620     xbt_free(requests);
621   }
622 }
623
624 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
625                      MPI_Datatype datatype, MPI_Op op, int root,
626                      MPI_Comm comm)
627 {
628   int system_tag = 666;
629   int rank, size, src, index;
630   MPI_Aint lb = 0, dataext = 0;
631   MPI_Request *requests;
632   void **tmpbufs;
633
634   rank = smpi_comm_rank(comm);
635   size = smpi_comm_size(comm);
636   if(rank != root) {
637     // Send buffer to root
638     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
639   } else {
640     // FIXME: check for errors
641     smpi_datatype_extent(datatype, &lb, &dataext);
642     // Local copy from root
643     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
644     // Receive buffers from senders
645     //TODO: make a MPI_barrier here ?
646     requests = xbt_new(MPI_Request, size - 1);
647     tmpbufs = xbt_new(void *, size - 1);
648     index = 0;
649     for(src = 0; src < size; src++) {
650       if(src != root) {
651         // FIXME: possibly overkill we we have contiguous/noncontiguous data
652         //  mapping...
653         tmpbufs[index] = xbt_malloc(count * dataext);
654         requests[index] =
655             smpi_irecv_init(tmpbufs[index], count, datatype, src,
656                             system_tag, comm);
657         index++;
658       }
659     }
660     // Wait for completion of irecv's.
661     smpi_mpi_startall(size - 1, requests);
662     for(src = 0; src < size - 1; src++) {
663       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
664       if(index == MPI_UNDEFINED) {
665         break;
666       }
667       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
668     }
669     for(index = 0; index < size - 1; index++) {
670       xbt_free(tmpbufs[index]);
671     }
672     xbt_free(tmpbufs);
673     xbt_free(requests);
674   }
675 }
676
677 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
678                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
679 {
680   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
681   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
682 }
683
684 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
685                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
686 {
687   int system_tag = 666;
688   int rank, size, other, index;
689   MPI_Aint lb = 0, dataext = 0;
690   MPI_Request *requests;
691   void **tmpbufs;
692
693   rank = smpi_comm_rank(comm);
694   size = smpi_comm_size(comm);
695
696   // FIXME: check for errors
697   smpi_datatype_extent(datatype, &lb, &dataext);
698
699   // Local copy from self
700   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
701
702   // Send/Recv buffers to/from others;
703   requests = xbt_new(MPI_Request, size - 1);
704   tmpbufs = xbt_new(void *, rank);
705   index = 0;
706   for(other = 0; other < rank; other++) {
707     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
708     // mapping...
709     tmpbufs[index] = xbt_malloc(count * dataext);
710     requests[index] =
711         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
712                         comm);
713     index++;
714   }
715   for(other = rank + 1; other < size; other++) {
716     requests[index] =
717         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
718     index++;
719   }
720   // Wait for completion of all comms.
721   smpi_mpi_startall(size - 1, requests);
722   for(other = 0; other < size - 1; other++) {
723     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
724     if(index == MPI_UNDEFINED) {
725       break;
726     }
727     if(index < rank) {
728       // #Request is below rank: it's a irecv
729       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
730     }
731   }
732   for(index = 0; index < rank; index++) {
733     xbt_free(tmpbufs[index]);
734   }
735   xbt_free(tmpbufs);
736   xbt_free(requests);
737 }