Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into vmtrace
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
17
18
19 static int match_recv(void* a, void* b, smx_action_t ignored) {
20    MPI_Request ref = (MPI_Request)a;
21    MPI_Request req = (MPI_Request)b;
22    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23
24   xbt_assert(ref, "Cannot match recv against null reference");
25   xbt_assert(req, "Cannot match recv against null request");
26   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
27     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
28 }
29
30 static int match_send(void* a, void* b,smx_action_t ignored) {
31    MPI_Request ref = (MPI_Request)a;
32    MPI_Request req = (MPI_Request)b;
33    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
34    xbt_assert(ref, "Cannot match send against null reference");
35    xbt_assert(req, "Cannot match send against null request");
36    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
37           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
38 }
39
40 static MPI_Request build_request(void *buf, int count,
41                                  MPI_Datatype datatype, int src, int dst,
42                                  int tag, MPI_Comm comm, unsigned flags)
43 {
44   MPI_Request request;
45
46   void *old_buf = NULL;
47
48   request = xbt_new(s_smpi_mpi_request_t, 1);
49
50   s_smpi_subtype_t *subtype = datatype->substruct;
51
52   if(datatype->has_subtype == 1){
53     // This part handles the problem of non-contignous memory
54     old_buf = buf;
55     buf = malloc(count*smpi_datatype_size(datatype));
56     if (flags & SEND) {
57       subtype->serialize(old_buf, buf, count, datatype->substruct);
58     }
59   }
60
61   request->buf = buf;
62   // This part handles the problem of non-contignous memory (for the
63   // unserialisation at the reception)
64   request->old_buf = old_buf;
65   request->old_type = datatype;
66
67   request->size = smpi_datatype_size(datatype) * count;
68   request->src = src;
69   request->dst = dst;
70   request->tag = tag;
71   request->comm = comm;
72   request->action = NULL;
73   request->flags = flags;
74   request->detached = 0;
75 #ifdef HAVE_TRACING
76   request->send = 0;
77   request->recv = 0;
78 #endif
79   return request;
80 }
81
82
83 void smpi_empty_status(MPI_Status * status) {
84   if(status != MPI_STATUS_IGNORE) {
85       status->MPI_SOURCE=MPI_ANY_SOURCE;
86       status->MPI_TAG=MPI_ANY_TAG;
87       status->count=0;
88   }
89 }
90
91 void smpi_action_trace_run(char *path)
92 {
93   char *name;
94   xbt_dynar_t todo;
95   xbt_dict_cursor_t cursor;
96
97   action_fp=NULL;
98   if (path) {
99     action_fp = fopen(path, "r");
100     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
101                strerror(errno));
102   }
103
104   if (!xbt_dict_is_empty(action_queues)) {
105     XBT_WARN
106       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
107
108
109     xbt_dict_foreach(action_queues, cursor, name, todo) {
110       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
111     }
112   }
113
114   if (path)
115     fclose(action_fp);
116   xbt_dict_free(&action_queues);
117   action_queues = xbt_dict_new_homogeneous(NULL);
118 }
119
120 static void smpi_mpi_request_free_voidp(void* request)
121 {
122   MPI_Request req = request;
123   smpi_mpi_request_free(&req);
124 }
125
126 /* MPI Low level calls */
127 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
128                                int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                   comm, PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
138                                int src, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
142                   comm, PERSISTENT | RECV);
143
144   return request;
145 }
146
147 void smpi_mpi_start(MPI_Request request)
148 {
149   smx_rdv_t mailbox;
150
151   xbt_assert(!request->action,
152              "Cannot (re)start a non-finished communication");
153   if(request->flags & RECV) {
154     print_request("New recv", request);
155     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
156       mailbox = smpi_process_mailbox_small();
157     else
158       mailbox = smpi_process_mailbox();
159
160     // FIXME: SIMIX does not yet support non-contiguous datatypes
161     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
162   } else {
163
164     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
165 /*    if(receiver == MPI_UNDEFINED) {*/
166 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
167 /*      return;*/
168 /*    }*/
169     print_request("New send", request);
170     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode
171       mailbox = smpi_process_remote_mailbox_small(receiver);
172     }else{
173       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
174       mailbox = smpi_process_remote_mailbox(receiver);
175     }
176     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
177       void *oldbuf = NULL;
178       if(request->old_type->has_subtype == 0){
179         oldbuf = request->buf;
180         request->detached = 1;
181         request->buf = malloc(request->size);
182         if (oldbuf)
183           memcpy(request->buf,oldbuf,request->size);
184       }
185       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
186     }
187
188     request->action =
189       simcall_comm_isend(mailbox, request->size, -1.0,
190                          request->buf, request->size,
191                          &match_send,
192                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
193                          request,
194                          // detach if msg size < eager/rdv switch limit
195                          request->detached);
196
197 #ifdef HAVE_TRACING
198     /* FIXME: detached sends are not traceable (request->action == NULL) */
199     if (request->action)
200       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
201 #endif
202
203   }
204
205 }
206
207 void smpi_mpi_startall(int count, MPI_Request * requests)
208 {
209   int i;
210
211   for(i = 0; i < count; i++) {
212     smpi_mpi_start(requests[i]);
213   }
214 }
215
216 void smpi_mpi_request_free(MPI_Request * request)
217 {
218   xbt_free(*request);
219   *request = MPI_REQUEST_NULL;
220 }
221
222 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
223                             int dst, int tag, MPI_Comm comm)
224 {
225   MPI_Request request =
226     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
227                   comm, NON_PERSISTENT | SEND);
228
229   return request;
230 }
231
232 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
233                            int dst, int tag, MPI_Comm comm)
234 {
235   MPI_Request request =
236     smpi_isend_init(buf, count, datatype, dst, tag, comm);
237
238   smpi_mpi_start(request);
239   return request;
240 }
241
242 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
243                             int src, int tag, MPI_Comm comm)
244 {
245   MPI_Request request =
246     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
247                   comm, NON_PERSISTENT | RECV);
248   return request;
249 }
250
251 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
252                            int src, int tag, MPI_Comm comm)
253 {
254   MPI_Request request =
255     smpi_irecv_init(buf, count, datatype, src, tag, comm);
256
257   smpi_mpi_start(request);
258   return request;
259 }
260
261 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
262                    int tag, MPI_Comm comm, MPI_Status * status)
263 {
264   MPI_Request request;
265   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
266   smpi_mpi_wait(&request, status);
267 }
268
269
270
271 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
272                    int tag, MPI_Comm comm)
273 {
274   MPI_Request request;
275
276   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
277   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
278 }
279
280 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
281                        int dst, int sendtag, void *recvbuf, int recvcount,
282                        MPI_Datatype recvtype, int src, int recvtag,
283                        MPI_Comm comm, MPI_Status * status)
284 {
285   MPI_Request requests[2];
286   MPI_Status stats[2];
287
288   requests[0] =
289     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
290   requests[1] =
291     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
292   smpi_mpi_startall(2, requests);
293   smpi_mpi_waitall(2, requests, stats);
294   if(status != MPI_STATUS_IGNORE) {
295     // Copy receive status
296     memcpy(status, &stats[1], sizeof(MPI_Status));
297   }
298 }
299
300 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
301 {
302   return status->count / smpi_datatype_size(datatype);
303 }
304
305 static void finish_wait(MPI_Request * request, MPI_Status * status)
306 {
307   MPI_Request req = *request;
308   // if we have a sender, we should use its data, and not the data from the receive
309   if((req->action)&&
310      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
311     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
312
313   if(status != MPI_STATUS_IGNORE) {
314     status->MPI_SOURCE = req->src;
315     status->MPI_TAG = req->tag;
316     status->MPI_ERROR = MPI_SUCCESS;
317     // FIXME: really this should just contain the count of receive-type blocks,
318     // right?
319     status->count = req->size;
320   }
321   req = *request;
322
323   print_request("Finishing", req);
324   MPI_Datatype datatype = req->old_type;
325   if(datatype->has_subtype == 1){
326       // This part handles the problem of non-contignous memory
327       // the unserialization at the reception
328     s_smpi_subtype_t *subtype = datatype->substruct;
329     if(req->flags & RECV) {
330       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
331     }
332     if(req->detached == 0) free(req->buf);
333   }
334
335   if(req->flags & NON_PERSISTENT) {
336     smpi_mpi_request_free(request);
337   } else {
338     req->action = NULL;
339   }
340 }
341
342 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
343   int flag;
344
345   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
346   if ((*request)->action == NULL)
347     flag = 1;
348   else
349     flag = simcall_comm_test((*request)->action);
350   if(flag) {
351     finish_wait(request, status);
352   }else{
353     smpi_empty_status(status);
354   }
355   return flag;
356 }
357
358 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
359                      MPI_Status * status)
360 {
361   xbt_dynar_t comms;
362   int i, flag, size;
363   int* map;
364
365   *index = MPI_UNDEFINED;
366   flag = 0;
367   if(count > 0) {
368     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
369     map = xbt_new(int, count);
370     size = 0;
371     for(i = 0; i < count; i++) {
372       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
373          xbt_dynar_push(comms, &requests[i]->action);
374          map[size] = i;
375          size++;
376       }
377     }
378     if(size > 0) {
379       i = simcall_comm_testany(comms);
380       // not MPI_UNDEFINED, as this is a simix return code
381       if(i != -1) {
382         *index = map[i];
383         finish_wait(&requests[*index], status);
384         flag = 1;
385       }
386     }else{
387         //all requests are null or inactive, return true
388         flag=1;
389         smpi_empty_status(status);
390     }
391     xbt_free(map);
392     xbt_dynar_free(&comms);
393   }
394
395   return flag;
396 }
397
398
399 int smpi_mpi_testall(int count, MPI_Request requests[],
400                      MPI_Status status[])
401 {
402   MPI_Status stat;
403   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
404   int flag=1;
405   int i;
406   for(i=0; i<count; i++){
407     if(requests[i]!= MPI_REQUEST_NULL){
408       if (smpi_mpi_test(&requests[i], pstat)!=1){
409         flag=0;
410       }
411     }else{
412       smpi_empty_status(pstat);
413     }
414     if(status != MPI_STATUSES_IGNORE) {
415       memcpy(&status[i], pstat, sizeof(*pstat));
416     }
417   }
418   return flag;
419 }
420
421 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
422   int flag=0;
423   //FIXME find another wait to avoid busy waiting ?
424   // the issue here is that we have to wait on a nonexistent comm
425   while(flag==0){
426     smpi_mpi_iprobe(source, tag, comm, &flag, status);
427     XBT_DEBUG("Busy Waiting on probing : %d", flag);
428     if(!flag) {
429       simcall_process_sleep(0.0001);
430     }
431   }
432 }
433
434 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
435   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
436             comm, NON_PERSISTENT | RECV);
437
438   // behave like a receive, but don't do it
439   smx_rdv_t mailbox;
440
441   print_request("New iprobe", request);
442   // We have to test both mailboxes as we don't know if we will receive one one or another
443     if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
444         mailbox = smpi_process_mailbox_small();
445         XBT_DEBUG("trying to probe the perm recv mailbox");
446         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
447     }
448     if (request->action==NULL){
449         mailbox = smpi_process_mailbox();
450         XBT_DEBUG("trying to probe the other mailbox");
451         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
452     }
453
454   if(request->action){
455     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
456     *flag = 1;
457     if(status != MPI_STATUS_IGNORE) {
458       status->MPI_SOURCE = req->src;
459       status->MPI_TAG = req->tag;
460       status->MPI_ERROR = MPI_SUCCESS;
461       status->count = req->size;
462     }
463   }
464   else *flag = 0;
465   smpi_mpi_request_free(&request);
466
467   return;
468 }
469
470 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
471 {
472   print_request("Waiting", *request);
473   if ((*request)->action != NULL) { // this is not a detached send
474     simcall_comm_wait((*request)->action, -1.0);
475     finish_wait(request, status);
476   }
477   // FIXME for a detached send, finish_wait is not called:
478 }
479
480 int smpi_mpi_waitany(int count, MPI_Request requests[],
481                      MPI_Status * status)
482 {
483   xbt_dynar_t comms;
484   int i, size, index;
485   int *map;
486
487   index = MPI_UNDEFINED;
488   if(count > 0) {
489     // Wait for a request to complete
490     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
491     map = xbt_new(int, count);
492     size = 0;
493     XBT_DEBUG("Wait for one of");
494     for(i = 0; i < count; i++) {
495       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
496         print_request("Waiting any ", requests[i]);
497         xbt_dynar_push(comms, &requests[i]->action);
498         map[size] = i;
499         size++;
500       }
501     }
502     if(size > 0) {
503       i = simcall_comm_waitany(comms);
504
505       // not MPI_UNDEFINED, as this is a simix return code
506       if (i != -1) {
507         index = map[i];
508         finish_wait(&requests[index], status);
509       }
510     }
511     xbt_free(map);
512     xbt_dynar_free(&comms);
513   }
514
515   if (index==MPI_UNDEFINED)
516     smpi_empty_status(status);
517
518   return index;
519 }
520
521 void smpi_mpi_waitall(int count, MPI_Request requests[],
522                       MPI_Status status[])
523 {
524   int  index, c;
525   MPI_Status stat;
526   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
527   //tag invalid requests in the set
528   for(c = 0; c < count; c++) {
529     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
530       if(status != MPI_STATUSES_IGNORE)
531         smpi_empty_status(&status[c]);
532     }else if(requests[c]->src == MPI_PROC_NULL ){
533       if(status != MPI_STATUSES_IGNORE) {
534         smpi_empty_status(&status[c]);
535         status[c].MPI_SOURCE=MPI_PROC_NULL;
536       }
537     }
538   }
539
540   for(c = 0; c < count; c++) {
541       if(MC_is_active()) {
542         smpi_mpi_wait(&requests[c], pstat);
543         index = c;
544       } else {
545         index = smpi_mpi_waitany(count, requests, pstat);
546         if(index == MPI_UNDEFINED) {
547           break;
548        }
549       if(status != MPI_STATUSES_IGNORE) {
550         memcpy(&status[index], pstat, sizeof(*pstat));
551
552       }
553     }
554   }
555 }
556
557 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
558                       MPI_Status status[])
559 {
560   int i, count, index;
561   MPI_Status stat;
562   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
563
564   count = 0;
565   for(i = 0; i < incount; i++)
566   {
567     index=smpi_mpi_waitany(incount, requests, pstat);
568     if(index!=MPI_UNDEFINED){
569       indices[count] = index;
570       count++;
571       if(status != MPI_STATUSES_IGNORE) {
572         memcpy(&status[index], pstat, sizeof(*pstat));
573       }
574     }else{
575       return MPI_UNDEFINED;
576     }
577   }
578   return count;
579 }
580
581 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
582                       MPI_Status status[])
583 {
584   int i, count, count_dead;
585   MPI_Status stat;
586   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
587
588   count = 0;
589   count_dead = 0;
590   for(i = 0; i < incount; i++) {
591     if((requests[i] != MPI_REQUEST_NULL)) {
592       if(smpi_mpi_test(&requests[i], pstat)) {
593          indices[count] = i;
594          count++;
595          if(status != MPI_STATUSES_IGNORE) {
596             memcpy(&status[i], pstat, sizeof(*pstat));
597          }
598       }
599     }else{
600       count_dead++;
601     }
602   }
603   if(count_dead==incount)return MPI_UNDEFINED;
604   else return count;
605 }
606
607 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
608                     MPI_Comm comm)
609 {
610   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
611   nary_tree_bcast(buf, count, datatype, root, comm, 4);
612 }
613
614 void smpi_mpi_barrier(MPI_Comm comm)
615 {
616   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
617   nary_tree_barrier(comm, 4);
618 }
619
620 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
621                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
622                      int root, MPI_Comm comm)
623 {
624   int system_tag = 666;
625   int rank, size, src, index;
626   MPI_Aint lb = 0, recvext = 0;
627   MPI_Request *requests;
628
629   rank = smpi_comm_rank(comm);
630   size = smpi_comm_size(comm);
631   if(rank != root) {
632     // Send buffer to root
633     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
634   } else {
635     // FIXME: check for errors
636     smpi_datatype_extent(recvtype, &lb, &recvext);
637     // Local copy from root
638     smpi_datatype_copy(sendbuf, sendcount, sendtype,
639                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
640     // Receive buffers from senders
641     requests = xbt_new(MPI_Request, size - 1);
642     index = 0;
643     for(src = 0; src < size; src++) {
644       if(src != root) {
645         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
646                                           recvcount, recvtype,
647                                           src, system_tag, comm);
648         index++;
649       }
650     }
651     // Wait for completion of irecv's.
652     smpi_mpi_startall(size - 1, requests);
653     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
654     xbt_free(requests);
655   }
656 }
657
658 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
659                       void *recvbuf, int *recvcounts, int *displs,
660                       MPI_Datatype recvtype, int root, MPI_Comm comm)
661 {
662   int system_tag = 666;
663   int rank, size, src, index;
664   MPI_Aint lb = 0, recvext = 0;
665   MPI_Request *requests;
666
667   rank = smpi_comm_rank(comm);
668   size = smpi_comm_size(comm);
669   if(rank != root) {
670     // Send buffer to root
671     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
672   } else {
673     // FIXME: check for errors
674     smpi_datatype_extent(recvtype, &lb, &recvext);
675     // Local copy from root
676     smpi_datatype_copy(sendbuf, sendcount, sendtype,
677                        (char *)recvbuf + displs[root] * recvext,
678                        recvcounts[root], recvtype);
679     // Receive buffers from senders
680     requests = xbt_new(MPI_Request, size - 1);
681     index = 0;
682     for(src = 0; src < size; src++) {
683       if(src != root) {
684         requests[index] =
685           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
686                           recvcounts[src], recvtype, src, system_tag, comm);
687         index++;
688       }
689     }
690     // Wait for completion of irecv's.
691     smpi_mpi_startall(size - 1, requests);
692     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
693     xbt_free(requests);
694   }
695 }
696
697 void smpi_mpi_allgather(void *sendbuf, int sendcount,
698                         MPI_Datatype sendtype, void *recvbuf,
699                         int recvcount, MPI_Datatype recvtype,
700                         MPI_Comm comm)
701 {
702   int system_tag = 666;
703   int rank, size, other, index;
704   MPI_Aint lb = 0, recvext = 0;
705   MPI_Request *requests;
706
707   rank = smpi_comm_rank(comm);
708   size = smpi_comm_size(comm);
709   // FIXME: check for errors
710   smpi_datatype_extent(recvtype, &lb, &recvext);
711   // Local copy from self
712   smpi_datatype_copy(sendbuf, sendcount, sendtype,
713                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
714                      recvtype);
715   // Send/Recv buffers to/from others;
716   requests = xbt_new(MPI_Request, 2 * (size - 1));
717   index = 0;
718   for(other = 0; other < size; other++) {
719     if(other != rank) {
720       requests[index] =
721         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
722                         comm);
723       index++;
724       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
725                                         recvcount, recvtype, other,
726                                         system_tag, comm);
727       index++;
728     }
729   }
730   // Wait for completion of all comms.
731   smpi_mpi_startall(2 * (size - 1), requests);
732   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
733   xbt_free(requests);
734 }
735
736 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
737                          MPI_Datatype sendtype, void *recvbuf,
738                          int *recvcounts, int *displs,
739                          MPI_Datatype recvtype, MPI_Comm comm)
740 {
741   int system_tag = 666;
742   int rank, size, other, index;
743   MPI_Aint lb = 0, recvext = 0;
744   MPI_Request *requests;
745
746   rank = smpi_comm_rank(comm);
747   size = smpi_comm_size(comm);
748   // FIXME: check for errors
749   smpi_datatype_extent(recvtype, &lb, &recvext);
750   // Local copy from self
751   smpi_datatype_copy(sendbuf, sendcount, sendtype,
752                      (char *)recvbuf + displs[rank] * recvext,
753                      recvcounts[rank], recvtype);
754   // Send buffers to others;
755   requests = xbt_new(MPI_Request, 2 * (size - 1));
756   index = 0;
757   for(other = 0; other < size; other++) {
758     if(other != rank) {
759       requests[index] =
760         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
761                         comm);
762       index++;
763       requests[index] =
764         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
765                         recvtype, other, system_tag, comm);
766       index++;
767     }
768   }
769   // Wait for completion of all comms.
770   smpi_mpi_startall(2 * (size - 1), requests);
771   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
772   xbt_free(requests);
773 }
774
775 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
776                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
777                       int root, MPI_Comm comm)
778 {
779   int system_tag = 666;
780   int rank, size, dst, index;
781   MPI_Aint lb = 0, sendext = 0;
782   MPI_Request *requests;
783
784   rank = smpi_comm_rank(comm);
785   size = smpi_comm_size(comm);
786   if(rank != root) {
787     // Recv buffer from root
788     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
789                   MPI_STATUS_IGNORE);
790   } else {
791     // FIXME: check for errors
792     smpi_datatype_extent(sendtype, &lb, &sendext);
793     // Local copy from root
794     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
795                        sendcount, sendtype, recvbuf, recvcount, recvtype);
796     // Send buffers to receivers
797     requests = xbt_new(MPI_Request, size - 1);
798     index = 0;
799     for(dst = 0; dst < size; dst++) {
800       if(dst != root) {
801         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
802                                           sendcount, sendtype, dst,
803                                           system_tag, comm);
804         index++;
805       }
806     }
807     // Wait for completion of isend's.
808     smpi_mpi_startall(size - 1, requests);
809     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
810     xbt_free(requests);
811   }
812 }
813
814 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
815                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
816                        MPI_Datatype recvtype, int root, MPI_Comm comm)
817 {
818   int system_tag = 666;
819   int rank, size, dst, index;
820   MPI_Aint lb = 0, sendext = 0;
821   MPI_Request *requests;
822
823   rank = smpi_comm_rank(comm);
824   size = smpi_comm_size(comm);
825   if(rank != root) {
826     // Recv buffer from root
827     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
828                   MPI_STATUS_IGNORE);
829   } else {
830     // FIXME: check for errors
831     smpi_datatype_extent(sendtype, &lb, &sendext);
832     // Local copy from root
833     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
834                        sendtype, recvbuf, recvcount, recvtype);
835     // Send buffers to receivers
836     requests = xbt_new(MPI_Request, size - 1);
837     index = 0;
838     for(dst = 0; dst < size; dst++) {
839       if(dst != root) {
840         requests[index] =
841           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
842                           sendtype, dst, system_tag, comm);
843         index++;
844       }
845     }
846     // Wait for completion of isend's.
847     smpi_mpi_startall(size - 1, requests);
848     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
849     xbt_free(requests);
850   }
851 }
852
853 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
854                      MPI_Datatype datatype, MPI_Op op, int root,
855                      MPI_Comm comm)
856 {
857   int system_tag = 666;
858   int rank, size, src, index;
859   MPI_Aint lb = 0, dataext = 0;
860   MPI_Request *requests;
861   void **tmpbufs;
862
863   rank = smpi_comm_rank(comm);
864   size = smpi_comm_size(comm);
865   if(rank != root) {
866     // Send buffer to root
867     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
868   } else {
869     // FIXME: check for errors
870     smpi_datatype_extent(datatype, &lb, &dataext);
871     // Local copy from root
872     if (sendbuf && recvbuf)
873       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
874     // Receive buffers from senders
875     //TODO: make a MPI_barrier here ?
876     requests = xbt_new(MPI_Request, size - 1);
877     tmpbufs = xbt_new(void *, size - 1);
878     index = 0;
879     for(src = 0; src < size; src++) {
880       if(src != root) {
881         // FIXME: possibly overkill we we have contiguous/noncontiguous data
882         //  mapping...
883         tmpbufs[index] = xbt_malloc(count * dataext);
884         requests[index] =
885           smpi_irecv_init(tmpbufs[index], count, datatype, src,
886                           system_tag, comm);
887         index++;
888       }
889     }
890     // Wait for completion of irecv's.
891     smpi_mpi_startall(size - 1, requests);
892     for(src = 0; src < size - 1; src++) {
893       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
894       XBT_DEBUG("finished waiting any request with index %d", index);
895       if(index == MPI_UNDEFINED) {
896         break;
897       }
898       if(op) /* op can be MPI_OP_NULL that does nothing */
899         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
900     }
901     for(index = 0; index < size - 1; index++) {
902       xbt_free(tmpbufs[index]);
903     }
904     xbt_free(tmpbufs);
905     xbt_free(requests);
906   }
907 }
908
909 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
910                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
911 {
912   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
913   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
914 }
915
916 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
917                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
918 {
919   int system_tag = 666;
920   int rank, size, other, index;
921   MPI_Aint lb = 0, dataext = 0;
922   MPI_Request *requests;
923   void **tmpbufs;
924
925   rank = smpi_comm_rank(comm);
926   size = smpi_comm_size(comm);
927
928   // FIXME: check for errors
929   smpi_datatype_extent(datatype, &lb, &dataext);
930
931   // Local copy from self
932   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
933
934   // Send/Recv buffers to/from others;
935   requests = xbt_new(MPI_Request, size - 1);
936   tmpbufs = xbt_new(void *, rank);
937   index = 0;
938   for(other = 0; other < rank; other++) {
939     // FIXME: possibly overkill we we have contiguous/noncontiguous data
940     // mapping...
941     tmpbufs[index] = xbt_malloc(count * dataext);
942     requests[index] =
943       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
944                       comm);
945     index++;
946   }
947   for(other = rank + 1; other < size; other++) {
948     requests[index] =
949       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
950     index++;
951   }
952   // Wait for completion of all comms.
953   smpi_mpi_startall(size - 1, requests);
954   for(other = 0; other < size - 1; other++) {
955     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
956     if(index == MPI_UNDEFINED) {
957       break;
958     }
959     if(index < rank) {
960       // #Request is below rank: it's a irecv
961       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
962     }
963   }
964   for(index = 0; index < rank; index++) {
965     xbt_free(tmpbufs[index]);
966   }
967   xbt_free(tmpbufs);
968   xbt_free(requests);
969 }