Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'killgraskill'
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
17
18
19 static int match_recv(void* a, void* b, smx_action_t ignored) {
20    MPI_Request ref = (MPI_Request)a;
21    MPI_Request req = (MPI_Request)b;
22    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23
24   xbt_assert(ref, "Cannot match recv against null reference");
25   xbt_assert(req, "Cannot match recv against null request");
26   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
27     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
28 }
29
30 static int match_send(void* a, void* b,smx_action_t ignored) {
31    MPI_Request ref = (MPI_Request)a;
32    MPI_Request req = (MPI_Request)b;
33    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
34    xbt_assert(ref, "Cannot match send against null reference");
35    xbt_assert(req, "Cannot match send against null request");
36    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
37           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
38 }
39
40 static MPI_Request build_request(void *buf, int count,
41                                  MPI_Datatype datatype, int src, int dst,
42                                  int tag, MPI_Comm comm, unsigned flags)
43 {
44   MPI_Request request;
45
46   void *old_buf = NULL;
47
48   request = xbt_new(s_smpi_mpi_request_t, 1);
49
50   s_smpi_subtype_t *subtype = datatype->substruct;
51
52   if(datatype->has_subtype == 1){
53     // This part handles the problem of non-contignous memory
54     old_buf = buf;
55     buf = malloc(count*smpi_datatype_size(datatype));
56     if (flags & SEND) {
57       subtype->serialize(old_buf, buf, count, datatype->substruct);
58     }
59   }
60
61   request->buf = buf;
62   // This part handles the problem of non-contignous memory (for the
63   // unserialisation at the reception)
64   request->old_buf = old_buf;
65   request->old_type = datatype;
66
67   request->size = smpi_datatype_size(datatype) * count;
68   request->src = src;
69   request->dst = dst;
70   request->tag = tag;
71   request->comm = comm;
72   request->action = NULL;
73   request->flags = flags;
74   request->detached = 0;
75 #ifdef HAVE_TRACING
76   request->send = 0;
77   request->recv = 0;
78 #endif
79   return request;
80 }
81
82
83 void smpi_empty_status(MPI_Status * status) {
84   if(status != MPI_STATUS_IGNORE) {
85       status->MPI_SOURCE=MPI_ANY_SOURCE;
86       status->MPI_TAG=MPI_ANY_TAG;
87       status->count=0;
88   }
89 }
90
91 void smpi_action_trace_run(char *path)
92 {
93   char *name;
94   xbt_dynar_t todo;
95   xbt_dict_cursor_t cursor;
96
97   action_fp=NULL;
98   if (path) {
99     action_fp = fopen(path, "r");
100     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
101                strerror(errno));
102   }
103
104   if (!xbt_dict_is_empty(action_queues)) {
105     XBT_WARN
106       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
107
108
109     xbt_dict_foreach(action_queues, cursor, name, todo) {
110       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
111     }
112   }
113
114   if (path)
115     fclose(action_fp);
116   xbt_dict_free(&action_queues);
117   action_queues = xbt_dict_new_homogeneous(NULL);
118 }
119
120 static void smpi_mpi_request_free_voidp(void* request)
121 {
122   MPI_Request req = request;
123   smpi_mpi_request_free(&req);
124 }
125
126 /* MPI Low level calls */
127 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
128                                int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                   comm, PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
138                                int src, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
142                   comm, PERSISTENT | RECV);
143
144   return request;
145 }
146
147 void smpi_mpi_start(MPI_Request request)
148 {
149   smx_rdv_t mailbox;
150
151   xbt_assert(!request->action,
152              "Cannot (re)start a non-finished communication");
153   if(request->flags & RECV) {
154     print_request("New recv", request);
155     if (request->size < surf_cfg_get_int("smpi/async_small_thres"))
156       mailbox = smpi_process_mailbox_small();
157     else
158       mailbox = smpi_process_mailbox();
159
160     // FIXME: SIMIX does not yet support non-contiguous datatypes
161     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
162   } else {
163
164     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
165 /*    if(receiver == MPI_UNDEFINED) {*/
166 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
167 /*      return;*/
168 /*    }*/
169     print_request("New send", request);
170     if (request->size < surf_cfg_get_int("smpi/async_small_thres")) { // eager mode
171       mailbox = smpi_process_remote_mailbox_small(receiver);
172     }else{
173       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
174       mailbox = smpi_process_remote_mailbox(receiver);
175     }
176     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
177       void *oldbuf = NULL;
178       if(request->old_type->has_subtype == 0){
179         oldbuf = request->buf;
180         request->detached = 1;
181         request->buf = malloc(request->size);
182         if (oldbuf)
183           memcpy(request->buf,oldbuf,request->size);
184       }
185       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
186     }
187
188     request->action =
189       simcall_comm_isend(mailbox, request->size, -1.0,
190                          request->buf, request->size,
191                          &match_send,
192                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
193                          request,
194                          // detach if msg size < eager/rdv switch limit
195                          request->detached);
196
197 #ifdef HAVE_TRACING
198     /* FIXME: detached sends are not traceable (request->action == NULL) */
199     if (request->action)
200       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
201 #endif
202
203   }
204
205 }
206
207 void smpi_mpi_startall(int count, MPI_Request * requests)
208 {
209   int i;
210
211   for(i = 0; i < count; i++) {
212     smpi_mpi_start(requests[i]);
213   }
214 }
215
216 void smpi_mpi_request_free(MPI_Request * request)
217 {
218   xbt_free(*request);
219   *request = MPI_REQUEST_NULL;
220 }
221
222 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
223                             int dst, int tag, MPI_Comm comm)
224 {
225   MPI_Request request =
226     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
227                   comm, NON_PERSISTENT | SEND);
228
229   return request;
230 }
231
232 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
233                            int dst, int tag, MPI_Comm comm)
234 {
235   MPI_Request request =
236     smpi_isend_init(buf, count, datatype, dst, tag, comm);
237
238   smpi_mpi_start(request);
239   return request;
240 }
241
242 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
243                             int src, int tag, MPI_Comm comm)
244 {
245   MPI_Request request =
246     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
247                   comm, NON_PERSISTENT | RECV);
248   return request;
249 }
250
251 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
252                            int src, int tag, MPI_Comm comm)
253 {
254   MPI_Request request =
255     smpi_irecv_init(buf, count, datatype, src, tag, comm);
256
257   smpi_mpi_start(request);
258   return request;
259 }
260
261 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
262                    int tag, MPI_Comm comm, MPI_Status * status)
263 {
264   MPI_Request request;
265   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
266   smpi_mpi_wait(&request, status);
267 }
268
269
270
271 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
272                    int tag, MPI_Comm comm)
273 {
274   MPI_Request request;
275
276   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
277   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
278 }
279
280 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
281                        int dst, int sendtag, void *recvbuf, int recvcount,
282                        MPI_Datatype recvtype, int src, int recvtag,
283                        MPI_Comm comm, MPI_Status * status)
284 {
285   MPI_Request requests[2];
286   MPI_Status stats[2];
287
288   requests[0] =
289     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
290   requests[1] =
291     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
292   smpi_mpi_startall(2, requests);
293   smpi_mpi_waitall(2, requests, stats);
294   if(status != MPI_STATUS_IGNORE) {
295     // Copy receive status
296     memcpy(status, &stats[1], sizeof(MPI_Status));
297   }
298 }
299
300 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
301 {
302   return status->count / smpi_datatype_size(datatype);
303 }
304
305 static void finish_wait(MPI_Request * request, MPI_Status * status)
306 {
307   MPI_Request req = *request;
308   // if we have a sender, we should use its data, and not the data from the receive
309   //FIXME : mail fail if req->action has already been freed, the pointer being invalid
310   if((req->action)&&
311      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
312     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
313
314   if(status != MPI_STATUS_IGNORE) {
315     status->MPI_SOURCE = req->src;
316     status->MPI_TAG = req->tag;
317     //if((*request)->action && ((MPI_Request)SIMIX_comm_get_src_data((*request)->action))->size == (*request)->size)
318     status->MPI_ERROR = MPI_SUCCESS;
319     //else status->MPI_ERROR = MPI_ERR_TRUNCATE;
320     // this handles the case were size in receive differs from size in send
321     // FIXME: really this should just contain the count of receive-type blocks,
322     // right?
323     status->count = req->size;
324   }
325   req = *request;
326
327   print_request("Finishing", req);
328   MPI_Datatype datatype = req->old_type;
329   if(datatype->has_subtype == 1){
330       // This part handles the problem of non-contignous memory
331       // the unserialization at the reception
332     s_smpi_subtype_t *subtype = datatype->substruct;
333     if(req->flags & RECV) {
334       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
335     }
336     if(req->detached == 0) free(req->buf);
337   }
338
339   if(req->flags & NON_PERSISTENT) {
340     smpi_mpi_request_free(request);
341   } else {
342     req->action = NULL;
343   }
344 }
345
346 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
347   int flag;
348
349   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
350   if ((*request)->action == NULL)
351     flag = 1;
352   else
353     flag = simcall_comm_test((*request)->action);
354   if(flag) {
355     finish_wait(request, status);
356   }else{
357     smpi_empty_status(status);
358   }
359   return flag;
360 }
361
362 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
363                      MPI_Status * status)
364 {
365   xbt_dynar_t comms;
366   int i, flag, size;
367   int* map;
368
369   *index = MPI_UNDEFINED;
370   flag = 0;
371   if(count > 0) {
372     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
373     map = xbt_new(int, count);
374     size = 0;
375     for(i = 0; i < count; i++) {
376       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
377          xbt_dynar_push(comms, &requests[i]->action);
378          map[size] = i;
379          size++;
380       }
381     }
382     if(size > 0) {
383       i = simcall_comm_testany(comms);
384       // not MPI_UNDEFINED, as this is a simix return code
385       if(i != -1) {
386         *index = map[i];
387         finish_wait(&requests[*index], status);
388         flag = 1;
389       }
390     }else{
391         //all requests are null or inactive, return true
392         flag=1;
393         smpi_empty_status(status);
394     }
395     xbt_free(map);
396     xbt_dynar_free(&comms);
397   }
398
399   return flag;
400 }
401
402
403 int smpi_mpi_testall(int count, MPI_Request requests[],
404                      MPI_Status status[])
405 {
406   MPI_Status stat;
407   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
408   int flag=1;
409   int i;
410   for(i=0; i<count; i++){
411     if(requests[i]!= MPI_REQUEST_NULL){
412       if (smpi_mpi_test(&requests[i], pstat)!=1){
413         flag=0;
414       }
415     }else{
416       smpi_empty_status(pstat);
417     }
418     if(status != MPI_STATUSES_IGNORE) {
419       memcpy(&status[i], pstat, sizeof(*pstat));
420     }
421   }
422   return flag;
423 }
424
425 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
426   int flag=0;
427   //FIXME find another wait to avoid busy waiting ?
428   // the issue here is that we have to wait on a nonexistent comm
429   while(flag==0){
430     smpi_mpi_iprobe(source, tag, comm, &flag, status);
431     XBT_DEBUG("Busy Waiting on probing : %d", flag);
432     if(!flag) {
433       simcall_process_sleep(0.0001);
434     }
435   }
436 }
437
438 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
439   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
440             comm, NON_PERSISTENT | RECV);
441
442   // behave like a receive, but don't do it
443   smx_rdv_t mailbox;
444
445   print_request("New iprobe", request);
446   // We have to test both mailboxes as we don't know if we will receive one one or another
447     if (surf_cfg_get_int("smpi/async_small_thres")>0){
448         mailbox = smpi_process_mailbox_small();
449         XBT_DEBUG("trying to probe the perm recv mailbox");
450         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
451     }
452     if (request->action==NULL){
453         mailbox = smpi_process_mailbox();
454         XBT_DEBUG("trying to probe the other mailbox");
455         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
456     }
457
458   if(request->action){
459     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
460     *flag = 1;
461     if(status != MPI_STATUS_IGNORE) {
462       status->MPI_SOURCE = req->src;
463       status->MPI_TAG = req->tag;
464       status->MPI_ERROR = MPI_SUCCESS;
465       status->count = req->size;
466     }
467   }
468   else *flag = 0;
469   smpi_mpi_request_free(&request);
470
471   return;
472 }
473
474 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
475 {
476   print_request("Waiting", *request);
477   if ((*request)->action != NULL) { // this is not a detached send
478     simcall_comm_wait((*request)->action, -1.0);
479     finish_wait(request, status);
480   }
481   // FIXME for a detached send, finish_wait is not called:
482 }
483
484 int smpi_mpi_waitany(int count, MPI_Request requests[],
485                      MPI_Status * status)
486 {
487   xbt_dynar_t comms;
488   int i, size, index;
489   int *map;
490
491   index = MPI_UNDEFINED;
492   if(count > 0) {
493     // Wait for a request to complete
494     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
495     map = xbt_new(int, count);
496     size = 0;
497     XBT_DEBUG("Wait for one of");
498     for(i = 0; i < count; i++) {
499       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
500         print_request("Waiting any ", requests[i]);
501         xbt_dynar_push(comms, &requests[i]->action);
502         map[size] = i;
503         size++;
504       }
505     }
506     if(size > 0) {
507       i = simcall_comm_waitany(comms);
508
509       // not MPI_UNDEFINED, as this is a simix return code
510       if (i != -1) {
511         index = map[i];
512         finish_wait(&requests[index], status);
513       }
514     }
515     xbt_free(map);
516     xbt_dynar_free(&comms);
517   }
518
519   if (index==MPI_UNDEFINED)
520     smpi_empty_status(status);
521
522   return index;
523 }
524
525 int smpi_mpi_waitall(int count, MPI_Request requests[],
526                       MPI_Status status[])
527 {
528   int  index, c;
529   MPI_Status stat;
530   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
531   int retvalue=MPI_SUCCESS;
532   //tag invalid requests in the set
533   for(c = 0; c < count; c++) {
534     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
535       if(status != MPI_STATUSES_IGNORE)
536         smpi_empty_status(&status[c]);
537     }else if(requests[c]->src == MPI_PROC_NULL ){
538       if(status != MPI_STATUSES_IGNORE) {
539         smpi_empty_status(&status[c]);
540         status[c].MPI_SOURCE=MPI_PROC_NULL;
541       }
542     }
543   }
544
545   for(c = 0; c < count; c++) {
546       if(MC_is_active()) {
547         smpi_mpi_wait(&requests[c], pstat);
548         index = c;
549       } else {
550         index = smpi_mpi_waitany(count, requests, pstat);
551         if(index == MPI_UNDEFINED) {
552           break;
553        }
554       if(status != MPI_STATUSES_IGNORE) {
555         memcpy(&status[index], pstat, sizeof(*pstat));
556         if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
557
558       }
559     }
560   }
561   return retvalue;
562 }
563
564 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
565                       MPI_Status status[])
566 {
567   int i, count, index;
568   MPI_Status stat;
569   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
570
571   count = 0;
572   for(i = 0; i < incount; i++)
573   {
574     index=smpi_mpi_waitany(incount, requests, pstat);
575     if(index!=MPI_UNDEFINED){
576       indices[count] = index;
577       count++;
578       if(status != MPI_STATUSES_IGNORE) {
579         memcpy(&status[index], pstat, sizeof(*pstat));
580       }
581     }else{
582       return MPI_UNDEFINED;
583     }
584   }
585   return count;
586 }
587
588 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
589                       MPI_Status status[])
590 {
591   int i, count, count_dead;
592   MPI_Status stat;
593   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
594
595   count = 0;
596   count_dead = 0;
597   for(i = 0; i < incount; i++) {
598     if((requests[i] != MPI_REQUEST_NULL)) {
599       if(smpi_mpi_test(&requests[i], pstat)) {
600          indices[count] = i;
601          count++;
602          if(status != MPI_STATUSES_IGNORE) {
603             memcpy(&status[i], pstat, sizeof(*pstat));
604          }
605       }
606     }else{
607       count_dead++;
608     }
609   }
610   if(count_dead==incount)return MPI_UNDEFINED;
611   else return count;
612 }
613
614 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
615                     MPI_Comm comm)
616 {
617   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
618   nary_tree_bcast(buf, count, datatype, root, comm, 4);
619 }
620
621 void smpi_mpi_barrier(MPI_Comm comm)
622 {
623   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
624   nary_tree_barrier(comm, 4);
625 }
626
627 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
628                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
629                      int root, MPI_Comm comm)
630 {
631   int system_tag = 666;
632   int rank, size, src, index;
633   MPI_Aint lb = 0, recvext = 0;
634   MPI_Request *requests;
635
636   rank = smpi_comm_rank(comm);
637   size = smpi_comm_size(comm);
638   if(rank != root) {
639     // Send buffer to root
640     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
641   } else {
642     // FIXME: check for errors
643     smpi_datatype_extent(recvtype, &lb, &recvext);
644     // Local copy from root
645     smpi_datatype_copy(sendbuf, sendcount, sendtype,
646                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
647     // Receive buffers from senders
648     requests = xbt_new(MPI_Request, size - 1);
649     index = 0;
650     for(src = 0; src < size; src++) {
651       if(src != root) {
652         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
653                                           recvcount, recvtype,
654                                           src, system_tag, comm);
655         index++;
656       }
657     }
658     // Wait for completion of irecv's.
659     smpi_mpi_startall(size - 1, requests);
660     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
661     xbt_free(requests);
662   }
663 }
664
665 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
666                       void *recvbuf, int *recvcounts, int *displs,
667                       MPI_Datatype recvtype, int root, MPI_Comm comm)
668 {
669   int system_tag = 666;
670   int rank, size, src, index;
671   MPI_Aint lb = 0, recvext = 0;
672   MPI_Request *requests;
673
674   rank = smpi_comm_rank(comm);
675   size = smpi_comm_size(comm);
676   if(rank != root) {
677     // Send buffer to root
678     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
679   } else {
680     // FIXME: check for errors
681     smpi_datatype_extent(recvtype, &lb, &recvext);
682     // Local copy from root
683     smpi_datatype_copy(sendbuf, sendcount, sendtype,
684                        (char *)recvbuf + displs[root] * recvext,
685                        recvcounts[root], recvtype);
686     // Receive buffers from senders
687     requests = xbt_new(MPI_Request, size - 1);
688     index = 0;
689     for(src = 0; src < size; src++) {
690       if(src != root) {
691         requests[index] =
692           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
693                           recvcounts[src], recvtype, src, system_tag, comm);
694         index++;
695       }
696     }
697     // Wait for completion of irecv's.
698     smpi_mpi_startall(size - 1, requests);
699     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
700     xbt_free(requests);
701   }
702 }
703
704 void smpi_mpi_allgather(void *sendbuf, int sendcount,
705                         MPI_Datatype sendtype, void *recvbuf,
706                         int recvcount, MPI_Datatype recvtype,
707                         MPI_Comm comm)
708 {
709   int system_tag = 666;
710   int rank, size, other, index;
711   MPI_Aint lb = 0, recvext = 0;
712   MPI_Request *requests;
713
714   rank = smpi_comm_rank(comm);
715   size = smpi_comm_size(comm);
716   // FIXME: check for errors
717   smpi_datatype_extent(recvtype, &lb, &recvext);
718   // Local copy from self
719   smpi_datatype_copy(sendbuf, sendcount, sendtype,
720                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
721                      recvtype);
722   // Send/Recv buffers to/from others;
723   requests = xbt_new(MPI_Request, 2 * (size - 1));
724   index = 0;
725   for(other = 0; other < size; other++) {
726     if(other != rank) {
727       requests[index] =
728         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
729                         comm);
730       index++;
731       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
732                                         recvcount, recvtype, other,
733                                         system_tag, comm);
734       index++;
735     }
736   }
737   // Wait for completion of all comms.
738   smpi_mpi_startall(2 * (size - 1), requests);
739   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
740   xbt_free(requests);
741 }
742
743 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
744                          MPI_Datatype sendtype, void *recvbuf,
745                          int *recvcounts, int *displs,
746                          MPI_Datatype recvtype, MPI_Comm comm)
747 {
748   int system_tag = 666;
749   int rank, size, other, index;
750   MPI_Aint lb = 0, recvext = 0;
751   MPI_Request *requests;
752
753   rank = smpi_comm_rank(comm);
754   size = smpi_comm_size(comm);
755   // FIXME: check for errors
756   smpi_datatype_extent(recvtype, &lb, &recvext);
757   // Local copy from self
758   smpi_datatype_copy(sendbuf, sendcount, sendtype,
759                      (char *)recvbuf + displs[rank] * recvext,
760                      recvcounts[rank], recvtype);
761   // Send buffers to others;
762   requests = xbt_new(MPI_Request, 2 * (size - 1));
763   index = 0;
764   for(other = 0; other < size; other++) {
765     if(other != rank) {
766       requests[index] =
767         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
768                         comm);
769       index++;
770       requests[index] =
771         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
772                         recvtype, other, system_tag, comm);
773       index++;
774     }
775   }
776   // Wait for completion of all comms.
777   smpi_mpi_startall(2 * (size - 1), requests);
778   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
779   xbt_free(requests);
780 }
781
782 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
783                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
784                       int root, MPI_Comm comm)
785 {
786   int system_tag = 666;
787   int rank, size, dst, index;
788   MPI_Aint lb = 0, sendext = 0;
789   MPI_Request *requests;
790
791   rank = smpi_comm_rank(comm);
792   size = smpi_comm_size(comm);
793   if(rank != root) {
794     // Recv buffer from root
795     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
796                   MPI_STATUS_IGNORE);
797   } else {
798     // FIXME: check for errors
799     smpi_datatype_extent(sendtype, &lb, &sendext);
800     // Local copy from root
801     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
802                        sendcount, sendtype, recvbuf, recvcount, recvtype);
803     // Send buffers to receivers
804     requests = xbt_new(MPI_Request, size - 1);
805     index = 0;
806     for(dst = 0; dst < size; dst++) {
807       if(dst != root) {
808         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
809                                           sendcount, sendtype, dst,
810                                           system_tag, comm);
811         index++;
812       }
813     }
814     // Wait for completion of isend's.
815     smpi_mpi_startall(size - 1, requests);
816     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
817     xbt_free(requests);
818   }
819 }
820
821 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
822                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
823                        MPI_Datatype recvtype, int root, MPI_Comm comm)
824 {
825   int system_tag = 666;
826   int rank, size, dst, index;
827   MPI_Aint lb = 0, sendext = 0;
828   MPI_Request *requests;
829
830   rank = smpi_comm_rank(comm);
831   size = smpi_comm_size(comm);
832   if(rank != root) {
833     // Recv buffer from root
834     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
835                   MPI_STATUS_IGNORE);
836   } else {
837     // FIXME: check for errors
838     smpi_datatype_extent(sendtype, &lb, &sendext);
839     // Local copy from root
840     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
841                        sendtype, recvbuf, recvcount, recvtype);
842     // Send buffers to receivers
843     requests = xbt_new(MPI_Request, size - 1);
844     index = 0;
845     for(dst = 0; dst < size; dst++) {
846       if(dst != root) {
847         requests[index] =
848           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
849                           sendtype, dst, system_tag, comm);
850         index++;
851       }
852     }
853     // Wait for completion of isend's.
854     smpi_mpi_startall(size - 1, requests);
855     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
856     xbt_free(requests);
857   }
858 }
859
860 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
861                      MPI_Datatype datatype, MPI_Op op, int root,
862                      MPI_Comm comm)
863 {
864   int system_tag = 666;
865   int rank, size, src, index;
866   MPI_Aint lb = 0, dataext = 0;
867   MPI_Request *requests;
868   void **tmpbufs;
869
870   rank = smpi_comm_rank(comm);
871   size = smpi_comm_size(comm);
872   if(rank != root) {
873     // Send buffer to root
874     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
875   } else {
876     // FIXME: check for errors
877     smpi_datatype_extent(datatype, &lb, &dataext);
878     // Local copy from root
879     if (sendbuf && recvbuf)
880       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
881     // Receive buffers from senders
882     //TODO: make a MPI_barrier here ?
883     requests = xbt_new(MPI_Request, size - 1);
884     tmpbufs = xbt_new(void *, size - 1);
885     index = 0;
886     for(src = 0; src < size; src++) {
887       if(src != root) {
888         // FIXME: possibly overkill we we have contiguous/noncontiguous data
889         //  mapping...
890         tmpbufs[index] = xbt_malloc(count * dataext);
891         requests[index] =
892           smpi_irecv_init(tmpbufs[index], count, datatype, src,
893                           system_tag, comm);
894         index++;
895       }
896     }
897     // Wait for completion of irecv's.
898     smpi_mpi_startall(size - 1, requests);
899     for(src = 0; src < size - 1; src++) {
900       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
901       XBT_DEBUG("finished waiting any request with index %d", index);
902       if(index == MPI_UNDEFINED) {
903         break;
904       }
905       if(op) /* op can be MPI_OP_NULL that does nothing */
906         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
907     }
908     for(index = 0; index < size - 1; index++) {
909       xbt_free(tmpbufs[index]);
910     }
911     xbt_free(tmpbufs);
912     xbt_free(requests);
913   }
914 }
915
916 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
917                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
918 {
919   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
920   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
921 }
922
923 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
924                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
925 {
926   int system_tag = 666;
927   int rank, size, other, index;
928   MPI_Aint lb = 0, dataext = 0;
929   MPI_Request *requests;
930   void **tmpbufs;
931
932   rank = smpi_comm_rank(comm);
933   size = smpi_comm_size(comm);
934
935   // FIXME: check for errors
936   smpi_datatype_extent(datatype, &lb, &dataext);
937
938   // Local copy from self
939   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
940
941   // Send/Recv buffers to/from others;
942   requests = xbt_new(MPI_Request, size - 1);
943   tmpbufs = xbt_new(void *, rank);
944   index = 0;
945   for(other = 0; other < rank; other++) {
946     // FIXME: possibly overkill we we have contiguous/noncontiguous data
947     // mapping...
948     tmpbufs[index] = xbt_malloc(count * dataext);
949     requests[index] =
950       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
951                       comm);
952     index++;
953   }
954   for(other = rank + 1; other < size; other++) {
955     requests[index] =
956       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
957     index++;
958   }
959   // Wait for completion of all comms.
960   smpi_mpi_startall(size - 1, requests);
961   for(other = 0; other < size - 1; other++) {
962     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
963     if(index == MPI_UNDEFINED) {
964       break;
965     }
966     if(index < rank) {
967       // #Request is below rank: it's a irecv
968       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
969     }
970   }
971   for(index = 0; index < rank; index++) {
972     xbt_free(tmpbufs[index]);
973   }
974   xbt_free(tmpbufs);
975   xbt_free(requests);
976 }