Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
have waitall output an error if an issue is encountered in any comm, and avoid loopin...
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
17
18
19 static int match_recv(void* a, void* b, smx_action_t ignored) {
20    MPI_Request ref = (MPI_Request)a;
21    MPI_Request req = (MPI_Request)b;
22    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23
24   xbt_assert(ref, "Cannot match recv against null reference");
25   xbt_assert(req, "Cannot match recv against null request");
26   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
27     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
28 }
29
30 static int match_send(void* a, void* b,smx_action_t ignored) {
31    MPI_Request ref = (MPI_Request)a;
32    MPI_Request req = (MPI_Request)b;
33    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
34    xbt_assert(ref, "Cannot match send against null reference");
35    xbt_assert(req, "Cannot match send against null request");
36    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
37           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
38 }
39
40 static MPI_Request build_request(void *buf, int count,
41                                  MPI_Datatype datatype, int src, int dst,
42                                  int tag, MPI_Comm comm, unsigned flags)
43 {
44   MPI_Request request;
45
46   void *old_buf = NULL;
47
48   request = xbt_new(s_smpi_mpi_request_t, 1);
49
50   s_smpi_subtype_t *subtype = datatype->substruct;
51
52   if(datatype->has_subtype == 1){
53     // This part handles the problem of non-contignous memory
54     old_buf = buf;
55     buf = malloc(count*smpi_datatype_size(datatype));
56     if (flags & SEND) {
57       subtype->serialize(old_buf, buf, count, datatype->substruct);
58     }
59   }
60
61   request->buf = buf;
62   // This part handles the problem of non-contignous memory (for the
63   // unserialisation at the reception)
64   request->old_buf = old_buf;
65   request->old_type = datatype;
66
67   request->size = smpi_datatype_size(datatype) * count;
68   request->src = src;
69   request->dst = dst;
70   request->tag = tag;
71   request->comm = comm;
72   request->action = NULL;
73   request->flags = flags;
74   request->detached = 0;
75 #ifdef HAVE_TRACING
76   request->send = 0;
77   request->recv = 0;
78 #endif
79   return request;
80 }
81
82
83 void smpi_empty_status(MPI_Status * status) {
84   if(status != MPI_STATUS_IGNORE) {
85       status->MPI_SOURCE=MPI_ANY_SOURCE;
86       status->MPI_TAG=MPI_ANY_TAG;
87       status->count=0;
88   }
89 }
90
91 void smpi_action_trace_run(char *path)
92 {
93   char *name;
94   xbt_dynar_t todo;
95   xbt_dict_cursor_t cursor;
96
97   action_fp=NULL;
98   if (path) {
99     action_fp = fopen(path, "r");
100     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
101                strerror(errno));
102   }
103
104   if (!xbt_dict_is_empty(action_queues)) {
105     XBT_WARN
106       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
107
108
109     xbt_dict_foreach(action_queues, cursor, name, todo) {
110       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
111     }
112   }
113
114   if (path)
115     fclose(action_fp);
116   xbt_dict_free(&action_queues);
117   action_queues = xbt_dict_new_homogeneous(NULL);
118 }
119
120 static void smpi_mpi_request_free_voidp(void* request)
121 {
122   MPI_Request req = request;
123   smpi_mpi_request_free(&req);
124 }
125
126 /* MPI Low level calls */
127 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
128                                int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                   comm, PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
138                                int src, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
142                   comm, PERSISTENT | RECV);
143
144   return request;
145 }
146
147 void smpi_mpi_start(MPI_Request request)
148 {
149   smx_rdv_t mailbox;
150
151   xbt_assert(!request->action,
152              "Cannot (re)start a non-finished communication");
153   if(request->flags & RECV) {
154     print_request("New recv", request);
155     if (request->size < surf_cfg_get_int("smpi/async_small_thres"))
156       mailbox = smpi_process_mailbox_small();
157     else
158       mailbox = smpi_process_mailbox();
159
160     // FIXME: SIMIX does not yet support non-contiguous datatypes
161     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
162   } else {
163
164     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
165 /*    if(receiver == MPI_UNDEFINED) {*/
166 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
167 /*      return;*/
168 /*    }*/
169     print_request("New send", request);
170     if (request->size < surf_cfg_get_int("smpi/async_small_thres")) { // eager mode
171       mailbox = smpi_process_remote_mailbox_small(receiver);
172     }else{
173       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
174       mailbox = smpi_process_remote_mailbox(receiver);
175     }
176     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
177       void *oldbuf = NULL;
178       if(request->old_type->has_subtype == 0){
179         oldbuf = request->buf;
180         request->detached = 1;
181         request->buf = malloc(request->size);
182         if (oldbuf)
183           memcpy(request->buf,oldbuf,request->size);
184       }
185       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
186     }
187
188     request->action =
189       simcall_comm_isend(mailbox, request->size, -1.0,
190                          request->buf, request->size,
191                          &match_send,
192                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
193                          request,
194                          // detach if msg size < eager/rdv switch limit
195                          request->detached);
196
197 #ifdef HAVE_TRACING
198     /* FIXME: detached sends are not traceable (request->action == NULL) */
199     if (request->action)
200       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
201 #endif
202
203   }
204
205 }
206
207 void smpi_mpi_startall(int count, MPI_Request * requests)
208 {
209   int i;
210
211   for(i = 0; i < count; i++) {
212     smpi_mpi_start(requests[i]);
213   }
214 }
215
216 void smpi_mpi_request_free(MPI_Request * request)
217 {
218   xbt_free(*request);
219   *request = MPI_REQUEST_NULL;
220 }
221
222 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
223                             int dst, int tag, MPI_Comm comm)
224 {
225   MPI_Request request =
226     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
227                   comm, NON_PERSISTENT | SEND);
228
229   return request;
230 }
231
232 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
233                            int dst, int tag, MPI_Comm comm)
234 {
235   MPI_Request request =
236     smpi_isend_init(buf, count, datatype, dst, tag, comm);
237
238   smpi_mpi_start(request);
239   return request;
240 }
241
242 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
243                             int src, int tag, MPI_Comm comm)
244 {
245   MPI_Request request =
246     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
247                   comm, NON_PERSISTENT | RECV);
248   return request;
249 }
250
251 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
252                            int src, int tag, MPI_Comm comm)
253 {
254   MPI_Request request =
255     smpi_irecv_init(buf, count, datatype, src, tag, comm);
256
257   smpi_mpi_start(request);
258   return request;
259 }
260
261 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
262                    int tag, MPI_Comm comm, MPI_Status * status)
263 {
264   MPI_Request request;
265   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
266   smpi_mpi_wait(&request, status);
267 }
268
269
270
271 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
272                    int tag, MPI_Comm comm)
273 {
274   MPI_Request request;
275
276   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
277   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
278 }
279
280 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
281                        int dst, int sendtag, void *recvbuf, int recvcount,
282                        MPI_Datatype recvtype, int src, int recvtag,
283                        MPI_Comm comm, MPI_Status * status)
284 {
285   MPI_Request requests[2];
286   MPI_Status stats[2];
287
288   requests[0] =
289     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
290   requests[1] =
291     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
292   smpi_mpi_startall(2, requests);
293   smpi_mpi_waitall(2, requests, stats);
294   if(status != MPI_STATUS_IGNORE) {
295     // Copy receive status
296     memcpy(status, &stats[1], sizeof(MPI_Status));
297   }
298 }
299
300 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
301 {
302   return status->count / smpi_datatype_size(datatype);
303 }
304
305 static void finish_wait(MPI_Request * request, MPI_Status * status)
306 {
307   MPI_Request req = *request;
308   // if we have a sender, we should use its data, and not the data from the receive
309   //FIXME : mail fail if req->action has already been freed, the pointer being invalid
310   if((req->action)&&
311      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
312     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
313
314   if(status != MPI_STATUS_IGNORE) {
315     status->MPI_SOURCE = req->src;
316     status->MPI_TAG = req->tag;
317     //if((*request)->action && ((MPI_Request)SIMIX_comm_get_src_data((*request)->action))->size == (*request)->size)
318     status->MPI_ERROR = MPI_SUCCESS;
319     //else status->MPI_ERROR = MPI_ERR_TRUNCATE;
320     // this handles the case were size in receive differs from size in send
321     // FIXME: really this should just contain the count of receive-type blocks,
322     // right?
323     status->count = req->size;
324   }
325   req = *request;
326
327   print_request("Finishing", req);
328   MPI_Datatype datatype = req->old_type;
329   if(datatype->has_subtype == 1){
330       // This part handles the problem of non-contignous memory
331       // the unserialization at the reception
332     s_smpi_subtype_t *subtype = datatype->substruct;
333     if(req->flags & RECV) {
334       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
335     }
336     if(req->detached == 0) free(req->buf);
337   }
338
339   if(req->flags & NON_PERSISTENT) {
340     smpi_mpi_request_free(request);
341   } else {
342     req->action = NULL;
343   }
344 }
345
346 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
347   int flag;
348
349   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
350   if ((*request)->action == NULL)
351     flag = 1;
352   else
353     flag = simcall_comm_test((*request)->action);
354   if(flag) {
355     finish_wait(request, status);
356   }else{
357     smpi_empty_status(status);
358   }
359   return flag;
360 }
361
362 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
363                      MPI_Status * status)
364 {
365   xbt_dynar_t comms;
366   int i, flag, size;
367   int* map;
368
369   *index = MPI_UNDEFINED;
370   flag = 0;
371   if(count > 0) {
372     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
373     map = xbt_new(int, count);
374     size = 0;
375     for(i = 0; i < count; i++) {
376       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
377          xbt_dynar_push(comms, &requests[i]->action);
378          map[size] = i;
379          size++;
380       }
381     }
382     if(size > 0) {
383       i = simcall_comm_testany(comms);
384       // not MPI_UNDEFINED, as this is a simix return code
385       if(i != -1) {
386         *index = map[i];
387         finish_wait(&requests[*index], status);
388         flag = 1;
389       }
390     }else{
391         //all requests are null or inactive, return true
392         flag=1;
393         smpi_empty_status(status);
394     }
395     xbt_free(map);
396     xbt_dynar_free(&comms);
397   }
398
399   return flag;
400 }
401
402
403 int smpi_mpi_testall(int count, MPI_Request requests[],
404                      MPI_Status status[])
405 {
406   MPI_Status stat;
407   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
408   int flag=1;
409   int i;
410   for(i=0; i<count; i++){
411     if(requests[i]!= MPI_REQUEST_NULL){
412       if (smpi_mpi_test(&requests[i], pstat)!=1){
413         flag=0;
414       }
415     }else{
416       smpi_empty_status(pstat);
417     }
418     if(status != MPI_STATUSES_IGNORE) {
419       memcpy(&status[i], pstat, sizeof(*pstat));
420     }
421   }
422   return flag;
423 }
424
425 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
426   int flag=0;
427   //FIXME find another wait to avoid busy waiting ?
428   // the issue here is that we have to wait on a nonexistent comm
429   while(flag==0){
430     smpi_mpi_iprobe(source, tag, comm, &flag, status);
431     XBT_DEBUG("Busy Waiting on probing : %d", flag);
432     if(!flag) {
433       simcall_process_sleep(0.0001);
434     }
435   }
436 }
437
438 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
439   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
440             comm, NON_PERSISTENT | RECV);
441
442   // behave like a receive, but don't do it
443   smx_rdv_t mailbox;
444
445   print_request("New iprobe", request);
446   // We have to test both mailboxes as we don't know if we will receive one one or another
447     if (surf_cfg_get_int("smpi/async_small_thres")>0){
448         mailbox = smpi_process_mailbox_small();
449         XBT_DEBUG("trying to probe the perm recv mailbox");
450         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
451     }
452     if (request->action==NULL){
453         mailbox = smpi_process_mailbox();
454         XBT_DEBUG("trying to probe the other mailbox");
455         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
456     }
457
458   if(request->action){
459     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
460     *flag = 1;
461     if(status != MPI_STATUS_IGNORE) {
462       status->MPI_SOURCE = req->src;
463       status->MPI_TAG = req->tag;
464       if(req->size == request->size)
465         status->MPI_ERROR = MPI_SUCCESS;
466       else status->MPI_ERROR = MPI_ERR_TRUNCATE;
467       status->count = request->size;
468     }
469   }
470   else *flag = 0;
471   smpi_mpi_request_free(&request);
472
473   return;
474 }
475
476 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
477 {
478   print_request("Waiting", *request);
479   if ((*request)->action != NULL) { // this is not a detached send
480     simcall_comm_wait((*request)->action, -1.0);
481     finish_wait(request, status);
482   }
483   // FIXME for a detached send, finish_wait is not called:
484 }
485
486 int smpi_mpi_waitany(int count, MPI_Request requests[],
487                      MPI_Status * status)
488 {
489   xbt_dynar_t comms;
490   int i, size, index;
491   int *map;
492
493   index = MPI_UNDEFINED;
494   if(count > 0) {
495     // Wait for a request to complete
496     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
497     map = xbt_new(int, count);
498     size = 0;
499     XBT_DEBUG("Wait for one of");
500     for(i = 0; i < count; i++) {
501       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
502         print_request("Waiting any ", requests[i]);
503         xbt_dynar_push(comms, &requests[i]->action);
504         map[size] = i;
505         size++;
506       }
507     }
508     if(size > 0) {
509       i = simcall_comm_waitany(comms);
510
511       // not MPI_UNDEFINED, as this is a simix return code
512       if (i != -1) {
513         index = map[i];
514         finish_wait(&requests[index], status);
515       }
516     }
517     xbt_free(map);
518     xbt_dynar_free(&comms);
519   }
520
521   if (index==MPI_UNDEFINED)
522     smpi_empty_status(status);
523
524   return index;
525 }
526
527 int smpi_mpi_waitall(int count, MPI_Request requests[],
528                       MPI_Status status[])
529 {
530   int  index, c;
531   MPI_Status stat;
532   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
533   int retvalue=MPI_SUCCESS;
534   //tag invalid requests in the set
535   for(c = 0; c < count; c++) {
536     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
537       if(status != MPI_STATUSES_IGNORE)
538         smpi_empty_status(&status[c]);
539     }else if(requests[c]->src == MPI_PROC_NULL ){
540       if(status != MPI_STATUSES_IGNORE) {
541         smpi_empty_status(&status[c]);
542         status[c].MPI_SOURCE=MPI_PROC_NULL;
543       }
544     }
545   }
546
547   for(c = 0; c < count; c++) {
548       if(MC_is_active()) {
549         smpi_mpi_wait(&requests[c], pstat);
550         index = c;
551       } else {
552         index = smpi_mpi_waitany(count, requests, pstat);
553         if(index == MPI_UNDEFINED) {
554           break;
555        }
556       if(status != MPI_STATUSES_IGNORE) {
557         memcpy(&status[index], pstat, sizeof(*pstat));
558         if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
559
560       }
561     }
562   }
563   return retvalue;
564 }
565
566 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
567                       MPI_Status status[])
568 {
569   int i, count, index;
570   MPI_Status stat;
571   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
572
573   count = 0;
574   for(i = 0; i < incount; i++)
575   {
576     index=smpi_mpi_waitany(incount, requests, pstat);
577     if(index!=MPI_UNDEFINED){
578       indices[count] = index;
579       count++;
580       if(status != MPI_STATUSES_IGNORE) {
581         memcpy(&status[index], pstat, sizeof(*pstat));
582       }
583     }else{
584       return MPI_UNDEFINED;
585     }
586   }
587   return count;
588 }
589
590 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
591                       MPI_Status status[])
592 {
593   int i, count, count_dead;
594   MPI_Status stat;
595   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
596
597   count = 0;
598   count_dead = 0;
599   for(i = 0; i < incount; i++) {
600     if((requests[i] != MPI_REQUEST_NULL)) {
601       if(smpi_mpi_test(&requests[i], pstat)) {
602          indices[count] = i;
603          count++;
604          if(status != MPI_STATUSES_IGNORE) {
605             memcpy(&status[i], pstat, sizeof(*pstat));
606          }
607       }
608     }else{
609       count_dead++;
610     }
611   }
612   if(count_dead==incount)return MPI_UNDEFINED;
613   else return count;
614 }
615
616 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
617                     MPI_Comm comm)
618 {
619   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
620   nary_tree_bcast(buf, count, datatype, root, comm, 4);
621 }
622
623 void smpi_mpi_barrier(MPI_Comm comm)
624 {
625   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
626   nary_tree_barrier(comm, 4);
627 }
628
629 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
630                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
631                      int root, MPI_Comm comm)
632 {
633   int system_tag = 666;
634   int rank, size, src, index;
635   MPI_Aint lb = 0, recvext = 0;
636   MPI_Request *requests;
637
638   rank = smpi_comm_rank(comm);
639   size = smpi_comm_size(comm);
640   if(rank != root) {
641     // Send buffer to root
642     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
643   } else {
644     // FIXME: check for errors
645     smpi_datatype_extent(recvtype, &lb, &recvext);
646     // Local copy from root
647     smpi_datatype_copy(sendbuf, sendcount, sendtype,
648                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
649     // Receive buffers from senders
650     requests = xbt_new(MPI_Request, size - 1);
651     index = 0;
652     for(src = 0; src < size; src++) {
653       if(src != root) {
654         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
655                                           recvcount, recvtype,
656                                           src, system_tag, comm);
657         index++;
658       }
659     }
660     // Wait for completion of irecv's.
661     smpi_mpi_startall(size - 1, requests);
662     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
663     xbt_free(requests);
664   }
665 }
666
667 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
668                       void *recvbuf, int *recvcounts, int *displs,
669                       MPI_Datatype recvtype, int root, MPI_Comm comm)
670 {
671   int system_tag = 666;
672   int rank, size, src, index;
673   MPI_Aint lb = 0, recvext = 0;
674   MPI_Request *requests;
675
676   rank = smpi_comm_rank(comm);
677   size = smpi_comm_size(comm);
678   if(rank != root) {
679     // Send buffer to root
680     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
681   } else {
682     // FIXME: check for errors
683     smpi_datatype_extent(recvtype, &lb, &recvext);
684     // Local copy from root
685     smpi_datatype_copy(sendbuf, sendcount, sendtype,
686                        (char *)recvbuf + displs[root] * recvext,
687                        recvcounts[root], recvtype);
688     // Receive buffers from senders
689     requests = xbt_new(MPI_Request, size - 1);
690     index = 0;
691     for(src = 0; src < size; src++) {
692       if(src != root) {
693         requests[index] =
694           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
695                           recvcounts[src], recvtype, src, system_tag, comm);
696         index++;
697       }
698     }
699     // Wait for completion of irecv's.
700     smpi_mpi_startall(size - 1, requests);
701     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
702     xbt_free(requests);
703   }
704 }
705
706 void smpi_mpi_allgather(void *sendbuf, int sendcount,
707                         MPI_Datatype sendtype, void *recvbuf,
708                         int recvcount, MPI_Datatype recvtype,
709                         MPI_Comm comm)
710 {
711   int system_tag = 666;
712   int rank, size, other, index;
713   MPI_Aint lb = 0, recvext = 0;
714   MPI_Request *requests;
715
716   rank = smpi_comm_rank(comm);
717   size = smpi_comm_size(comm);
718   // FIXME: check for errors
719   smpi_datatype_extent(recvtype, &lb, &recvext);
720   // Local copy from self
721   smpi_datatype_copy(sendbuf, sendcount, sendtype,
722                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
723                      recvtype);
724   // Send/Recv buffers to/from others;
725   requests = xbt_new(MPI_Request, 2 * (size - 1));
726   index = 0;
727   for(other = 0; other < size; other++) {
728     if(other != rank) {
729       requests[index] =
730         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
731                         comm);
732       index++;
733       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
734                                         recvcount, recvtype, other,
735                                         system_tag, comm);
736       index++;
737     }
738   }
739   // Wait for completion of all comms.
740   smpi_mpi_startall(2 * (size - 1), requests);
741   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
742   xbt_free(requests);
743 }
744
745 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
746                          MPI_Datatype sendtype, void *recvbuf,
747                          int *recvcounts, int *displs,
748                          MPI_Datatype recvtype, MPI_Comm comm)
749 {
750   int system_tag = 666;
751   int rank, size, other, index;
752   MPI_Aint lb = 0, recvext = 0;
753   MPI_Request *requests;
754
755   rank = smpi_comm_rank(comm);
756   size = smpi_comm_size(comm);
757   // FIXME: check for errors
758   smpi_datatype_extent(recvtype, &lb, &recvext);
759   // Local copy from self
760   smpi_datatype_copy(sendbuf, sendcount, sendtype,
761                      (char *)recvbuf + displs[rank] * recvext,
762                      recvcounts[rank], recvtype);
763   // Send buffers to others;
764   requests = xbt_new(MPI_Request, 2 * (size - 1));
765   index = 0;
766   for(other = 0; other < size; other++) {
767     if(other != rank) {
768       requests[index] =
769         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
770                         comm);
771       index++;
772       requests[index] =
773         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
774                         recvtype, other, system_tag, comm);
775       index++;
776     }
777   }
778   // Wait for completion of all comms.
779   smpi_mpi_startall(2 * (size - 1), requests);
780   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
781   xbt_free(requests);
782 }
783
784 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
785                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
786                       int root, MPI_Comm comm)
787 {
788   int system_tag = 666;
789   int rank, size, dst, index;
790   MPI_Aint lb = 0, sendext = 0;
791   MPI_Request *requests;
792
793   rank = smpi_comm_rank(comm);
794   size = smpi_comm_size(comm);
795   if(rank != root) {
796     // Recv buffer from root
797     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
798                   MPI_STATUS_IGNORE);
799   } else {
800     // FIXME: check for errors
801     smpi_datatype_extent(sendtype, &lb, &sendext);
802     // Local copy from root
803     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
804                        sendcount, sendtype, recvbuf, recvcount, recvtype);
805     // Send buffers to receivers
806     requests = xbt_new(MPI_Request, size - 1);
807     index = 0;
808     for(dst = 0; dst < size; dst++) {
809       if(dst != root) {
810         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
811                                           sendcount, sendtype, dst,
812                                           system_tag, comm);
813         index++;
814       }
815     }
816     // Wait for completion of isend's.
817     smpi_mpi_startall(size - 1, requests);
818     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
819     xbt_free(requests);
820   }
821 }
822
823 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
824                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
825                        MPI_Datatype recvtype, int root, MPI_Comm comm)
826 {
827   int system_tag = 666;
828   int rank, size, dst, index;
829   MPI_Aint lb = 0, sendext = 0;
830   MPI_Request *requests;
831
832   rank = smpi_comm_rank(comm);
833   size = smpi_comm_size(comm);
834   if(rank != root) {
835     // Recv buffer from root
836     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
837                   MPI_STATUS_IGNORE);
838   } else {
839     // FIXME: check for errors
840     smpi_datatype_extent(sendtype, &lb, &sendext);
841     // Local copy from root
842     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
843                        sendtype, recvbuf, recvcount, recvtype);
844     // Send buffers to receivers
845     requests = xbt_new(MPI_Request, size - 1);
846     index = 0;
847     for(dst = 0; dst < size; dst++) {
848       if(dst != root) {
849         requests[index] =
850           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
851                           sendtype, dst, system_tag, comm);
852         index++;
853       }
854     }
855     // Wait for completion of isend's.
856     smpi_mpi_startall(size - 1, requests);
857     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
858     xbt_free(requests);
859   }
860 }
861
862 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
863                      MPI_Datatype datatype, MPI_Op op, int root,
864                      MPI_Comm comm)
865 {
866   int system_tag = 666;
867   int rank, size, src, index;
868   MPI_Aint lb = 0, dataext = 0;
869   MPI_Request *requests;
870   void **tmpbufs;
871
872   rank = smpi_comm_rank(comm);
873   size = smpi_comm_size(comm);
874   if(rank != root) {
875     // Send buffer to root
876     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
877   } else {
878     // FIXME: check for errors
879     smpi_datatype_extent(datatype, &lb, &dataext);
880     // Local copy from root
881     if (sendbuf && recvbuf)
882       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
883     // Receive buffers from senders
884     //TODO: make a MPI_barrier here ?
885     requests = xbt_new(MPI_Request, size - 1);
886     tmpbufs = xbt_new(void *, size - 1);
887     index = 0;
888     for(src = 0; src < size; src++) {
889       if(src != root) {
890         // FIXME: possibly overkill we we have contiguous/noncontiguous data
891         //  mapping...
892         tmpbufs[index] = xbt_malloc(count * dataext);
893         requests[index] =
894           smpi_irecv_init(tmpbufs[index], count, datatype, src,
895                           system_tag, comm);
896         index++;
897       }
898     }
899     // Wait for completion of irecv's.
900     smpi_mpi_startall(size - 1, requests);
901     for(src = 0; src < size - 1; src++) {
902       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
903       XBT_DEBUG("finished waiting any request with index %d", index);
904       if(index == MPI_UNDEFINED) {
905         break;
906       }
907       if(op) /* op can be MPI_OP_NULL that does nothing */
908         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
909     }
910     for(index = 0; index < size - 1; index++) {
911       xbt_free(tmpbufs[index]);
912     }
913     xbt_free(tmpbufs);
914     xbt_free(requests);
915   }
916 }
917
918 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
919                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
920 {
921   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
922   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
923 }
924
925 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
926                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
927 {
928   int system_tag = 666;
929   int rank, size, other, index;
930   MPI_Aint lb = 0, dataext = 0;
931   MPI_Request *requests;
932   void **tmpbufs;
933
934   rank = smpi_comm_rank(comm);
935   size = smpi_comm_size(comm);
936
937   // FIXME: check for errors
938   smpi_datatype_extent(datatype, &lb, &dataext);
939
940   // Local copy from self
941   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
942
943   // Send/Recv buffers to/from others;
944   requests = xbt_new(MPI_Request, size - 1);
945   tmpbufs = xbt_new(void *, rank);
946   index = 0;
947   for(other = 0; other < rank; other++) {
948     // FIXME: possibly overkill we we have contiguous/noncontiguous data
949     // mapping...
950     tmpbufs[index] = xbt_malloc(count * dataext);
951     requests[index] =
952       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
953                       comm);
954     index++;
955   }
956   for(other = rank + 1; other < size; other++) {
957     requests[index] =
958       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
959     index++;
960   }
961   // Wait for completion of all comms.
962   smpi_mpi_startall(size - 1, requests);
963   for(other = 0; other < size - 1; other++) {
964     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
965     if(index == MPI_UNDEFINED) {
966       break;
967     }
968     if(index < rank) {
969       // #Request is below rank: it's a irecv
970       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
971     }
972   }
973   for(index = 0; index < rank; index++) {
974     xbt_free(tmpbufs[index]);
975   }
976   xbt_free(tmpbufs);
977   xbt_free(requests);
978 }