Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
82fade3879d26e0b512cb9bf585958010da9551b
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
17
18
19 static int match_recv(void* a, void* b, smx_action_t ignored) {
20    MPI_Request ref = (MPI_Request)a;
21    MPI_Request req = (MPI_Request)b;
22    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23
24   xbt_assert(ref, "Cannot match recv against null reference");
25   xbt_assert(req, "Cannot match recv against null request");
26   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
27     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
28 }
29
30 static int match_send(void* a, void* b,smx_action_t ignored) {
31    MPI_Request ref = (MPI_Request)a;
32    MPI_Request req = (MPI_Request)b;
33    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
34    xbt_assert(ref, "Cannot match send against null reference");
35    xbt_assert(req, "Cannot match send against null request");
36    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
37           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
38 }
39
40 static MPI_Request build_request(void *buf, int count,
41                                  MPI_Datatype datatype, int src, int dst,
42                                  int tag, MPI_Comm comm, unsigned flags)
43 {
44   MPI_Request request;
45
46   void *old_buf = NULL;
47
48   request = xbt_new(s_smpi_mpi_request_t, 1);
49
50   s_smpi_subtype_t *subtype = datatype->substruct;
51
52   if(datatype->has_subtype == 1){
53     // This part handles the problem of non-contiguous memory
54     old_buf = buf;
55     buf = malloc(count*smpi_datatype_size(datatype));
56     if (flags & SEND) {
57       subtype->serialize(old_buf, buf, count, datatype->substruct);
58     }
59   }
60
61   request->buf = buf;
62   // This part handles the problem of non-contiguous memory (for the
63   // unserialisation at the reception)
64   request->old_buf = old_buf;
65   request->old_type = datatype;
66
67   request->size = smpi_datatype_size(datatype) * count;
68   request->src = src;
69   request->dst = dst;
70   request->tag = tag;
71   request->comm = comm;
72   request->action = NULL;
73   request->flags = flags;
74   request->detached = 0;
75 #ifdef HAVE_TRACING
76   request->send = 0;
77   request->recv = 0;
78 #endif
79   return request;
80 }
81
82
83 void smpi_empty_status(MPI_Status * status) {
84   if(status != MPI_STATUS_IGNORE) {
85       status->MPI_SOURCE=MPI_ANY_SOURCE;
86       status->MPI_TAG=MPI_ANY_TAG;
87       status->count=0;
88   }
89 }
90
91 void smpi_action_trace_run(char *path)
92 {
93   char *name;
94   xbt_dynar_t todo;
95   xbt_dict_cursor_t cursor;
96
97   action_fp=NULL;
98   if (path) {
99     action_fp = fopen(path, "r");
100     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
101                strerror(errno));
102   }
103
104   if (!xbt_dict_is_empty(action_queues)) {
105     XBT_WARN
106       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
107
108
109     xbt_dict_foreach(action_queues, cursor, name, todo) {
110       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
111     }
112   }
113
114   if (path)
115     fclose(action_fp);
116   xbt_dict_free(&action_queues);
117   action_queues = xbt_dict_new_homogeneous(NULL);
118 }
119
120 static void smpi_mpi_request_free_voidp(void* request)
121 {
122   MPI_Request req = request;
123   smpi_mpi_request_free(&req);
124 }
125
126 /* MPI Low level calls */
127 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
128                                int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                   comm, PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
138                                int src, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
142                   comm, PERSISTENT | RECV);
143
144   return request;
145 }
146
147 void smpi_mpi_start(MPI_Request request)
148 {
149   smx_rdv_t mailbox;
150
151   xbt_assert(!request->action,
152              "Cannot (re)start a non-finished communication");
153   if(request->flags & RECV) {
154     print_request("New recv", request);
155     if (request->size < surf_cfg_get_int("smpi/async_small_thres"))
156       mailbox = smpi_process_mailbox_small();
157     else
158       mailbox = smpi_process_mailbox();
159
160     // FIXME: SIMIX does not yet support non-contiguous datatypes
161     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
162     if (request->action)request->action->comm.refcount++;
163   } else {
164
165     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
166 /*    if(receiver == MPI_UNDEFINED) {*/
167 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
168 /*      return;*/
169 /*    }*/
170     print_request("New send", request);
171     if (request->size < surf_cfg_get_int("smpi/async_small_thres")) { // eager mode
172       mailbox = smpi_process_remote_mailbox_small(receiver);
173     }else{
174       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
175       mailbox = smpi_process_remote_mailbox(receiver);
176     }
177     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
178       void *oldbuf = NULL;
179       if(request->old_type->has_subtype == 0){
180         oldbuf = request->buf;
181         request->detached = 1;
182         if (oldbuf){
183           request->buf = malloc(request->size);
184           memcpy(request->buf,oldbuf,request->size);
185         }
186       }
187       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
188     }
189
190     request->action =
191       simcall_comm_isend(mailbox, request->size, -1.0,
192                          request->buf, request->size,
193                          &match_send,
194                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
195                          request,
196                          // detach if msg size < eager/rdv switch limit
197                          request->detached);
198     if (request->action)request->action->comm.refcount++;
199
200 #ifdef HAVE_TRACING
201     /* FIXME: detached sends are not traceable (request->action == NULL) */
202     if (request->action)
203       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
204 #endif
205
206   }
207
208 }
209
210 void smpi_mpi_startall(int count, MPI_Request * requests)
211 {
212   int i;
213
214   for(i = 0; i < count; i++) {
215     smpi_mpi_start(requests[i]);
216   }
217 }
218
219 void smpi_mpi_request_free(MPI_Request * request)
220 {
221   xbt_free(*request);
222   *request = MPI_REQUEST_NULL;
223 }
224
225 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
226                             int dst, int tag, MPI_Comm comm)
227 {
228   MPI_Request request =
229     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
230                   comm, NON_PERSISTENT | SEND);
231
232   return request;
233 }
234
235 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
236                            int dst, int tag, MPI_Comm comm)
237 {
238   MPI_Request request =
239     smpi_isend_init(buf, count, datatype, dst, tag, comm);
240
241   smpi_mpi_start(request);
242   return request;
243 }
244
245 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
246                             int src, int tag, MPI_Comm comm)
247 {
248   MPI_Request request =
249     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
250                   comm, NON_PERSISTENT | RECV);
251   return request;
252 }
253
254 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
255                            int src, int tag, MPI_Comm comm)
256 {
257   MPI_Request request =
258     smpi_irecv_init(buf, count, datatype, src, tag, comm);
259
260   smpi_mpi_start(request);
261   return request;
262 }
263
264 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
265                    int tag, MPI_Comm comm, MPI_Status * status)
266 {
267   MPI_Request request;
268   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
269   smpi_mpi_wait(&request, status);
270 }
271
272
273
274 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
275                    int tag, MPI_Comm comm)
276 {
277   MPI_Request request =
278     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
279                   comm, NON_PERSISTENT | SEND | RECV_DELETE);
280
281   smpi_mpi_start(request);
282   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
283
284 }
285
286 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
287                        int dst, int sendtag, void *recvbuf, int recvcount,
288                        MPI_Datatype recvtype, int src, int recvtag,
289                        MPI_Comm comm, MPI_Status * status)
290 {
291   MPI_Request requests[2];
292   MPI_Status stats[2];
293
294   requests[0] =
295     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
296   requests[1] =
297     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
298   smpi_mpi_startall(2, requests);
299   smpi_mpi_waitall(2, requests, stats);
300   if(status != MPI_STATUS_IGNORE) {
301     // Copy receive status
302     memcpy(status, &stats[1], sizeof(MPI_Status));
303   }
304 }
305
306 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
307 {
308   return status->count / smpi_datatype_size(datatype);
309 }
310
311 static void finish_wait(MPI_Request * request, MPI_Status * status)
312 {
313   MPI_Request req = *request;
314   // if we have a sender, we should use its data, and not the data from the receive
315   //FIXME : may fail if req->action has already been freed, the pointer being invalid
316   if((req->action)&&
317      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
318     //req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
319
320   if(status != MPI_STATUS_IGNORE) {
321     status->MPI_SOURCE = req->src;
322     status->MPI_TAG = req->tag;
323     //if((*request)->action && ((MPI_Request)SIMIX_comm_get_src_data((*request)->action))->size == (*request)->size)
324     status->MPI_ERROR = MPI_SUCCESS;
325     //else status->MPI_ERROR = MPI_ERR_TRUNCATE;
326     // this handles the case were size in receive differs from size in send
327     // FIXME: really this should just contain the count of receive-type blocks,
328     // right?
329     status->count = req->size;
330   }
331   req = *request;
332
333   print_request("Finishing", req);
334   MPI_Datatype datatype = req->old_type;
335   if(datatype->has_subtype == 1){
336       // This part handles the problem of non-contignous memory
337       // the unserialization at the reception
338     s_smpi_subtype_t *subtype = datatype->substruct;
339     if(req->flags & RECV) {
340       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
341     }
342     if(req->detached == 0) free(req->buf);
343   }
344
345
346
347   if(req->flags & NON_PERSISTENT) {
348     if(req->flags & RECV &&
349        req->action &&
350       (req->action->state == SIMIX_DONE))
351     {
352       MPI_Request sender_request = (MPI_Request)SIMIX_comm_get_src_data(req->action);
353       if((sender_request!=MPI_REQUEST_NULL) &&
354         ( sender_request->detached ) &&
355         ( sender_request->flags & RECV_DELETE))
356       {
357         //we are in a receiver's wait from a detached send
358         //we have to clean the sender's side request here.... but only if done by a send, not an isend
359         //the request lives senderside for an isend. As detached is currently for send + isend, we use RECV_DELETE to separate them
360         //FIXME : see if just removing detached status for isend is also good
361         smpi_mpi_request_free(&sender_request);
362       }
363     }
364
365
366     if(req->action){
367       //if we want to free our request, we have to invalidate it at the other end of the comm
368
369       if(req->flags & SEND){
370         req->action->comm.src_data=MPI_REQUEST_NULL;
371       }else{
372         req->action->comm.dst_data=MPI_REQUEST_NULL;
373       }
374
375       smx_action_t temp=req->action;
376       if(req->action->comm.refcount == 1)req->action = NULL;
377       SIMIX_comm_destroy(temp);
378     }
379
380
381
382     smpi_mpi_request_free(request);
383
384
385   } else {
386     if(req->action)SIMIX_comm_destroy(req->action);
387     req->action = NULL;
388   }
389 }
390
391 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
392   int flag;
393
394   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
395   if ((*request)->action == NULL)
396     flag = 1;
397   else
398     flag = simcall_comm_test((*request)->action);
399   if(flag) {
400     finish_wait(request, status);
401   }else{
402     smpi_empty_status(status);
403   }
404   return flag;
405 }
406
407 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
408                      MPI_Status * status)
409 {
410   xbt_dynar_t comms;
411   int i, flag, size;
412   int* map;
413
414   *index = MPI_UNDEFINED;
415   flag = 0;
416   if(count > 0) {
417     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
418     map = xbt_new(int, count);
419     size = 0;
420     for(i = 0; i < count; i++) {
421       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
422          xbt_dynar_push(comms, &requests[i]->action);
423          map[size] = i;
424          size++;
425       }
426     }
427     if(size > 0) {
428       i = simcall_comm_testany(comms);
429       // not MPI_UNDEFINED, as this is a simix return code
430       if(i != -1) {
431         *index = map[i];
432         finish_wait(&requests[*index], status);
433         flag = 1;
434       }
435     }else{
436         //all requests are null or inactive, return true
437         flag=1;
438         smpi_empty_status(status);
439     }
440     xbt_free(map);
441     xbt_dynar_free(&comms);
442   }
443
444   return flag;
445 }
446
447
448 int smpi_mpi_testall(int count, MPI_Request requests[],
449                      MPI_Status status[])
450 {
451   MPI_Status stat;
452   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
453   int flag=1;
454   int i;
455   for(i=0; i<count; i++){
456     if(requests[i]!= MPI_REQUEST_NULL){
457       if (smpi_mpi_test(&requests[i], pstat)!=1){
458         flag=0;
459       }
460     }else{
461       smpi_empty_status(pstat);
462     }
463     if(status != MPI_STATUSES_IGNORE) {
464       memcpy(&status[i], pstat, sizeof(*pstat));
465     }
466   }
467   return flag;
468 }
469
470 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
471   int flag=0;
472   //FIXME find another wait to avoid busy waiting ?
473   // the issue here is that we have to wait on a nonexistent comm
474   while(flag==0){
475     smpi_mpi_iprobe(source, tag, comm, &flag, status);
476     XBT_DEBUG("Busy Waiting on probing : %d", flag);
477     if(!flag) {
478       simcall_process_sleep(0.0001);
479     }
480   }
481 }
482
483 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
484   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
485             comm, NON_PERSISTENT | RECV);
486
487   // behave like a receive, but don't do it
488   smx_rdv_t mailbox;
489
490   print_request("New iprobe", request);
491   // We have to test both mailboxes as we don't know if we will receive one one or another
492     if (surf_cfg_get_int("smpi/async_small_thres")>0){
493         mailbox = smpi_process_mailbox_small();
494         XBT_DEBUG("trying to probe the perm recv mailbox");
495         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
496     }
497     if (request->action==NULL){
498         mailbox = smpi_process_mailbox();
499         XBT_DEBUG("trying to probe the other mailbox");
500         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
501     }
502
503   if(request->action){
504     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
505     *flag = 1;
506     if(status != MPI_STATUS_IGNORE) {
507       status->MPI_SOURCE = req->src;
508       status->MPI_TAG = req->tag;
509       status->MPI_ERROR = MPI_SUCCESS;
510       status->count = req->size;
511     }
512   }
513   else *flag = 0;
514   smpi_mpi_request_free(&request);
515
516   return;
517 }
518
519 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
520 {
521   print_request("Waiting", *request);
522   if ((*request)->action != NULL) { // this is not a detached send
523     simcall_comm_wait((*request)->action, -1.0);
524     finish_wait(request, status);
525   }
526   // FIXME for a detached send, finish_wait is not called:
527 }
528
529 int smpi_mpi_waitany(int count, MPI_Request requests[],
530                      MPI_Status * status)
531 {
532   xbt_dynar_t comms;
533   int i, size, index;
534   int *map;
535
536   index = MPI_UNDEFINED;
537   if(count > 0) {
538     // Wait for a request to complete
539     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
540     map = xbt_new(int, count);
541     size = 0;
542     XBT_DEBUG("Wait for one of");
543     for(i = 0; i < count; i++) {
544       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
545         print_request("Waiting any ", requests[i]);
546         xbt_dynar_push(comms, &requests[i]->action);
547         map[size] = i;
548         size++;
549       }
550     }
551     if(size > 0) {
552       i = simcall_comm_waitany(comms);
553
554       // not MPI_UNDEFINED, as this is a simix return code
555       if (i != -1) {
556         index = map[i];
557         finish_wait(&requests[index], status);
558       }
559     }
560     xbt_free(map);
561     xbt_dynar_free(&comms);
562   }
563
564   if (index==MPI_UNDEFINED)
565     smpi_empty_status(status);
566
567   return index;
568 }
569
570 int smpi_mpi_waitall(int count, MPI_Request requests[],
571                       MPI_Status status[])
572 {
573   int  index, c;
574   MPI_Status stat;
575   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
576   int retvalue=MPI_SUCCESS;
577   //tag invalid requests in the set
578   for(c = 0; c < count; c++) {
579     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
580       if(status != MPI_STATUSES_IGNORE)
581         smpi_empty_status(&status[c]);
582     }else if(requests[c]->src == MPI_PROC_NULL ){
583       if(status != MPI_STATUSES_IGNORE) {
584         smpi_empty_status(&status[c]);
585         status[c].MPI_SOURCE=MPI_PROC_NULL;
586       }
587     }
588   }
589
590   for(c = 0; c < count; c++) {
591       if(MC_is_active()) {
592         smpi_mpi_wait(&requests[c], pstat);
593         index = c;
594       } else {
595         index = smpi_mpi_waitany(count, requests, pstat);
596         if(index == MPI_UNDEFINED) {
597           break;
598        }
599       if(status != MPI_STATUSES_IGNORE) {
600         memcpy(&status[index], pstat, sizeof(*pstat));
601         if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
602
603       }
604     }
605   }
606   return retvalue;
607 }
608
609 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
610                       MPI_Status status[])
611 {
612   int i, count, index;
613   MPI_Status stat;
614   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
615
616   count = 0;
617   for(i = 0; i < incount; i++)
618   {
619     index=smpi_mpi_waitany(incount, requests, pstat);
620     if(index!=MPI_UNDEFINED){
621       indices[count] = index;
622       count++;
623       if(status != MPI_STATUSES_IGNORE) {
624         memcpy(&status[index], pstat, sizeof(*pstat));
625       }
626     }else{
627       return MPI_UNDEFINED;
628     }
629   }
630   return count;
631 }
632
633 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
634                       MPI_Status status[])
635 {
636   int i, count, count_dead;
637   MPI_Status stat;
638   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
639
640   count = 0;
641   count_dead = 0;
642   for(i = 0; i < incount; i++) {
643     if((requests[i] != MPI_REQUEST_NULL)) {
644       if(smpi_mpi_test(&requests[i], pstat)) {
645          indices[count] = i;
646          count++;
647          if(status != MPI_STATUSES_IGNORE) {
648             memcpy(&status[i], pstat, sizeof(*pstat));
649          }
650       }
651     }else{
652       count_dead++;
653     }
654   }
655   if(count_dead==incount)return MPI_UNDEFINED;
656   else return count;
657 }
658
659 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
660                     MPI_Comm comm)
661 {
662   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
663   nary_tree_bcast(buf, count, datatype, root, comm, 4);
664 }
665
666 void smpi_mpi_barrier(MPI_Comm comm)
667 {
668   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
669   nary_tree_barrier(comm, 4);
670 }
671
672 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
673                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
674                      int root, MPI_Comm comm)
675 {
676   int system_tag = 666;
677   int rank, size, src, index;
678   MPI_Aint lb = 0, recvext = 0;
679   MPI_Request *requests;
680
681   rank = smpi_comm_rank(comm);
682   size = smpi_comm_size(comm);
683   if(rank != root) {
684     // Send buffer to root
685     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
686   } else {
687     // FIXME: check for errors
688     smpi_datatype_extent(recvtype, &lb, &recvext);
689     // Local copy from root
690     smpi_datatype_copy(sendbuf, sendcount, sendtype,
691                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
692     // Receive buffers from senders
693     requests = xbt_new(MPI_Request, size - 1);
694     index = 0;
695     for(src = 0; src < size; src++) {
696       if(src != root) {
697         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
698                                           recvcount, recvtype,
699                                           src, system_tag, comm);
700         index++;
701       }
702     }
703     // Wait for completion of irecv's.
704     smpi_mpi_startall(size - 1, requests);
705     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
706     xbt_free(requests);
707   }
708 }
709
710 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
711                       void *recvbuf, int *recvcounts, int *displs,
712                       MPI_Datatype recvtype, int root, MPI_Comm comm)
713 {
714   int system_tag = 666;
715   int rank, size, src, index;
716   MPI_Aint lb = 0, recvext = 0;
717   MPI_Request *requests;
718
719   rank = smpi_comm_rank(comm);
720   size = smpi_comm_size(comm);
721   if(rank != root) {
722     // Send buffer to root
723     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
724   } else {
725     // FIXME: check for errors
726     smpi_datatype_extent(recvtype, &lb, &recvext);
727     // Local copy from root
728     smpi_datatype_copy(sendbuf, sendcount, sendtype,
729                        (char *)recvbuf + displs[root] * recvext,
730                        recvcounts[root], recvtype);
731     // Receive buffers from senders
732     requests = xbt_new(MPI_Request, size - 1);
733     index = 0;
734     for(src = 0; src < size; src++) {
735       if(src != root) {
736         requests[index] =
737           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
738                           recvcounts[src], recvtype, src, system_tag, comm);
739         index++;
740       }
741     }
742     // Wait for completion of irecv's.
743     smpi_mpi_startall(size - 1, requests);
744     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
745     xbt_free(requests);
746   }
747 }
748
749 void smpi_mpi_allgather(void *sendbuf, int sendcount,
750                         MPI_Datatype sendtype, void *recvbuf,
751                         int recvcount, MPI_Datatype recvtype,
752                         MPI_Comm comm)
753 {
754   int system_tag = 666;
755   int rank, size, other, index;
756   MPI_Aint lb = 0, recvext = 0;
757   MPI_Request *requests;
758
759   rank = smpi_comm_rank(comm);
760   size = smpi_comm_size(comm);
761   // FIXME: check for errors
762   smpi_datatype_extent(recvtype, &lb, &recvext);
763   // Local copy from self
764   smpi_datatype_copy(sendbuf, sendcount, sendtype,
765                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
766                      recvtype);
767   // Send/Recv buffers to/from others;
768   requests = xbt_new(MPI_Request, 2 * (size - 1));
769   index = 0;
770   for(other = 0; other < size; other++) {
771     if(other != rank) {
772       requests[index] =
773         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
774                         comm);
775       index++;
776       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
777                                         recvcount, recvtype, other,
778                                         system_tag, comm);
779       index++;
780     }
781   }
782   // Wait for completion of all comms.
783   smpi_mpi_startall(2 * (size - 1), requests);
784   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
785   xbt_free(requests);
786 }
787
788 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
789                          MPI_Datatype sendtype, void *recvbuf,
790                          int *recvcounts, int *displs,
791                          MPI_Datatype recvtype, MPI_Comm comm)
792 {
793   int system_tag = 666;
794   int rank, size, other, index;
795   MPI_Aint lb = 0, recvext = 0;
796   MPI_Request *requests;
797
798   rank = smpi_comm_rank(comm);
799   size = smpi_comm_size(comm);
800   // FIXME: check for errors
801   smpi_datatype_extent(recvtype, &lb, &recvext);
802   // Local copy from self
803   smpi_datatype_copy(sendbuf, sendcount, sendtype,
804                      (char *)recvbuf + displs[rank] * recvext,
805                      recvcounts[rank], recvtype);
806   // Send buffers to others;
807   requests = xbt_new(MPI_Request, 2 * (size - 1));
808   index = 0;
809   for(other = 0; other < size; other++) {
810     if(other != rank) {
811       requests[index] =
812         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
813                         comm);
814       index++;
815       requests[index] =
816         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
817                         recvtype, other, system_tag, comm);
818       index++;
819     }
820   }
821   // Wait for completion of all comms.
822   smpi_mpi_startall(2 * (size - 1), requests);
823   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
824   xbt_free(requests);
825 }
826
827 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
828                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
829                       int root, MPI_Comm comm)
830 {
831   int system_tag = 666;
832   int rank, size, dst, index;
833   MPI_Aint lb = 0, sendext = 0;
834   MPI_Request *requests;
835
836   rank = smpi_comm_rank(comm);
837   size = smpi_comm_size(comm);
838   if(rank != root) {
839     // Recv buffer from root
840     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
841                   MPI_STATUS_IGNORE);
842   } else {
843     // FIXME: check for errors
844     smpi_datatype_extent(sendtype, &lb, &sendext);
845     // Local copy from root
846     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
847                        sendcount, sendtype, recvbuf, recvcount, recvtype);
848     // Send buffers to receivers
849     requests = xbt_new(MPI_Request, size - 1);
850     index = 0;
851     for(dst = 0; dst < size; dst++) {
852       if(dst != root) {
853         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
854                                           sendcount, sendtype, dst,
855                                           system_tag, comm);
856         index++;
857       }
858     }
859     // Wait for completion of isend's.
860     smpi_mpi_startall(size - 1, requests);
861     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
862     xbt_free(requests);
863   }
864 }
865
866 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
867                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
868                        MPI_Datatype recvtype, int root, MPI_Comm comm)
869 {
870   int system_tag = 666;
871   int rank, size, dst, index;
872   MPI_Aint lb = 0, sendext = 0;
873   MPI_Request *requests;
874
875   rank = smpi_comm_rank(comm);
876   size = smpi_comm_size(comm);
877   if(rank != root) {
878     // Recv buffer from root
879     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
880                   MPI_STATUS_IGNORE);
881   } else {
882     // FIXME: check for errors
883     smpi_datatype_extent(sendtype, &lb, &sendext);
884     // Local copy from root
885     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
886                        sendtype, recvbuf, recvcount, recvtype);
887     // Send buffers to receivers
888     requests = xbt_new(MPI_Request, size - 1);
889     index = 0;
890     for(dst = 0; dst < size; dst++) {
891       if(dst != root) {
892         requests[index] =
893           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
894                           sendtype, dst, system_tag, comm);
895         index++;
896       }
897     }
898     // Wait for completion of isend's.
899     smpi_mpi_startall(size - 1, requests);
900     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
901     xbt_free(requests);
902   }
903 }
904
905 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
906                      MPI_Datatype datatype, MPI_Op op, int root,
907                      MPI_Comm comm)
908 {
909   int system_tag = 666;
910   int rank, size, src, index;
911   MPI_Aint lb = 0, dataext = 0;
912   MPI_Request *requests;
913   void **tmpbufs;
914
915   rank = smpi_comm_rank(comm);
916   size = smpi_comm_size(comm);
917   if(rank != root) {
918     // Send buffer to root
919     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
920   } else {
921     // FIXME: check for errors
922     smpi_datatype_extent(datatype, &lb, &dataext);
923     // Local copy from root
924     if (sendbuf && recvbuf)
925       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
926     // Receive buffers from senders
927     //TODO: make a MPI_barrier here ?
928     requests = xbt_new(MPI_Request, size - 1);
929     tmpbufs = xbt_new(void *, size - 1);
930     index = 0;
931     for(src = 0; src < size; src++) {
932       if(src != root) {
933         // FIXME: possibly overkill we we have contiguous/noncontiguous data
934         //  mapping...
935         tmpbufs[index] = xbt_malloc(count * dataext);
936         requests[index] =
937           smpi_irecv_init(tmpbufs[index], count, datatype, src,
938                           system_tag, comm);
939         index++;
940       }
941     }
942     // Wait for completion of irecv's.
943     smpi_mpi_startall(size - 1, requests);
944     for(src = 0; src < size - 1; src++) {
945       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
946       XBT_DEBUG("finished waiting any request with index %d", index);
947       if(index == MPI_UNDEFINED) {
948         break;
949       }
950       if(op) /* op can be MPI_OP_NULL that does nothing */
951         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
952     }
953     for(index = 0; index < size - 1; index++) {
954       xbt_free(tmpbufs[index]);
955     }
956     xbt_free(tmpbufs);
957     xbt_free(requests);
958   }
959 }
960
961 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
962                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
963 {
964   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
965   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
966 }
967
968 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
969                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
970 {
971   int system_tag = 666;
972   int rank, size, other, index;
973   MPI_Aint lb = 0, dataext = 0;
974   MPI_Request *requests;
975   void **tmpbufs;
976
977   rank = smpi_comm_rank(comm);
978   size = smpi_comm_size(comm);
979
980   // FIXME: check for errors
981   smpi_datatype_extent(datatype, &lb, &dataext);
982
983   // Local copy from self
984   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
985
986   // Send/Recv buffers to/from others;
987   requests = xbt_new(MPI_Request, size - 1);
988   tmpbufs = xbt_new(void *, rank);
989   index = 0;
990   for(other = 0; other < rank; other++) {
991     // FIXME: possibly overkill we we have contiguous/noncontiguous data
992     // mapping...
993     tmpbufs[index] = xbt_malloc(count * dataext);
994     requests[index] =
995       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
996                       comm);
997     index++;
998   }
999   for(other = rank + 1; other < size; other++) {
1000     requests[index] =
1001       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1002     index++;
1003   }
1004   // Wait for completion of all comms.
1005   smpi_mpi_startall(size - 1, requests);
1006   for(other = 0; other < size - 1; other++) {
1007     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1008     if(index == MPI_UNDEFINED) {
1009       break;
1010     }
1011     if(index < rank) {
1012       // #Request is below rank: it's a irecv
1013       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1014     }
1015   }
1016   for(index = 0; index < rank; index++) {
1017     xbt_free(tmpbufs[index]);
1018   }
1019   xbt_free(tmpbufs);
1020   xbt_free(requests);
1021 }