Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add what's missing to load traces from SMPI.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
13                                 "Logging specific to SMPI (base)");
14
15 static int match_recv(void* a, void* b, smx_action_t ignored) {
16    MPI_Request ref = (MPI_Request)a;
17    MPI_Request req = (MPI_Request)b;
18
19    xbt_assert(ref, "Cannot match recv against null reference");
20    xbt_assert(req, "Cannot match recv against null request");
21    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
22           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
23 }
24
25 static int match_send(void* a, void* b,smx_action_t ignored) {
26    MPI_Request ref = (MPI_Request)a;
27    MPI_Request req = (MPI_Request)b;
28
29    xbt_assert(ref, "Cannot match send against null reference");
30    xbt_assert(req, "Cannot match send against null request");
31    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
32           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
33 }
34
35 static MPI_Request build_request(void *buf, int count,
36                                  MPI_Datatype datatype, int src, int dst,
37                                  int tag, MPI_Comm comm, unsigned flags)
38 {
39   MPI_Request request;
40
41   request = xbt_new(s_smpi_mpi_request_t, 1);
42   request->buf = buf;
43   // FIXME: this will have to be changed to support non-contiguous datatypes
44   request->size = smpi_datatype_size(datatype) * count;
45   request->src = src;
46   request->dst = dst;
47   request->tag = tag;
48   request->comm = comm;
49   request->action = NULL;
50   request->flags = flags;
51 #ifdef HAVE_TRACING
52   request->send = 0;
53   request->recv = 0;
54 #endif
55   return request;
56 }
57
58 void smpi_action_trace_run(char *path)
59 {
60   char *name;
61   xbt_dynar_t todo;
62   xbt_dict_cursor_t cursor;
63
64   action_fp=NULL;
65   if (path) {
66     action_fp = fopen(path, "r");
67     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
68                 strerror(errno));
69   }
70
71   if (!xbt_dict_is_empty(action_queues)) {
72     XBT_WARN
73         ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
74
75
76     xbt_dict_foreach(action_queues, cursor, name, todo) {
77       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
78     }
79   }
80
81   if (path)
82     fclose(action_fp);
83   xbt_dict_free(&action_queues);
84   action_queues = xbt_dict_new_homogeneous(NULL);
85 }
86
87 static void smpi_mpi_request_free_voidp(void* request)
88 {
89   MPI_Request req = request;
90   smpi_mpi_request_free(&req);
91 }
92
93 /* MPI Low level calls */
94 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
95                                int dst, int tag, MPI_Comm comm)
96 {
97   MPI_Request request =
98       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
99                     comm, PERSISTENT | SEND);
100
101   return request;
102 }
103
104 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
105                                int src, int tag, MPI_Comm comm)
106 {
107   MPI_Request request =
108       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
109                     comm, PERSISTENT | RECV);
110
111   return request;
112 }
113
114 void smpi_mpi_start(MPI_Request request)
115 {
116   smx_rdv_t mailbox;
117   int detached = 0;
118
119   xbt_assert(!request->action,
120               "Cannot (re)start a non-finished communication");
121   if(request->flags & RECV) {
122     print_request("New recv", request);
123     mailbox = smpi_process_mailbox();
124     // FIXME: SIMIX does not yet support non-contiguous datatypes
125     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
126   } else {
127     print_request("New send", request);
128     mailbox = smpi_process_remote_mailbox(
129         smpi_group_index(smpi_comm_group(request->comm), request->dst));
130     // FIXME: SIMIX does not yet support non-contiguous datatypes
131
132     if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
133       void *oldbuf = request->buf;
134       detached = 1;
135       request->buf = malloc(request->size);
136       memcpy(request->buf,oldbuf,request->size);
137       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
138     } else {
139       XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
140     }
141     request->action = 
142     simcall_comm_isend(mailbox, request->size, -1.0,
143             request->buf, request->size,
144             &match_send,
145             &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
146             request,
147             // detach if msg size < eager/rdv switch limit
148             detached);
149
150 #ifdef HAVE_TRACING
151     /* FIXME: detached sends are not traceable (request->action == NULL) */
152     if (request->action)
153       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
154 #endif
155   }
156 }
157
158 void smpi_mpi_startall(int count, MPI_Request * requests)
159 {
160     int i;
161
162   for(i = 0; i < count; i++) {
163     smpi_mpi_start(requests[i]);
164   }
165 }
166
167 void smpi_mpi_request_free(MPI_Request * request)
168 {
169   xbt_free(*request);
170   *request = MPI_REQUEST_NULL;
171 }
172
173 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
174                             int dst, int tag, MPI_Comm comm)
175 {
176   MPI_Request request =
177       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
178                     comm, NON_PERSISTENT | SEND);
179
180   return request;
181 }
182
183 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
184                            int dst, int tag, MPI_Comm comm)
185 {
186   MPI_Request request =
187       smpi_isend_init(buf, count, datatype, dst, tag, comm);
188
189   smpi_mpi_start(request);
190   return request;
191 }
192
193 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
194                             int src, int tag, MPI_Comm comm)
195 {
196   MPI_Request request =
197       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
198                     comm, NON_PERSISTENT | RECV);
199
200   return request;
201 }
202
203 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
204                            int src, int tag, MPI_Comm comm)
205 {
206   MPI_Request request =
207       smpi_irecv_init(buf, count, datatype, src, tag, comm);
208
209   smpi_mpi_start(request);
210   return request;
211 }
212
213 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
214                    int tag, MPI_Comm comm, MPI_Status * status)
215 {
216   MPI_Request request;
217
218   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
219   smpi_mpi_wait(&request, status);
220 }
221
222 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
223                    int tag, MPI_Comm comm)
224 {
225   MPI_Request request;
226
227   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
228   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
229 }
230
231 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
232                        int dst, int sendtag, void *recvbuf, int recvcount,
233                        MPI_Datatype recvtype, int src, int recvtag,
234                        MPI_Comm comm, MPI_Status * status)
235 {
236   MPI_Request requests[2];
237   MPI_Status stats[2];
238
239   requests[0] =
240       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
241   requests[1] =
242       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
243   smpi_mpi_startall(2, requests);
244   smpi_mpi_waitall(2, requests, stats);
245   if(status != MPI_STATUS_IGNORE) {
246     // Copy receive status
247     memcpy(status, &stats[1], sizeof(MPI_Status));
248   }
249 }
250
251 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
252 {
253   return status->count / smpi_datatype_size(datatype);
254 }
255
256 static void finish_wait(MPI_Request * request, MPI_Status * status)
257 {
258   MPI_Request req = *request;
259
260   if(status != MPI_STATUS_IGNORE) {
261     status->MPI_SOURCE = req->src;
262     status->MPI_TAG = req->tag;
263     status->MPI_ERROR = MPI_SUCCESS;
264     // FIXME: really this should just contain the count of receive-type blocks,
265     // right?
266     status->count = req->size;
267   }
268   print_request("Finishing", req);
269   if(req->flags & NON_PERSISTENT) {
270     smpi_mpi_request_free(request);
271   } else {
272     req->action = NULL;
273   }
274 }
275
276 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
277 int flag;
278
279    if ((*request)->action == NULL)
280   flag = 1;
281    else 
282     flag = simcall_comm_test((*request)->action);
283    if(flag) {
284         smpi_mpi_wait(request, status);
285     }
286     return flag;
287 }
288
289 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
290                      MPI_Status * status)
291 {
292   xbt_dynar_t comms;
293   int i, flag, size;
294   int* map;
295
296   *index = MPI_UNDEFINED;
297   flag = 0;
298   if(count > 0) {
299     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
300     map = xbt_new(int, count);
301     size = 0;
302     for(i = 0; i < count; i++) {
303       if(requests[i]->action) {
304          xbt_dynar_push(comms, &requests[i]->action);
305          map[size] = i;
306          size++;
307       }
308     }
309     if(size > 0) {
310       i = simcall_comm_testany(comms);
311       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
312       if(i != MPI_UNDEFINED) {
313         *index = map[i];
314         smpi_mpi_wait(&requests[*index], status);
315         flag = 1;
316       }
317     }
318     xbt_free(map);
319     xbt_dynar_free(&comms);
320   }
321   return flag;
322 }
323
324 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
325 {
326   print_request("Waiting", *request);
327   if ((*request)->action != NULL) { // this is not a detached send
328     simcall_comm_wait((*request)->action, -1.0);
329     finish_wait(request, status);
330   }
331   // FIXME for a detached send, finish_wait is not called:
332 }
333
334 int smpi_mpi_waitany(int count, MPI_Request requests[],
335                      MPI_Status * status)
336 {
337   xbt_dynar_t comms;
338   int i, size, index;
339   int *map;
340
341   index = MPI_UNDEFINED;
342   if(count > 0) {
343     // Wait for a request to complete
344     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
345     map = xbt_new(int, count);
346     size = 0;
347     XBT_DEBUG("Wait for one of");
348     for(i = 0; i < count; i++) {
349       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
350         print_request("   ", requests[i]);
351         xbt_dynar_push(comms, &requests[i]->action);
352         map[size] = i;
353         size++;
354       }
355     }
356     if(size > 0) {
357       i = simcall_comm_waitany(comms);
358       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
359       if (i != MPI_UNDEFINED) {
360         index = map[i];
361         finish_wait(&requests[index], status);
362       }
363     }
364     xbt_free(map);
365     xbt_dynar_free(&comms);
366   }
367   return index;
368 }
369
370 void smpi_mpi_waitall(int count, MPI_Request requests[],
371                       MPI_Status status[])
372 {
373   int index, c;
374   MPI_Status stat;
375   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
376
377   for(c = 0; c < count; c++) {
378     if(MC_IS_ENABLED) {
379       smpi_mpi_wait(&requests[c], pstat);
380       index = c;
381     } else {
382       index = smpi_mpi_waitany(count, requests, pstat);
383       if(index == MPI_UNDEFINED) {
384         break;
385       }
386     }
387     if(status != MPI_STATUS_IGNORE) {
388       memcpy(&status[index], pstat, sizeof(*pstat));
389     }
390   }
391 }
392
393 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
394                       MPI_Status status[])
395 {
396   int i, count, index;
397
398   count = 0;
399   for(i = 0; i < incount; i++) {
400      if(smpi_mpi_testany(incount, requests, &index, status)) {
401        indices[count] = index;
402        count++;
403      }
404   }
405   return count;
406 }
407
408 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
409                     MPI_Comm comm)
410 {
411   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
412   nary_tree_bcast(buf, count, datatype, root, comm, 4);
413 }
414
415 void smpi_mpi_barrier(MPI_Comm comm)
416 {
417   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
418   nary_tree_barrier(comm, 4);
419 }
420
421 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
422                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
423                      int root, MPI_Comm comm)
424 {
425   int system_tag = 666;
426   int rank, size, src, index;
427   MPI_Aint lb = 0, recvext = 0;
428   MPI_Request *requests;
429
430   rank = smpi_comm_rank(comm);
431   size = smpi_comm_size(comm);
432   if(rank != root) {
433     // Send buffer to root
434     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
435   } else {
436     // FIXME: check for errors
437     smpi_datatype_extent(recvtype, &lb, &recvext);
438     // Local copy from root
439     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
440         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
441     // Receive buffers from senders
442     requests = xbt_new(MPI_Request, size - 1);
443     index = 0;
444     for(src = 0; src < size; src++) {
445       if(src != root) {
446         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
447                                           recvcount, recvtype, 
448                                           src, system_tag, comm);
449         index++;
450       }
451     }
452     // Wait for completion of irecv's.
453     smpi_mpi_startall(size - 1, requests);
454     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
455     xbt_free(requests);
456   }
457 }
458
459 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
460                       void *recvbuf, int *recvcounts, int *displs,
461                       MPI_Datatype recvtype, int root, MPI_Comm comm)
462 {
463   int system_tag = 666;
464   int rank, size, src, index;
465   MPI_Aint lb = 0, recvext = 0;
466   MPI_Request *requests;
467
468   rank = smpi_comm_rank(comm);
469   size = smpi_comm_size(comm);
470   if(rank != root) {
471     // Send buffer to root
472     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
473   } else {
474     // FIXME: check for errors
475     smpi_datatype_extent(recvtype, &lb, &recvext);
476     // Local copy from root
477     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
478                        (char *)recvbuf + displs[root] * recvext, 
479                        recvcounts[root], recvtype);
480     // Receive buffers from senders
481     requests = xbt_new(MPI_Request, size - 1);
482     index = 0;
483     for(src = 0; src < size; src++) {
484       if(src != root) {
485         requests[index] =
486             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
487                             recvcounts[src], recvtype, src, system_tag, comm);
488         index++;
489       }
490     }
491     // Wait for completion of irecv's.
492     smpi_mpi_startall(size - 1, requests);
493     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
494     xbt_free(requests);
495   }
496 }
497
498 void smpi_mpi_allgather(void *sendbuf, int sendcount,
499                         MPI_Datatype sendtype, void *recvbuf,
500                         int recvcount, MPI_Datatype recvtype,
501                         MPI_Comm comm)
502 {
503   int system_tag = 666;
504   int rank, size, other, index;
505   MPI_Aint lb = 0, recvext = 0;
506   MPI_Request *requests;
507
508   rank = smpi_comm_rank(comm);
509   size = smpi_comm_size(comm);
510   // FIXME: check for errors
511   smpi_datatype_extent(recvtype, &lb, &recvext);
512   // Local copy from self
513   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
514                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
515                      recvtype);
516   // Send/Recv buffers to/from others;
517   requests = xbt_new(MPI_Request, 2 * (size - 1));
518   index = 0;
519   for(other = 0; other < size; other++) {
520     if(other != rank) {
521       requests[index] =
522           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
523                           comm);
524       index++;
525       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
526                                         recvcount, recvtype, other, 
527                                         system_tag, comm);
528       index++;
529     }
530   }
531   // Wait for completion of all comms.
532   smpi_mpi_startall(2 * (size - 1), requests);
533   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
534   xbt_free(requests);
535 }
536
537 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
538                          MPI_Datatype sendtype, void *recvbuf,
539                          int *recvcounts, int *displs,
540                          MPI_Datatype recvtype, MPI_Comm comm)
541 {
542   int system_tag = 666;
543   int rank, size, other, index;
544   MPI_Aint lb = 0, recvext = 0;
545   MPI_Request *requests;
546
547   rank = smpi_comm_rank(comm);
548   size = smpi_comm_size(comm);
549   // FIXME: check for errors
550   smpi_datatype_extent(recvtype, &lb, &recvext);
551   // Local copy from self
552   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
553                      (char *)recvbuf + displs[rank] * recvext, 
554                      recvcounts[rank], recvtype);
555   // Send buffers to others;
556   requests = xbt_new(MPI_Request, 2 * (size - 1));
557   index = 0;
558   for(other = 0; other < size; other++) {
559     if(other != rank) {
560       requests[index] =
561           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
562                           comm);
563       index++;
564       requests[index] =
565           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
566                           recvtype, other, system_tag, comm);
567       index++;
568     }
569   }
570   // Wait for completion of all comms.
571   smpi_mpi_startall(2 * (size - 1), requests);
572   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
573   xbt_free(requests);
574 }
575
576 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
577                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
578                       int root, MPI_Comm comm)
579 {
580   int system_tag = 666;
581   int rank, size, dst, index;
582   MPI_Aint lb = 0, sendext = 0;
583   MPI_Request *requests;
584
585   rank = smpi_comm_rank(comm);
586   size = smpi_comm_size(comm);
587   if(rank != root) {
588     // Recv buffer from root
589     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
590                   MPI_STATUS_IGNORE);
591   } else {
592     // FIXME: check for errors
593     smpi_datatype_extent(sendtype, &lb, &sendext);
594     // Local copy from root
595     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
596       sendcount, sendtype, recvbuf, recvcount, recvtype);
597     // Send buffers to receivers
598     requests = xbt_new(MPI_Request, size - 1);
599     index = 0;
600     for(dst = 0; dst < size; dst++) {
601       if(dst != root) {
602         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
603                                           sendcount, sendtype, dst,
604                                           system_tag, comm);
605         index++;
606       }
607     }
608     // Wait for completion of isend's.
609     smpi_mpi_startall(size - 1, requests);
610     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
611     xbt_free(requests);
612   }
613 }
614
615 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
616                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
617                        MPI_Datatype recvtype, int root, MPI_Comm comm)
618 {
619   int system_tag = 666;
620   int rank, size, dst, index;
621   MPI_Aint lb = 0, sendext = 0;
622   MPI_Request *requests;
623
624   rank = smpi_comm_rank(comm);
625   size = smpi_comm_size(comm);
626   if(rank != root) {
627     // Recv buffer from root
628     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
629                   MPI_STATUS_IGNORE);
630   } else {
631     // FIXME: check for errors
632     smpi_datatype_extent(sendtype, &lb, &sendext);
633     // Local copy from root
634     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
635                        sendtype, recvbuf, recvcount, recvtype);
636     // Send buffers to receivers
637     requests = xbt_new(MPI_Request, size - 1);
638     index = 0;
639     for(dst = 0; dst < size; dst++) {
640       if(dst != root) {
641         requests[index] =
642             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
643                             sendtype, dst, system_tag, comm);
644         index++;
645       }
646     }
647     // Wait for completion of isend's.
648     smpi_mpi_startall(size - 1, requests);
649     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
650     xbt_free(requests);
651   }
652 }
653
654 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
655                      MPI_Datatype datatype, MPI_Op op, int root,
656                      MPI_Comm comm)
657 {
658   int system_tag = 666;
659   int rank, size, src, index;
660   MPI_Aint lb = 0, dataext = 0;
661   MPI_Request *requests;
662   void **tmpbufs;
663
664   rank = smpi_comm_rank(comm);
665   size = smpi_comm_size(comm);
666   if(rank != root) {
667     // Send buffer to root
668     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
669   } else {
670     // FIXME: check for errors
671     smpi_datatype_extent(datatype, &lb, &dataext);
672     // Local copy from root
673     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
674     // Receive buffers from senders
675     //TODO: make a MPI_barrier here ?
676     requests = xbt_new(MPI_Request, size - 1);
677     tmpbufs = xbt_new(void *, size - 1);
678     index = 0;
679     for(src = 0; src < size; src++) {
680       if(src != root) {
681         // FIXME: possibly overkill we we have contiguous/noncontiguous data
682         //  mapping...
683         tmpbufs[index] = xbt_malloc(count * dataext);
684         requests[index] =
685             smpi_irecv_init(tmpbufs[index], count, datatype, src,
686                             system_tag, comm);
687         index++;
688       }
689     }
690     // Wait for completion of irecv's.
691     smpi_mpi_startall(size - 1, requests);
692     for(src = 0; src < size - 1; src++) {
693       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
694       if(index == MPI_UNDEFINED) {
695         break;
696       }
697       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
698     }
699     for(index = 0; index < size - 1; index++) {
700       xbt_free(tmpbufs[index]);
701     }
702     xbt_free(tmpbufs);
703     xbt_free(requests);
704   }
705 }
706
707 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
708                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
709 {
710   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
711   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
712 }
713
714 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
715                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
716 {
717   int system_tag = 666;
718   int rank, size, other, index;
719   MPI_Aint lb = 0, dataext = 0;
720   MPI_Request *requests;
721   void **tmpbufs;
722
723   rank = smpi_comm_rank(comm);
724   size = smpi_comm_size(comm);
725
726   // FIXME: check for errors
727   smpi_datatype_extent(datatype, &lb, &dataext);
728
729   // Local copy from self
730   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
731
732   // Send/Recv buffers to/from others;
733   requests = xbt_new(MPI_Request, size - 1);
734   tmpbufs = xbt_new(void *, rank);
735   index = 0;
736   for(other = 0; other < rank; other++) {
737     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
738     // mapping...
739     tmpbufs[index] = xbt_malloc(count * dataext);
740     requests[index] =
741         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
742                         comm);
743     index++;
744   }
745   for(other = rank + 1; other < size; other++) {
746     requests[index] =
747         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
748     index++;
749   }
750   // Wait for completion of all comms.
751   smpi_mpi_startall(size - 1, requests);
752   for(other = 0; other < size - 1; other++) {
753     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
754     if(index == MPI_UNDEFINED) {
755       break;
756     }
757     if(index < rank) {
758       // #Request is below rank: it's a irecv
759       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
760     }
761   }
762   for(index = 0; index < rank; index++) {
763     xbt_free(tmpbufs[index]);
764   }
765   xbt_free(tmpbufs);
766   xbt_free(requests);
767 }