Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[beginning smpi non contignous]
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "surf/surf.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
15                                 "Logging specific to SMPI (base)");
16
17 static int match_recv(void* a, void* b, smx_action_t ignored) {
18    MPI_Request ref = (MPI_Request)a;
19    MPI_Request req = (MPI_Request)b;
20
21    xbt_assert(ref, "Cannot match recv against null reference");
22    xbt_assert(req, "Cannot match recv against null request");
23    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
24           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
25 }
26
27 static int match_send(void* a, void* b,smx_action_t ignored) {
28    MPI_Request ref = (MPI_Request)a;
29    MPI_Request req = (MPI_Request)b;
30
31    xbt_assert(ref, "Cannot match send against null reference");
32    xbt_assert(req, "Cannot match send against null request");
33    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
34           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
35 }
36
37 static MPI_Request build_request(void *buf, int count,
38                                  MPI_Datatype datatype, int src, int dst,
39                                  int tag, MPI_Comm comm, unsigned flags)
40 {
41   MPI_Request request;
42
43   request = xbt_new(s_smpi_mpi_request_t, 1);
44   request->buf = buf;
45   // FIXME: this will have to be changed to support non-contiguous datatypes
46   request->size = smpi_datatype_size(datatype) * count;
47   request->contiguous=smpi_datatype_contiguous(datatype);
48   if(request->contiguous != 1){
49     request->block_stride = smpi_datatype_block_stride(datatype);
50     request->block_length = smpi_datatype_block_length(datatype);
51     request->block_count = smpi_datatype_block_count(datatype)*count;
52   }
53   request->src = src;
54   request->dst = dst;
55   request->tag = tag;
56   request->comm = comm;
57   request->action = NULL;
58   request->flags = flags;
59 #ifdef HAVE_TRACING
60   request->send = 0;
61   request->recv = 0;
62 #endif
63   return request;
64 }
65
66 void smpi_action_trace_run(char *path)
67 {
68   char *name;
69   xbt_dynar_t todo;
70   xbt_dict_cursor_t cursor;
71
72   action_fp=NULL;
73   if (path) {
74     action_fp = fopen(path, "r");
75     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
76                 strerror(errno));
77   }
78
79   if (!xbt_dict_is_empty(action_queues)) {
80     XBT_WARN
81         ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
82
83
84     xbt_dict_foreach(action_queues, cursor, name, todo) {
85       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
86     }
87   }
88
89   if (path)
90     fclose(action_fp);
91   xbt_dict_free(&action_queues);
92   action_queues = xbt_dict_new_homogeneous(NULL);
93 }
94
95 static void smpi_mpi_request_free_voidp(void* request)
96 {
97   MPI_Request req = request;
98   smpi_mpi_request_free(&req);
99 }
100
101 /* MPI Low level calls */
102 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
103                                int dst, int tag, MPI_Comm comm)
104 {
105   MPI_Request request =
106       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
107                     comm, PERSISTENT | SEND);
108
109   return request;
110 }
111
112 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
113                                int src, int tag, MPI_Comm comm)
114 {
115   MPI_Request request =
116       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
117                     comm, PERSISTENT | RECV);
118
119   return request;
120 }
121
122 void smpi_mpi_start(MPI_Request request)
123 {
124   smx_rdv_t mailbox;
125   int detached = 0;
126
127   xbt_assert(!request->action,
128               "Cannot (re)start a non-finished communication");
129   if(request->flags & RECV) {
130     print_request("New recv", request);
131     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
132     mailbox = smpi_process_mailbox_small();
133     else
134     mailbox = smpi_process_mailbox();
135
136     // FIXME: SIMIX does not yet support non-contiguous datatypes
137     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
138   } else {
139     print_request("New send", request);
140
141     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
142       mailbox = smpi_process_remote_mailbox_small(
143             smpi_group_index(smpi_comm_group(request->comm), request->dst));
144     }else{
145       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
146       mailbox = smpi_process_remote_mailbox(
147                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
148     }
149     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
150       void *oldbuf = request->buf;
151       detached = 1;
152       request->buf = malloc(request->size);
153       if (oldbuf)
154         memcpy(request->buf,oldbuf,request->size);
155       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
156     }
157
158       request->action =
159       simcall_comm_isend(mailbox, request->size, -1.0,
160               request->buf, request->size,
161               &match_send,
162               &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
163               request,
164               // detach if msg size < eager/rdv switch limit
165               detached);
166
167   #ifdef HAVE_TRACING
168       /* FIXME: detached sends are not traceable (request->action == NULL) */
169       if (request->action)
170         simcall_set_category(request->action, TRACE_internal_smpi_get_category());
171   #endif
172
173     }
174
175 }
176
177 void smpi_mpi_startall(int count, MPI_Request * requests)
178 {
179     int i;
180
181   for(i = 0; i < count; i++) {
182     smpi_mpi_start(requests[i]);
183   }
184 }
185
186 void smpi_mpi_request_free(MPI_Request * request)
187 {
188   xbt_free(*request);
189   *request = MPI_REQUEST_NULL;
190 }
191
192 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
193                             int dst, int tag, MPI_Comm comm)
194 {
195   MPI_Request request =
196       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
197                     comm, NON_PERSISTENT | SEND);
198
199   return request;
200 }
201
202 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
203                            int dst, int tag, MPI_Comm comm)
204 {
205   MPI_Request request =
206       smpi_isend_init(buf, count, datatype, dst, tag, comm);
207
208   smpi_mpi_start(request);
209   return request;
210 }
211
212 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
213                             int src, int tag, MPI_Comm comm)
214 {
215   MPI_Request request =
216       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
217                     comm, NON_PERSISTENT | RECV);
218
219   return request;
220 }
221
222 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
223                            int src, int tag, MPI_Comm comm)
224 {
225   MPI_Request request =
226       smpi_irecv_init(buf, count, datatype, src, tag, comm);
227
228   smpi_mpi_start(request);
229   return request;
230 }
231
232 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
233                    int tag, MPI_Comm comm, MPI_Status * status)
234 {
235   MPI_Request request;
236
237   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
238   smpi_mpi_wait(&request, status);
239 }
240
241
242
243 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
244                    int tag, MPI_Comm comm)
245 {
246           MPI_Request request;
247
248           request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
249           smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
250 }
251
252 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
253                        int dst, int sendtag, void *recvbuf, int recvcount,
254                        MPI_Datatype recvtype, int src, int recvtag,
255                        MPI_Comm comm, MPI_Status * status)
256 {
257   MPI_Request requests[2];
258   MPI_Status stats[2];
259
260   requests[0] =
261       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
262   requests[1] =
263       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
264   smpi_mpi_startall(2, requests);
265   smpi_mpi_waitall(2, requests, stats);
266   if(status != MPI_STATUS_IGNORE) {
267     // Copy receive status
268     memcpy(status, &stats[1], sizeof(MPI_Status));
269   }
270 }
271
272 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
273 {
274   return status->count / smpi_datatype_size(datatype);
275 }
276
277 static void finish_wait(MPI_Request * request, MPI_Status * status)
278 {
279   MPI_Request req = *request;
280   // if we have a sender, we should use its data, and not the data from the receive
281   if((req->action)&&
282       (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
283     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
284
285   if(status != MPI_STATUS_IGNORE) {
286     status->MPI_SOURCE = req->src;
287     status->MPI_TAG = req->tag;
288     status->MPI_ERROR = MPI_SUCCESS;
289     // FIXME: really this should just contain the count of receive-type blocks,
290     // right?
291     status->count = req->size;
292   }
293   req = *request;
294
295   print_request("Finishing", req);
296   if(req->flags & NON_PERSISTENT) {
297     smpi_mpi_request_free(request);
298   } else {
299     req->action = NULL;
300   }
301 }
302
303 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
304 int flag;
305
306    if ((*request)->action == NULL)
307   flag = 1;
308    else 
309     flag = simcall_comm_test((*request)->action);
310    if(flag) {
311        finish_wait(request, status);
312     }
313     return flag;
314 }
315
316 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
317                      MPI_Status * status)
318 {
319   xbt_dynar_t comms;
320   int i, flag, size;
321   int* map;
322
323   *index = MPI_UNDEFINED;
324   flag = 0;
325   if(count > 0) {
326     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
327     map = xbt_new(int, count);
328     size = 0;
329     for(i = 0; i < count; i++) {
330       if(requests[i]->action) {
331          xbt_dynar_push(comms, &requests[i]->action);
332          map[size] = i;
333          size++;
334       }
335     }
336     if(size > 0) {
337       i = simcall_comm_testany(comms);
338       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
339       if(i != MPI_UNDEFINED) {
340         *index = map[i];
341         smpi_mpi_wait(&requests[*index], status);
342         flag = 1;
343       }
344     }
345     xbt_free(map);
346     xbt_dynar_free(&comms);
347   }
348   return flag;
349 }
350
351
352 int smpi_mpi_testall(int count, MPI_Request requests[],
353                      MPI_Status status[])
354 {
355   int flag=1;
356   int i;
357   for(i=0; i<count; i++)
358    if (smpi_mpi_test(&requests[i], &status[i])!=1)
359        flag=0;
360
361   return flag;
362 }
363
364 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
365   int flag=0;
366   //FIXME find another wait to avoid busy waiting ?
367   // the issue here is that we have to wait on a nonexistent comm
368   MPI_Request request;
369   while(flag==0){
370         request = smpi_mpi_iprobe(source, tag, comm, &flag, status);
371         XBT_DEBUG("Busy Waiting on probing : %d", flag);
372         if(!flag) {
373             smpi_mpi_request_free(&request);
374             simcall_process_sleep(0.0001);
375         }
376   }
377 }
378
379 MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
380   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
381             comm, NON_PERSISTENT | RECV);
382   // behave like a receive, but don't do it
383   smx_rdv_t mailbox;
384
385   print_request("New iprobe", request);
386   // We have to test both mailboxes as we don't know if we will receive one one or another
387     if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
388         mailbox = smpi_process_mailbox_small();
389         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
390
391     }
392     if (request->action==NULL){
393         mailbox = smpi_process_mailbox();
394         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
395     }
396
397   if(request->action){
398     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
399     *flag=true;
400     if(status != MPI_STATUS_IGNORE) {
401       status->MPI_SOURCE = req->src;
402       status->MPI_TAG = req->tag;
403       status->MPI_ERROR = MPI_SUCCESS;
404       status->count = req->size;
405     }
406   }
407   else *flag=false;
408
409   return request;
410 }
411
412 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
413 {
414   print_request("Waiting", *request);
415   if ((*request)->action != NULL) { // this is not a detached send
416     simcall_comm_wait((*request)->action, -1.0);
417     finish_wait(request, status);
418   }
419   // FIXME for a detached send, finish_wait is not called:
420 }
421
422 int smpi_mpi_waitany(int count, MPI_Request requests[],
423                      MPI_Status * status)
424 {
425   xbt_dynar_t comms;
426   int i, size, index;
427   int *map;
428
429   index = MPI_UNDEFINED;
430   if(count > 0) {
431     // Wait for a request to complete
432     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
433     map = xbt_new(int, count);
434     size = 0;
435     XBT_DEBUG("Wait for one of");
436     for(i = 0; i < count; i++) {
437       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
438         print_request("Waiting any ", requests[i]);
439         xbt_dynar_push(comms, &requests[i]->action);
440         map[size] = i;
441         size++;
442       }
443     }
444     if(size > 0) {
445       i = simcall_comm_waitany(comms);
446
447       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
448       if (i != MPI_UNDEFINED) {
449         index = map[i];
450         finish_wait(&requests[index], status);
451
452       }
453     }
454     xbt_free(map);
455     xbt_dynar_free(&comms);
456   }
457   return index;
458 }
459
460 void smpi_mpi_waitall(int count, MPI_Request requests[],
461                       MPI_Status status[])
462 {
463   int index, c;
464   MPI_Status stat;
465   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
466
467   for(c = 0; c < count; c++) {
468     if(MC_IS_ENABLED) {
469       smpi_mpi_wait(&requests[c], pstat);
470       index = c;
471     } else {
472       index = smpi_mpi_waitany(count, requests, pstat);
473       if(index == MPI_UNDEFINED) {
474         break;
475       }
476     }
477     if(status != MPI_STATUS_IGNORE) {
478       memcpy(&status[index], pstat, sizeof(*pstat));
479     }
480   }
481 }
482
483 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
484                       MPI_Status status[])
485 {
486   int i, count, index;
487
488   count = 0;
489   for(i = 0; i < incount; i++) {
490      if(smpi_mpi_testany(incount, requests, &index, status)) {
491        indices[count] = index;
492        count++;
493      }
494   }
495   return count;
496 }
497
498 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
499                     MPI_Comm comm)
500 {
501   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
502   nary_tree_bcast(buf, count, datatype, root, comm, 4);
503 }
504
505 void smpi_mpi_barrier(MPI_Comm comm)
506 {
507   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
508   nary_tree_barrier(comm, 4);
509 }
510
511 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
512                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
513                      int root, MPI_Comm comm)
514 {
515   int system_tag = 666;
516   int rank, size, src, index;
517   MPI_Aint lb = 0, recvext = 0;
518   MPI_Request *requests;
519
520   rank = smpi_comm_rank(comm);
521   size = smpi_comm_size(comm);
522   if(rank != root) {
523     // Send buffer to root
524     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
525   } else {
526     // FIXME: check for errors
527     smpi_datatype_extent(recvtype, &lb, &recvext);
528     // Local copy from root
529     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
530         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
531     // Receive buffers from senders
532     requests = xbt_new(MPI_Request, size - 1);
533     index = 0;
534     for(src = 0; src < size; src++) {
535       if(src != root) {
536         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
537                                           recvcount, recvtype, 
538                                           src, system_tag, comm);
539         index++;
540       }
541     }
542     // Wait for completion of irecv's.
543     smpi_mpi_startall(size - 1, requests);
544     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
545     xbt_free(requests);
546   }
547 }
548
549 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
550                       void *recvbuf, int *recvcounts, int *displs,
551                       MPI_Datatype recvtype, int root, MPI_Comm comm)
552 {
553   int system_tag = 666;
554   int rank, size, src, index;
555   MPI_Aint lb = 0, recvext = 0;
556   MPI_Request *requests;
557
558   rank = smpi_comm_rank(comm);
559   size = smpi_comm_size(comm);
560   if(rank != root) {
561     // Send buffer to root
562     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
563   } else {
564     // FIXME: check for errors
565     smpi_datatype_extent(recvtype, &lb, &recvext);
566     // Local copy from root
567     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
568                        (char *)recvbuf + displs[root] * recvext, 
569                        recvcounts[root], recvtype);
570     // Receive buffers from senders
571     requests = xbt_new(MPI_Request, size - 1);
572     index = 0;
573     for(src = 0; src < size; src++) {
574       if(src != root) {
575         requests[index] =
576             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
577                             recvcounts[src], recvtype, src, system_tag, comm);
578         index++;
579       }
580     }
581     // Wait for completion of irecv's.
582     smpi_mpi_startall(size - 1, requests);
583     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
584     xbt_free(requests);
585   }
586 }
587
588 void smpi_mpi_allgather(void *sendbuf, int sendcount,
589                         MPI_Datatype sendtype, void *recvbuf,
590                         int recvcount, MPI_Datatype recvtype,
591                         MPI_Comm comm)
592 {
593   int system_tag = 666;
594   int rank, size, other, index;
595   MPI_Aint lb = 0, recvext = 0;
596   MPI_Request *requests;
597
598   rank = smpi_comm_rank(comm);
599   size = smpi_comm_size(comm);
600   // FIXME: check for errors
601   smpi_datatype_extent(recvtype, &lb, &recvext);
602   // Local copy from self
603   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
604                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
605                      recvtype);
606   // Send/Recv buffers to/from others;
607   requests = xbt_new(MPI_Request, 2 * (size - 1));
608   index = 0;
609   for(other = 0; other < size; other++) {
610     if(other != rank) {
611       requests[index] =
612           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
613                           comm);
614       index++;
615       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
616                                         recvcount, recvtype, other, 
617                                         system_tag, comm);
618       index++;
619     }
620   }
621   // Wait for completion of all comms.
622   smpi_mpi_startall(2 * (size - 1), requests);
623   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
624   xbt_free(requests);
625 }
626
627 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
628                          MPI_Datatype sendtype, void *recvbuf,
629                          int *recvcounts, int *displs,
630                          MPI_Datatype recvtype, MPI_Comm comm)
631 {
632   int system_tag = 666;
633   int rank, size, other, index;
634   MPI_Aint lb = 0, recvext = 0;
635   MPI_Request *requests;
636
637   rank = smpi_comm_rank(comm);
638   size = smpi_comm_size(comm);
639   // FIXME: check for errors
640   smpi_datatype_extent(recvtype, &lb, &recvext);
641   // Local copy from self
642   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
643                      (char *)recvbuf + displs[rank] * recvext, 
644                      recvcounts[rank], recvtype);
645   // Send buffers to others;
646   requests = xbt_new(MPI_Request, 2 * (size - 1));
647   index = 0;
648   for(other = 0; other < size; other++) {
649     if(other != rank) {
650       requests[index] =
651           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
652                           comm);
653       index++;
654       requests[index] =
655           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
656                           recvtype, other, system_tag, comm);
657       index++;
658     }
659   }
660   // Wait for completion of all comms.
661   smpi_mpi_startall(2 * (size - 1), requests);
662   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
663   xbt_free(requests);
664 }
665
666 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
667                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
668                       int root, MPI_Comm comm)
669 {
670   int system_tag = 666;
671   int rank, size, dst, index;
672   MPI_Aint lb = 0, sendext = 0;
673   MPI_Request *requests;
674
675   rank = smpi_comm_rank(comm);
676   size = smpi_comm_size(comm);
677   if(rank != root) {
678     // Recv buffer from root
679     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
680                   MPI_STATUS_IGNORE);
681   } else {
682     // FIXME: check for errors
683     smpi_datatype_extent(sendtype, &lb, &sendext);
684     // Local copy from root
685     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
686       sendcount, sendtype, recvbuf, recvcount, recvtype);
687     // Send buffers to receivers
688     requests = xbt_new(MPI_Request, size - 1);
689     index = 0;
690     for(dst = 0; dst < size; dst++) {
691       if(dst != root) {
692         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
693                                           sendcount, sendtype, dst,
694                                           system_tag, comm);
695         index++;
696       }
697     }
698     // Wait for completion of isend's.
699     smpi_mpi_startall(size - 1, requests);
700     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
701     xbt_free(requests);
702   }
703 }
704
705 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
706                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
707                        MPI_Datatype recvtype, int root, MPI_Comm comm)
708 {
709   int system_tag = 666;
710   int rank, size, dst, index;
711   MPI_Aint lb = 0, sendext = 0;
712   MPI_Request *requests;
713
714   rank = smpi_comm_rank(comm);
715   size = smpi_comm_size(comm);
716   if(rank != root) {
717     // Recv buffer from root
718     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
719                   MPI_STATUS_IGNORE);
720   } else {
721     // FIXME: check for errors
722     smpi_datatype_extent(sendtype, &lb, &sendext);
723     // Local copy from root
724     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
725                        sendtype, recvbuf, recvcount, recvtype);
726     // Send buffers to receivers
727     requests = xbt_new(MPI_Request, size - 1);
728     index = 0;
729     for(dst = 0; dst < size; dst++) {
730       if(dst != root) {
731         requests[index] =
732             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
733                             sendtype, dst, system_tag, comm);
734         index++;
735       }
736     }
737     // Wait for completion of isend's.
738     smpi_mpi_startall(size - 1, requests);
739     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
740     xbt_free(requests);
741   }
742 }
743
744 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
745                      MPI_Datatype datatype, MPI_Op op, int root,
746                      MPI_Comm comm)
747 {
748   int system_tag = 666;
749   int rank, size, src, index;
750   MPI_Aint lb = 0, dataext = 0;
751   MPI_Request *requests;
752   void **tmpbufs;
753
754   rank = smpi_comm_rank(comm);
755   size = smpi_comm_size(comm);
756   if(rank != root) {
757     // Send buffer to root
758     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
759   } else {
760     // FIXME: check for errors
761     smpi_datatype_extent(datatype, &lb, &dataext);
762     // Local copy from root
763     if (sendbuf && recvbuf)
764       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
765     // Receive buffers from senders
766     //TODO: make a MPI_barrier here ?
767     requests = xbt_new(MPI_Request, size - 1);
768     tmpbufs = xbt_new(void *, size - 1);
769     index = 0;
770     for(src = 0; src < size; src++) {
771       if(src != root) {
772         // FIXME: possibly overkill we we have contiguous/noncontiguous data
773         //  mapping...
774         tmpbufs[index] = xbt_malloc(count * dataext);
775         requests[index] =
776             smpi_irecv_init(tmpbufs[index], count, datatype, src,
777                             system_tag, comm);
778         index++;
779       }
780     }
781     // Wait for completion of irecv's.
782     smpi_mpi_startall(size - 1, requests);
783     for(src = 0; src < size - 1; src++) {
784       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
785       XBT_VERB("finished waiting any request with index %d", index);
786       if(index == MPI_UNDEFINED) {
787         break;
788       }
789       if(op) /* op can be MPI_OP_NULL that does nothing */
790         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
791     }
792     for(index = 0; index < size - 1; index++) {
793       xbt_free(tmpbufs[index]);
794     }
795     xbt_free(tmpbufs);
796     xbt_free(requests);
797   }
798 }
799
800 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
801                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
802 {
803   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
804   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
805 }
806
807 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
808                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
809 {
810   int system_tag = 666;
811   int rank, size, other, index;
812   MPI_Aint lb = 0, dataext = 0;
813   MPI_Request *requests;
814   void **tmpbufs;
815
816   rank = smpi_comm_rank(comm);
817   size = smpi_comm_size(comm);
818
819   // FIXME: check for errors
820   smpi_datatype_extent(datatype, &lb, &dataext);
821
822   // Local copy from self
823   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
824
825   // Send/Recv buffers to/from others;
826   requests = xbt_new(MPI_Request, size - 1);
827   tmpbufs = xbt_new(void *, rank);
828   index = 0;
829   for(other = 0; other < rank; other++) {
830     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
831     // mapping...
832     tmpbufs[index] = xbt_malloc(count * dataext);
833     requests[index] =
834         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
835                         comm);
836     index++;
837   }
838   for(other = rank + 1; other < size; other++) {
839     requests[index] =
840         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
841     index++;
842   }
843   // Wait for completion of all comms.
844   smpi_mpi_startall(size - 1, requests);
845   for(other = 0; other < size - 1; other++) {
846     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
847     if(index == MPI_UNDEFINED) {
848       break;
849     }
850     if(index < rank) {
851       // #Request is below rank: it's a irecv
852       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
853     }
854   }
855   for(index = 0; index < rank; index++) {
856     xbt_free(tmpbufs[index]);
857   }
858   xbt_free(tmpbufs);
859   xbt_free(requests);
860 }