Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fc093b5417c6b0c4098e03ee55068ebe54a74bf4
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
22
23 static int match_recv(void* a, void* b) {
24    MPI_Request ref = (MPI_Request)a;
25    MPI_Request req = (MPI_Request)b;
26
27    xbt_assert(ref, "Cannot match recv against null reference");
28    xbt_assert(req, "Cannot match recv against null request");
29    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
30           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
31 }
32
33 static int match_send(void* a, void* b) {
34    MPI_Request ref = (MPI_Request)a;
35    MPI_Request req = (MPI_Request)b;
36
37    xbt_assert(ref, "Cannot match send against null reference");
38    xbt_assert(req, "Cannot match send against null request");
39    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
40           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
41 }
42
43 static MPI_Request build_request(void *buf, int count,
44                                  MPI_Datatype datatype, int src, int dst,
45                                  int tag, MPI_Comm comm, unsigned flags)
46 {
47   MPI_Request request;
48
49   request = xbt_new(s_smpi_mpi_request_t, 1);
50   request->buf = buf;
51   // FIXME: this will have to be changed to support non-contiguous datatypes
52   request->size = smpi_datatype_size(datatype) * count;
53   request->src = src;
54   request->dst = dst;
55   request->tag = tag;
56   request->comm = comm;
57   request->action = NULL;
58   request->flags = flags;
59 #ifdef HAVE_TRACING
60   request->send = 0;
61   request->recv = 0;
62 #endif
63   return request;
64 }
65
66 /* MPI Low level calls */
67 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
68                                int dst, int tag, MPI_Comm comm)
69 {
70   MPI_Request request =
71       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
72                     comm, PERSISTENT | SEND);
73
74   return request;
75 }
76
77 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
78                                int src, int tag, MPI_Comm comm)
79 {
80   MPI_Request request =
81       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
82                     comm, PERSISTENT | RECV);
83
84   return request;
85 }
86
87 void smpi_mpi_start(MPI_Request request)
88 {
89   smx_rdv_t mailbox;
90
91   xbt_assert(!request->action,
92               "Cannot (re)start a non-finished communication");
93   if(request->flags & RECV) {
94     print_request("New recv", request);
95     mailbox = smpi_process_mailbox();
96     // FIXME: SIMIX does not yet support non-contiguous datatypes
97     request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
98   } else {
99     print_request("New send", request);
100     mailbox = smpi_process_remote_mailbox(
101                           smpi_group_index(smpi_comm_group(request->comm), request->dst));
102     // FIXME: SIMIX does not yet support non-contiguous datatypes
103
104     request->action = 
105                 SIMIX_req_comm_isend(mailbox, request->size, -1.0,
106                                     request->buf, request->size, &match_send, request,  
107                                     // detach if msg size < eager/rdv switch limit
108                                     request->size < 64*1024 ? 1:0);
109     // if detached  request->action == NULL
110 #ifdef HAVE_TRACING
111     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
112 #endif
113   }
114 }
115
116 void smpi_mpi_startall(int count, MPI_Request * requests)
117 {
118           int i;
119
120   for(i = 0; i < count; i++) {
121     smpi_mpi_start(requests[i]);
122   }
123 }
124
125 void smpi_mpi_request_free(MPI_Request * request)
126 {
127   xbt_free(*request);
128   *request = MPI_REQUEST_NULL;
129 }
130
131 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
132                             int dst, int tag, MPI_Comm comm)
133 {
134   MPI_Request request =
135       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
136                     comm, NON_PERSISTENT | SEND);
137
138   return request;
139 }
140
141 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
142                            int dst, int tag, MPI_Comm comm)
143 {
144   MPI_Request request =
145       smpi_isend_init(buf, count, datatype, dst, tag, comm);
146
147   smpi_mpi_start(request);
148   return request;
149 }
150
151 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
152                             int src, int tag, MPI_Comm comm)
153 {
154   MPI_Request request =
155       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
156                     comm, NON_PERSISTENT | RECV);
157
158   return request;
159 }
160
161 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
162                            int src, int tag, MPI_Comm comm)
163 {
164   MPI_Request request =
165       smpi_irecv_init(buf, count, datatype, src, tag, comm);
166
167   smpi_mpi_start(request);
168   return request;
169 }
170
171 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
172                    int tag, MPI_Comm comm, MPI_Status * status)
173 {
174   MPI_Request request;
175
176   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
177   smpi_mpi_wait(&request, status);
178 }
179
180 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
181                    int tag, MPI_Comm comm)
182 {
183   MPI_Request request;
184
185   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
186   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
187 }
188
189 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
190                        int dst, int sendtag, void *recvbuf, int recvcount,
191                        MPI_Datatype recvtype, int src, int recvtag,
192                        MPI_Comm comm, MPI_Status * status)
193 {
194   MPI_Request requests[2];
195   MPI_Status stats[2];
196
197   requests[0] =
198       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
199   requests[1] =
200       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
201   smpi_mpi_startall(2, requests);
202   smpi_mpi_waitall(2, requests, stats);
203   if(status != MPI_STATUS_IGNORE) {
204     // Copy receive status
205     memcpy(status, &stats[1], sizeof(MPI_Status));
206   }
207 }
208
209 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
210 {
211   return status->count / smpi_datatype_size(datatype);
212 }
213
214 static void finish_wait(MPI_Request * request, MPI_Status * status)
215 {
216   MPI_Request req = *request;
217
218   if(status != MPI_STATUS_IGNORE) {
219     status->MPI_SOURCE = req->src;
220     status->MPI_TAG = req->tag;
221     status->MPI_ERROR = MPI_SUCCESS;
222     // FIXME: really this should just contain the count of receive-type blocks,
223     // right?
224     status->count = req->size;
225   }
226   print_request("Finishing", req);
227   if(req->flags & NON_PERSISTENT) {
228     smpi_mpi_request_free(request);
229   } else {
230     req->action = NULL;
231   }
232 }
233
234 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
235 int flag;
236
237    if ((*request)->action == NULL)
238         flag = 1;
239    else 
240     flag = SIMIX_req_comm_test((*request)->action);
241    if(flag) {
242                     smpi_mpi_wait(request, status);
243           }
244           return flag;
245 }
246
247 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
248                      MPI_Status * status)
249 {
250   xbt_dynar_t comms;
251   int i, flag, size;
252   int* map;
253
254   *index = MPI_UNDEFINED;
255   flag = 0;
256   if(count > 0) {
257     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
258     map = xbt_new(int, count);
259     size = 0;
260     for(i = 0; i < count; i++) {
261       if(requests[i]->action) {
262          xbt_dynar_push(comms, &requests[i]->action);
263          map[size] = i;
264          size++;
265       }
266     }
267     if(size > 0) {
268       i = SIMIX_req_comm_testany(comms);
269       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
270       if(i != MPI_UNDEFINED) {
271         *index = map[i];
272         smpi_mpi_wait(&requests[*index], status);
273         flag = 1;
274       }
275     }
276     xbt_free(map);
277     xbt_dynar_free(&comms);
278   }
279   return flag;
280 }
281
282 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
283 {
284   print_request("Waiting", *request);
285   if ((*request)->action != NULL ) 
286          SIMIX_req_comm_wait((*request)->action, -1.0);
287   finish_wait(request, status);
288 }
289
290 int smpi_mpi_waitany(int count, MPI_Request requests[],
291                      MPI_Status * status)
292 {
293   xbt_dynar_t comms;
294   int i, size, index;
295   int *map;
296
297   index = MPI_UNDEFINED;
298   if(count > 0) {
299     // Wait for a request to complete
300     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
301     map = xbt_new(int, count);
302     size = 0;
303     XBT_DEBUG("Wait for one of");
304     for(i = 0; i < count; i++) {
305       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
306         print_request("   ", requests[i]);
307         xbt_dynar_push(comms, &requests[i]->action);
308         map[size] = i;
309         size++;
310       }
311     }
312     if(size > 0) {
313       i = SIMIX_req_comm_waitany(comms);
314       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
315       if (i != MPI_UNDEFINED) {
316         index = map[i];
317         finish_wait(&requests[index], status);
318       }
319     }
320     xbt_free(map);
321     xbt_dynar_free(&comms);
322   }
323   return index;
324 }
325
326 void smpi_mpi_waitall(int count, MPI_Request requests[],
327                       MPI_Status status[])
328 {
329   int index, c;
330   MPI_Status stat;
331   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
332
333   for(c = 0; c < count; c++) {
334     if(MC_IS_ENABLED) {
335       smpi_mpi_wait(&requests[c], pstat);
336       index = c;
337     } else {
338       index = smpi_mpi_waitany(count, requests, pstat);
339       if(index == MPI_UNDEFINED) {
340         break;
341       }
342     }
343     if(status != MPI_STATUS_IGNORE) {
344       memcpy(&status[index], pstat, sizeof(*pstat));
345     }
346   }
347 }
348
349 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
350                       MPI_Status status[])
351 {
352   int i, count, index;
353
354   count = 0;
355   for(i = 0; i < incount; i++) {
356      if(smpi_mpi_testany(incount, requests, &index, status)) {
357        indices[count] = index;
358        count++;
359      }
360   }
361   return count;
362 }
363
364 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
365                     MPI_Comm comm)
366 {
367   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
368   nary_tree_bcast(buf, count, datatype, root, comm, 4);
369 }
370
371 void smpi_mpi_barrier(MPI_Comm comm)
372 {
373   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
374   nary_tree_barrier(comm, 4);
375 }
376
377 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
378                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
379                      int root, MPI_Comm comm)
380 {
381   int system_tag = 666;
382   int rank, size, src, index;
383   MPI_Aint lb = 0, recvext = 0;
384   MPI_Request *requests;
385
386   rank = smpi_comm_rank(comm);
387   size = smpi_comm_size(comm);
388   if(rank != root) {
389     // Send buffer to root
390     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
391   } else {
392     // FIXME: check for errors
393     smpi_datatype_extent(recvtype, &lb, &recvext);
394     // Local copy from root
395     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
396         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
397     // Receive buffers from senders
398     requests = xbt_new(MPI_Request, size - 1);
399     index = 0;
400     for(src = 0; src < size; src++) {
401       if(src != root) {
402         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
403                                           recvcount, recvtype, 
404                                           src, system_tag, comm);
405         index++;
406       }
407     }
408     // Wait for completion of irecv's.
409     smpi_mpi_startall(size - 1, requests);
410     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
411     xbt_free(requests);
412   }
413 }
414
415 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
416                       void *recvbuf, int *recvcounts, int *displs,
417                       MPI_Datatype recvtype, int root, MPI_Comm comm)
418 {
419   int system_tag = 666;
420   int rank, size, src, index;
421   MPI_Aint lb = 0, recvext = 0;
422   MPI_Request *requests;
423
424   rank = smpi_comm_rank(comm);
425   size = smpi_comm_size(comm);
426   if(rank != root) {
427     // Send buffer to root
428     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
429   } else {
430     // FIXME: check for errors
431     smpi_datatype_extent(recvtype, &lb, &recvext);
432     // Local copy from root
433     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
434                        (char *)recvbuf + displs[root] * recvext, 
435                        recvcounts[root], recvtype);
436     // Receive buffers from senders
437     requests = xbt_new(MPI_Request, size - 1);
438     index = 0;
439     for(src = 0; src < size; src++) {
440       if(src != root) {
441         requests[index] =
442             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
443                             recvcounts[src], recvtype, src, system_tag, comm);
444         index++;
445       }
446     }
447     // Wait for completion of irecv's.
448     smpi_mpi_startall(size - 1, requests);
449     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
450     xbt_free(requests);
451   }
452 }
453
454 void smpi_mpi_allgather(void *sendbuf, int sendcount,
455                         MPI_Datatype sendtype, void *recvbuf,
456                         int recvcount, MPI_Datatype recvtype,
457                         MPI_Comm comm)
458 {
459   int system_tag = 666;
460   int rank, size, other, index;
461   MPI_Aint lb = 0, recvext = 0;
462   MPI_Request *requests;
463
464   rank = smpi_comm_rank(comm);
465   size = smpi_comm_size(comm);
466   // FIXME: check for errors
467   smpi_datatype_extent(recvtype, &lb, &recvext);
468   // Local copy from self
469   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
470                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
471                      recvtype);
472   // Send/Recv buffers to/from others;
473   requests = xbt_new(MPI_Request, 2 * (size - 1));
474   index = 0;
475   for(other = 0; other < size; other++) {
476     if(other != rank) {
477       requests[index] =
478           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
479                           comm);
480       index++;
481       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
482                                         recvcount, recvtype, other, 
483                                         system_tag, comm);
484       index++;
485     }
486   }
487   // Wait for completion of all comms.
488   smpi_mpi_startall(2 * (size - 1), requests);
489   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
490   xbt_free(requests);
491 }
492
493 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
494                          MPI_Datatype sendtype, void *recvbuf,
495                          int *recvcounts, int *displs,
496                          MPI_Datatype recvtype, MPI_Comm comm)
497 {
498   int system_tag = 666;
499   int rank, size, other, index;
500   MPI_Aint lb = 0, recvext = 0;
501   MPI_Request *requests;
502
503   rank = smpi_comm_rank(comm);
504   size = smpi_comm_size(comm);
505   // FIXME: check for errors
506   smpi_datatype_extent(recvtype, &lb, &recvext);
507   // Local copy from self
508   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
509                      (char *)recvbuf + displs[rank] * recvext, 
510                      recvcounts[rank], recvtype);
511   // Send buffers to others;
512   requests = xbt_new(MPI_Request, 2 * (size - 1));
513   index = 0;
514   for(other = 0; other < size; other++) {
515     if(other != rank) {
516       requests[index] =
517           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
518                           comm);
519       index++;
520       requests[index] =
521           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
522                           recvtype, other, system_tag, comm);
523       index++;
524     }
525   }
526   // Wait for completion of all comms.
527   smpi_mpi_startall(2 * (size - 1), requests);
528   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
529   xbt_free(requests);
530 }
531
532 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
533                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
534                       int root, MPI_Comm comm)
535 {
536   int system_tag = 666;
537   int rank, size, dst, index;
538   MPI_Aint lb = 0, sendext = 0;
539   MPI_Request *requests;
540
541   rank = smpi_comm_rank(comm);
542   size = smpi_comm_size(comm);
543   if(rank != root) {
544     // Recv buffer from root
545     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
546                   MPI_STATUS_IGNORE);
547   } else {
548     // FIXME: check for errors
549     smpi_datatype_extent(sendtype, &lb, &sendext);
550     // Local copy from root
551     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
552       sendcount, sendtype, recvbuf, recvcount, recvtype);
553     // Send buffers to receivers
554     requests = xbt_new(MPI_Request, size - 1);
555     index = 0;
556     for(dst = 0; dst < size; dst++) {
557       if(dst != root) {
558         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
559                                           sendcount, sendtype, dst,
560                                           system_tag, comm);
561         index++;
562       }
563     }
564     // Wait for completion of isend's.
565     smpi_mpi_startall(size - 1, requests);
566     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
567     xbt_free(requests);
568   }
569 }
570
571 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
572                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
573                        MPI_Datatype recvtype, int root, MPI_Comm comm)
574 {
575   int system_tag = 666;
576   int rank, size, dst, index;
577   MPI_Aint lb = 0, sendext = 0;
578   MPI_Request *requests;
579
580   rank = smpi_comm_rank(comm);
581   size = smpi_comm_size(comm);
582   if(rank != root) {
583     // Recv buffer from root
584     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
585                   MPI_STATUS_IGNORE);
586   } else {
587     // FIXME: check for errors
588     smpi_datatype_extent(sendtype, &lb, &sendext);
589     // Local copy from root
590     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
591                        sendtype, recvbuf, recvcount, recvtype);
592     // Send buffers to receivers
593     requests = xbt_new(MPI_Request, size - 1);
594     index = 0;
595     for(dst = 0; dst < size; dst++) {
596       if(dst != root) {
597         requests[index] =
598             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
599                             sendtype, dst, system_tag, comm);
600         index++;
601       }
602     }
603     // Wait for completion of isend's.
604     smpi_mpi_startall(size - 1, requests);
605     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
606     xbt_free(requests);
607   }
608 }
609
610 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
611                      MPI_Datatype datatype, MPI_Op op, int root,
612                      MPI_Comm comm)
613 {
614   int system_tag = 666;
615   int rank, size, src, index;
616   MPI_Aint lb = 0, dataext = 0;
617   MPI_Request *requests;
618   void **tmpbufs;
619
620   rank = smpi_comm_rank(comm);
621   size = smpi_comm_size(comm);
622   if(rank != root) {
623     // Send buffer to root
624     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
625   } else {
626     // FIXME: check for errors
627     smpi_datatype_extent(datatype, &lb, &dataext);
628     // Local copy from root
629     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
630     // Receive buffers from senders
631     //TODO: make a MPI_barrier here ?
632     requests = xbt_new(MPI_Request, size - 1);
633     tmpbufs = xbt_new(void *, size - 1);
634     index = 0;
635     for(src = 0; src < size; src++) {
636       if(src != root) {
637         // FIXME: possibly overkill we we have contiguous/noncontiguous data
638         //  mapping...
639         tmpbufs[index] = xbt_malloc(count * dataext);
640         requests[index] =
641             smpi_irecv_init(tmpbufs[index], count, datatype, src,
642                             system_tag, comm);
643         index++;
644       }
645     }
646     // Wait for completion of irecv's.
647     smpi_mpi_startall(size - 1, requests);
648     for(src = 0; src < size - 1; src++) {
649       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
650       if(index == MPI_UNDEFINED) {
651         break;
652       }
653       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
654     }
655     for(index = 0; index < size - 1; index++) {
656       xbt_free(tmpbufs[index]);
657     }
658     xbt_free(tmpbufs);
659     xbt_free(requests);
660   }
661 }
662
663 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
664                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
665 {
666   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
667   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
668 }
669
670 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
671                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
672 {
673   int system_tag = 666;
674   int rank, size, other, index;
675   MPI_Aint lb = 0, dataext = 0;
676   MPI_Request *requests;
677   void **tmpbufs;
678
679   rank = smpi_comm_rank(comm);
680   size = smpi_comm_size(comm);
681
682   // FIXME: check for errors
683   smpi_datatype_extent(datatype, &lb, &dataext);
684
685   // Local copy from self
686   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
687
688   // Send/Recv buffers to/from others;
689   requests = xbt_new(MPI_Request, size - 1);
690   tmpbufs = xbt_new(void *, rank);
691   index = 0;
692   for(other = 0; other < rank; other++) {
693     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
694     // mapping...
695     tmpbufs[index] = xbt_malloc(count * dataext);
696     requests[index] =
697         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
698                         comm);
699     index++;
700   }
701   for(other = rank + 1; other < size; other++) {
702     requests[index] =
703         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
704     index++;
705   }
706   // Wait for completion of all comms.
707   smpi_mpi_startall(size - 1, requests);
708   for(other = 0; other < size - 1; other++) {
709     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
710     if(index == MPI_UNDEFINED) {
711       break;
712     }
713     if(index < rank) {
714       // #Request is below rank: it's a irecv
715       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
716     }
717   }
718   for(index = 0; index < rank; index++) {
719     xbt_free(tmpbufs[index]);
720   }
721   xbt_free(tmpbufs);
722   xbt_free(requests);
723 }