Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix errors about uninitialized variables.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "surf/surf.h"
13
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
16
17
18 static int match_recv(void* a, void* b, smx_action_t ignored) {
19    MPI_Request ref = (MPI_Request)a;
20    MPI_Request req = (MPI_Request)b;
21    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
22
23   xbt_assert(ref, "Cannot match recv against null reference");
24   xbt_assert(req, "Cannot match recv against null request");
25   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
26     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
27 }
28
29 static int match_send(void* a, void* b,smx_action_t ignored) {
30    MPI_Request ref = (MPI_Request)a;
31    MPI_Request req = (MPI_Request)b;
32    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
33    xbt_assert(ref, "Cannot match send against null reference");
34    xbt_assert(req, "Cannot match send against null request");
35    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
36           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
37 }
38
39 static MPI_Request build_request(void *buf, int count,
40                                  MPI_Datatype datatype, int src, int dst,
41                                  int tag, MPI_Comm comm, unsigned flags)
42 {
43   MPI_Request request;
44
45   void *old_buf = NULL;
46
47   request = xbt_new(s_smpi_mpi_request_t, 1);
48
49   s_smpi_subtype_t *subtype = datatype->substruct;
50
51   if(datatype->has_subtype == 1){
52     // This part handles the problem of non-contignous memory
53     old_buf = buf;
54     buf = malloc(count*smpi_datatype_size(datatype));
55     if (flags & SEND) {
56       subtype->serialize(old_buf, buf, count, datatype->substruct);
57     }
58   }
59
60   request->buf = buf;
61   // This part handles the problem of non-contignous memory (for the
62   // unserialisation at the reception)
63   request->old_buf = old_buf;
64   request->old_type = datatype;
65
66   request->size = smpi_datatype_size(datatype) * count;
67   request->src = src;
68   request->dst = dst;
69   request->tag = tag;
70   request->comm = comm;
71   request->action = NULL;
72   request->flags = flags;
73 #ifdef HAVE_TRACING
74   request->send = 0;
75   request->recv = 0;
76 #endif
77   return request;
78 }
79
80
81 void smpi_empty_status(MPI_Status * status) {
82   if(status != MPI_STATUS_IGNORE) {
83       status->MPI_SOURCE=MPI_ANY_SOURCE;
84       status->MPI_TAG=MPI_ANY_TAG;
85       status->count=0;
86   }
87 }
88
89 void smpi_action_trace_run(char *path)
90 {
91   char *name;
92   xbt_dynar_t todo;
93   xbt_dict_cursor_t cursor;
94
95   action_fp=NULL;
96   if (path) {
97     action_fp = fopen(path, "r");
98     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
99                strerror(errno));
100   }
101
102   if (!xbt_dict_is_empty(action_queues)) {
103     XBT_WARN
104       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
105
106
107     xbt_dict_foreach(action_queues, cursor, name, todo) {
108       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
109     }
110   }
111
112   if (path)
113     fclose(action_fp);
114   xbt_dict_free(&action_queues);
115   action_queues = xbt_dict_new_homogeneous(NULL);
116 }
117
118 static void smpi_mpi_request_free_voidp(void* request)
119 {
120   MPI_Request req = request;
121   smpi_mpi_request_free(&req);
122 }
123
124 /* MPI Low level calls */
125 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
126                                int dst, int tag, MPI_Comm comm)
127 {
128   MPI_Request request =
129     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
130                   comm, PERSISTENT | SEND);
131
132   return request;
133 }
134
135 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
136                                int src, int tag, MPI_Comm comm)
137 {
138   MPI_Request request =
139     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
140                   comm, PERSISTENT | RECV);
141
142   return request;
143 }
144
145 void smpi_mpi_start(MPI_Request request)
146 {
147   smx_rdv_t mailbox;
148   int detached = 0;
149
150   xbt_assert(!request->action,
151              "Cannot (re)start a non-finished communication");
152   if(request->flags & RECV) {
153     print_request("New recv", request);
154     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
155       mailbox = smpi_process_mailbox_small();
156     else
157       mailbox = smpi_process_mailbox();
158
159     // FIXME: SIMIX does not yet support non-contiguous datatypes
160     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
161   } else {
162
163     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
164     if(receiver == MPI_UNDEFINED) {
165       XBT_WARN("Trying to send a message to a wrong rank");
166       return;
167     }
168     print_request("New send", request);
169     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode
170       mailbox = smpi_process_remote_mailbox_small(receiver);
171     }else{
172       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
173       mailbox = smpi_process_remote_mailbox(receiver);
174     }
175     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
176       void *oldbuf = request->buf;
177       detached = 1;
178       request->buf = malloc(request->size);
179       if (oldbuf)
180         memcpy(request->buf,oldbuf,request->size);
181       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
182     }
183
184     request->action =
185       simcall_comm_isend(mailbox, request->size, -1.0,
186                          request->buf, request->size,
187                          &match_send,
188                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
189                          request,
190                          // detach if msg size < eager/rdv switch limit
191                          detached);
192
193 #ifdef HAVE_TRACING
194     /* FIXME: detached sends are not traceable (request->action == NULL) */
195     if (request->action)
196       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
197 #endif
198
199   }
200
201 }
202
203 void smpi_mpi_startall(int count, MPI_Request * requests)
204 {
205   int i;
206
207   for(i = 0; i < count; i++) {
208     smpi_mpi_start(requests[i]);
209   }
210 }
211
212 void smpi_mpi_request_free(MPI_Request * request)
213 {
214   xbt_free(*request);
215   *request = MPI_REQUEST_NULL;
216 }
217
218 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
219                             int dst, int tag, MPI_Comm comm)
220 {
221   MPI_Request request =
222     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
223                   comm, NON_PERSISTENT | SEND);
224
225   return request;
226 }
227
228 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
229                            int dst, int tag, MPI_Comm comm)
230 {
231   MPI_Request request =
232     smpi_isend_init(buf, count, datatype, dst, tag, comm);
233
234   smpi_mpi_start(request);
235   return request;
236 }
237
238 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
239                             int src, int tag, MPI_Comm comm)
240 {
241   MPI_Request request =
242     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
243                   comm, NON_PERSISTENT | RECV);
244   return request;
245 }
246
247 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
248                            int src, int tag, MPI_Comm comm)
249 {
250   MPI_Request request =
251     smpi_irecv_init(buf, count, datatype, src, tag, comm);
252
253   smpi_mpi_start(request);
254   return request;
255 }
256
257 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
258                    int tag, MPI_Comm comm, MPI_Status * status)
259 {
260   MPI_Request request;
261   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
262   smpi_mpi_wait(&request, status);
263 }
264
265
266
267 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
268                    int tag, MPI_Comm comm)
269 {
270   MPI_Request request;
271
272   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
273   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
274 }
275
276 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
277                        int dst, int sendtag, void *recvbuf, int recvcount,
278                        MPI_Datatype recvtype, int src, int recvtag,
279                        MPI_Comm comm, MPI_Status * status)
280 {
281   MPI_Request requests[2];
282   MPI_Status stats[2];
283
284   requests[0] =
285     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
286   requests[1] =
287     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
288   smpi_mpi_startall(2, requests);
289   smpi_mpi_waitall(2, requests, stats);
290   if(status != MPI_STATUS_IGNORE) {
291     // Copy receive status
292     memcpy(status, &stats[1], sizeof(MPI_Status));
293   }
294 }
295
296 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
297 {
298   return status->count / smpi_datatype_size(datatype);
299 }
300
301 static void finish_wait(MPI_Request * request, MPI_Status * status)
302 {
303   MPI_Request req = *request;
304   // if we have a sender, we should use its data, and not the data from the receive
305   if((req->action)&&
306      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
307     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
308
309   if(status != MPI_STATUS_IGNORE) {
310     status->MPI_SOURCE = req->src;
311     status->MPI_TAG = req->tag;
312     status->MPI_ERROR = MPI_SUCCESS;
313     // FIXME: really this should just contain the count of receive-type blocks,
314     // right?
315     status->count = req->size;
316   }
317   req = *request;
318
319   print_request("Finishing", req);
320   MPI_Datatype datatype = req->old_type;
321   if(datatype->has_subtype == 1){
322       // This part handles the problem of non-contignous memory
323       // the unserialization at the reception
324     s_smpi_subtype_t *subtype = datatype->substruct;
325     if(req->flags & RECV) {
326       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
327     }
328     //FIXME: I am not sure that if the send is detached we have to free
329     //the sender buffer thus I do it only for the reciever
330     if(req->flags & RECV) free(req->buf);
331   }
332
333   if(req->flags & NON_PERSISTENT) {
334     smpi_mpi_request_free(request);
335   } else {
336     req->action = NULL;
337   }
338 }
339
340 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
341   int flag;
342
343   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
344   if ((*request)->action == NULL)
345     flag = 1;
346   else
347     flag = simcall_comm_test((*request)->action);
348   if(flag) {
349     finish_wait(request, status);
350   }else{
351     smpi_empty_status(status);
352   }
353   return flag;
354 }
355
356 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
357                      MPI_Status * status)
358 {
359   xbt_dynar_t comms;
360   int i, flag, size;
361   int* map;
362
363   *index = MPI_UNDEFINED;
364   flag = 0;
365   if(count > 0) {
366     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
367     map = xbt_new(int, count);
368     size = 0;
369     for(i = 0; i < count; i++) {
370       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
371          xbt_dynar_push(comms, &requests[i]->action);
372          map[size] = i;
373          size++;
374       }
375     }
376     if(size > 0) {
377       i = simcall_comm_testany(comms);
378       // not MPI_UNDEFINED, as this is a simix return code
379       if(i != -1) {
380         *index = map[i];
381         finish_wait(&requests[*index], status);
382         flag = 1;
383       }
384     }else{
385         //all requests are null or inactive, return true
386         flag=1;
387         smpi_empty_status(status);
388     }
389     xbt_free(map);
390     xbt_dynar_free(&comms);
391   }
392
393   return flag;
394 }
395
396
397 int smpi_mpi_testall(int count, MPI_Request requests[],
398                      MPI_Status status[])
399 {
400   MPI_Status stat;
401   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
402   int flag=1;
403   int i;
404   for(i=0; i<count; i++){
405     if(requests[i]!= MPI_REQUEST_NULL){
406       if (smpi_mpi_test(&requests[i], pstat)!=1){
407         flag=0;
408       }
409     }else{
410       smpi_empty_status(pstat);
411     }
412     if(status != MPI_STATUSES_IGNORE) {
413       memcpy(&status[i], pstat, sizeof(*pstat));
414     }
415   }
416   return flag;
417 }
418
419 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
420   int flag=0;
421   //FIXME find another wait to avoid busy waiting ?
422   // the issue here is that we have to wait on a nonexistent comm
423   while(flag==0){
424     smpi_mpi_iprobe(source, tag, comm, &flag, status);
425     XBT_DEBUG("Busy Waiting on probing : %d", flag);
426     if(!flag) {
427       simcall_process_sleep(0.0001);
428     }
429   }
430 }
431
432 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
433   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
434             comm, NON_PERSISTENT | RECV);
435
436   // behave like a receive, but don't do it
437   smx_rdv_t mailbox;
438
439   print_request("New iprobe", request);
440   // We have to test both mailboxes as we don't know if we will receive one one or another
441     if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
442         mailbox = smpi_process_mailbox_small();
443         XBT_DEBUG("trying to probe the perm recv mailbox");
444         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
445     }
446     if (request->action==NULL){
447         mailbox = smpi_process_mailbox();
448         XBT_DEBUG("trying to probe the other mailbox");
449         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
450     }
451
452   if(request->action){
453     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
454     *flag=true;
455     if(status != MPI_STATUS_IGNORE) {
456       status->MPI_SOURCE = req->src;
457       status->MPI_TAG = req->tag;
458       status->MPI_ERROR = MPI_SUCCESS;
459       status->count = req->size;
460     }
461   }
462   else *flag=false;
463   smpi_mpi_request_free(&request);
464
465   return;
466 }
467
468 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
469 {
470   print_request("Waiting", *request);
471   if ((*request)->action != NULL) { // this is not a detached send
472     simcall_comm_wait((*request)->action, -1.0);
473     finish_wait(request, status);
474   }
475   // FIXME for a detached send, finish_wait is not called:
476 }
477
478 int smpi_mpi_waitany(int count, MPI_Request requests[],
479                      MPI_Status * status)
480 {
481   xbt_dynar_t comms;
482   int i, size, index;
483   int *map;
484
485   index = MPI_UNDEFINED;
486   if(count > 0) {
487     // Wait for a request to complete
488     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
489     map = xbt_new(int, count);
490     size = 0;
491     XBT_DEBUG("Wait for one of");
492     for(i = 0; i < count; i++) {
493       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
494         print_request("Waiting any ", requests[i]);
495         xbt_dynar_push(comms, &requests[i]->action);
496         map[size] = i;
497         size++;
498       }
499     }
500     if(size > 0) {
501       i = simcall_comm_waitany(comms);
502
503       // not MPI_UNDEFINED, as this is a simix return code
504       if (i != -1) {
505         index = map[i];
506         finish_wait(&requests[index], status);
507       }
508     }
509     xbt_free(map);
510     xbt_dynar_free(&comms);
511   }
512
513   if (index==MPI_UNDEFINED)
514     smpi_empty_status(status);
515
516   return index;
517 }
518
519 void smpi_mpi_waitall(int count, MPI_Request requests[],
520                       MPI_Status status[])
521 {
522   int  index, c;
523   MPI_Status stat;
524   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
525   //tag invalid requests in the set
526   for(c = 0; c < count; c++) {
527     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
528       if(status != MPI_STATUSES_IGNORE)
529         smpi_empty_status(&status[c]);
530     }else if(requests[c]->src == MPI_PROC_NULL ){
531       if(status != MPI_STATUSES_IGNORE) {
532         smpi_empty_status(&status[c]);
533         status[c].MPI_SOURCE=MPI_PROC_NULL;
534       }
535     }
536   }
537
538   for(c = 0; c < count; c++) {
539       if(MC_IS_ENABLED) {
540         smpi_mpi_wait(&requests[c], pstat);
541         index = c;
542       } else {
543         index = smpi_mpi_waitany(count, requests, pstat);
544         if(index == MPI_UNDEFINED) {
545           break;
546        }
547       if(status != MPI_STATUSES_IGNORE) {
548         memcpy(&status[index], pstat, sizeof(*pstat));
549
550       }
551     }
552   }
553 }
554
555 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
556                       MPI_Status status[])
557 {
558   int i, count, index;
559   MPI_Status stat;
560   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
561
562   count = 0;
563   for(i = 0; i < incount; i++)
564   {
565     index=smpi_mpi_waitany(incount, requests, pstat);
566     if(index!=MPI_UNDEFINED){
567       indices[count] = index;
568       count++;
569       if(status != MPI_STATUSES_IGNORE) {
570         memcpy(&status[index], pstat, sizeof(*pstat));
571       }
572     }else{
573       return MPI_UNDEFINED;
574     }
575   }
576   return count;
577 }
578
579 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
580                       MPI_Status status[])
581 {
582   int i, count, count_dead;
583   MPI_Status stat;
584   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
585
586   count = 0;
587   count_dead = 0;
588   for(i = 0; i < incount; i++) {
589     if((requests[i] != MPI_REQUEST_NULL)) {
590       if(smpi_mpi_test(&requests[i], pstat)) {
591          indices[count] = i;
592          count++;
593          if(status != MPI_STATUSES_IGNORE) {
594             memcpy(&status[i], pstat, sizeof(*pstat));
595          }
596       }
597     }else{
598       count_dead++;
599     }
600   }
601   if(count_dead==incount)return MPI_UNDEFINED;
602   else return count;
603 }
604
605 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
606                     MPI_Comm comm)
607 {
608   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
609   nary_tree_bcast(buf, count, datatype, root, comm, 4);
610 }
611
612 void smpi_mpi_barrier(MPI_Comm comm)
613 {
614   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
615   nary_tree_barrier(comm, 4);
616 }
617
618 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
619                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
620                      int root, MPI_Comm comm)
621 {
622   int system_tag = 666;
623   int rank, size, src, index;
624   MPI_Aint lb = 0, recvext = 0;
625   MPI_Request *requests;
626
627   rank = smpi_comm_rank(comm);
628   size = smpi_comm_size(comm);
629   if(rank != root) {
630     // Send buffer to root
631     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
632   } else {
633     // FIXME: check for errors
634     smpi_datatype_extent(recvtype, &lb, &recvext);
635     // Local copy from root
636     smpi_datatype_copy(sendbuf, sendcount, sendtype,
637                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
638     // Receive buffers from senders
639     requests = xbt_new(MPI_Request, size - 1);
640     index = 0;
641     for(src = 0; src < size; src++) {
642       if(src != root) {
643         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
644                                           recvcount, recvtype,
645                                           src, system_tag, comm);
646         index++;
647       }
648     }
649     // Wait for completion of irecv's.
650     smpi_mpi_startall(size - 1, requests);
651     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
652     xbt_free(requests);
653   }
654 }
655
656 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
657                       void *recvbuf, int *recvcounts, int *displs,
658                       MPI_Datatype recvtype, int root, MPI_Comm comm)
659 {
660   int system_tag = 666;
661   int rank, size, src, index;
662   MPI_Aint lb = 0, recvext = 0;
663   MPI_Request *requests;
664
665   rank = smpi_comm_rank(comm);
666   size = smpi_comm_size(comm);
667   if(rank != root) {
668     // Send buffer to root
669     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
670   } else {
671     // FIXME: check for errors
672     smpi_datatype_extent(recvtype, &lb, &recvext);
673     // Local copy from root
674     smpi_datatype_copy(sendbuf, sendcount, sendtype,
675                        (char *)recvbuf + displs[root] * recvext,
676                        recvcounts[root], recvtype);
677     // Receive buffers from senders
678     requests = xbt_new(MPI_Request, size - 1);
679     index = 0;
680     for(src = 0; src < size; src++) {
681       if(src != root) {
682         requests[index] =
683           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
684                           recvcounts[src], recvtype, src, system_tag, comm);
685         index++;
686       }
687     }
688     // Wait for completion of irecv's.
689     smpi_mpi_startall(size - 1, requests);
690     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
691     xbt_free(requests);
692   }
693 }
694
695 void smpi_mpi_allgather(void *sendbuf, int sendcount,
696                         MPI_Datatype sendtype, void *recvbuf,
697                         int recvcount, MPI_Datatype recvtype,
698                         MPI_Comm comm)
699 {
700   int system_tag = 666;
701   int rank, size, other, index;
702   MPI_Aint lb = 0, recvext = 0;
703   MPI_Request *requests;
704
705   rank = smpi_comm_rank(comm);
706   size = smpi_comm_size(comm);
707   // FIXME: check for errors
708   smpi_datatype_extent(recvtype, &lb, &recvext);
709   // Local copy from self
710   smpi_datatype_copy(sendbuf, sendcount, sendtype,
711                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
712                      recvtype);
713   // Send/Recv buffers to/from others;
714   requests = xbt_new(MPI_Request, 2 * (size - 1));
715   index = 0;
716   for(other = 0; other < size; other++) {
717     if(other != rank) {
718       requests[index] =
719         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
720                         comm);
721       index++;
722       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
723                                         recvcount, recvtype, other,
724                                         system_tag, comm);
725       index++;
726     }
727   }
728   // Wait for completion of all comms.
729   smpi_mpi_startall(2 * (size - 1), requests);
730   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
731   xbt_free(requests);
732 }
733
734 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
735                          MPI_Datatype sendtype, void *recvbuf,
736                          int *recvcounts, int *displs,
737                          MPI_Datatype recvtype, MPI_Comm comm)
738 {
739   int system_tag = 666;
740   int rank, size, other, index;
741   MPI_Aint lb = 0, recvext = 0;
742   MPI_Request *requests;
743
744   rank = smpi_comm_rank(comm);
745   size = smpi_comm_size(comm);
746   // FIXME: check for errors
747   smpi_datatype_extent(recvtype, &lb, &recvext);
748   // Local copy from self
749   smpi_datatype_copy(sendbuf, sendcount, sendtype,
750                      (char *)recvbuf + displs[rank] * recvext,
751                      recvcounts[rank], recvtype);
752   // Send buffers to others;
753   requests = xbt_new(MPI_Request, 2 * (size - 1));
754   index = 0;
755   for(other = 0; other < size; other++) {
756     if(other != rank) {
757       requests[index] =
758         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
759                         comm);
760       index++;
761       requests[index] =
762         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
763                         recvtype, other, system_tag, comm);
764       index++;
765     }
766   }
767   // Wait for completion of all comms.
768   smpi_mpi_startall(2 * (size - 1), requests);
769   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
770   xbt_free(requests);
771 }
772
773 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
774                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
775                       int root, MPI_Comm comm)
776 {
777   int system_tag = 666;
778   int rank, size, dst, index;
779   MPI_Aint lb = 0, sendext = 0;
780   MPI_Request *requests;
781
782   rank = smpi_comm_rank(comm);
783   size = smpi_comm_size(comm);
784   if(rank != root) {
785     // Recv buffer from root
786     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
787                   MPI_STATUS_IGNORE);
788   } else {
789     // FIXME: check for errors
790     smpi_datatype_extent(sendtype, &lb, &sendext);
791     // Local copy from root
792     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
793                        sendcount, sendtype, recvbuf, recvcount, recvtype);
794     // Send buffers to receivers
795     requests = xbt_new(MPI_Request, size - 1);
796     index = 0;
797     for(dst = 0; dst < size; dst++) {
798       if(dst != root) {
799         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
800                                           sendcount, sendtype, dst,
801                                           system_tag, comm);
802         index++;
803       }
804     }
805     // Wait for completion of isend's.
806     smpi_mpi_startall(size - 1, requests);
807     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
808     xbt_free(requests);
809   }
810 }
811
812 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
813                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
814                        MPI_Datatype recvtype, int root, MPI_Comm comm)
815 {
816   int system_tag = 666;
817   int rank, size, dst, index;
818   MPI_Aint lb = 0, sendext = 0;
819   MPI_Request *requests;
820
821   rank = smpi_comm_rank(comm);
822   size = smpi_comm_size(comm);
823   if(rank != root) {
824     // Recv buffer from root
825     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
826                   MPI_STATUS_IGNORE);
827   } else {
828     // FIXME: check for errors
829     smpi_datatype_extent(sendtype, &lb, &sendext);
830     // Local copy from root
831     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
832                        sendtype, recvbuf, recvcount, recvtype);
833     // Send buffers to receivers
834     requests = xbt_new(MPI_Request, size - 1);
835     index = 0;
836     for(dst = 0; dst < size; dst++) {
837       if(dst != root) {
838         requests[index] =
839           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
840                           sendtype, dst, system_tag, comm);
841         index++;
842       }
843     }
844     // Wait for completion of isend's.
845     smpi_mpi_startall(size - 1, requests);
846     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
847     xbt_free(requests);
848   }
849 }
850
851 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
852                      MPI_Datatype datatype, MPI_Op op, int root,
853                      MPI_Comm comm)
854 {
855   int system_tag = 666;
856   int rank, size, src, index;
857   MPI_Aint lb = 0, dataext = 0;
858   MPI_Request *requests;
859   void **tmpbufs;
860
861   rank = smpi_comm_rank(comm);
862   size = smpi_comm_size(comm);
863   if(rank != root) {
864     // Send buffer to root
865     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
866   } else {
867     // FIXME: check for errors
868     smpi_datatype_extent(datatype, &lb, &dataext);
869     // Local copy from root
870     if (sendbuf && recvbuf)
871       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
872     // Receive buffers from senders
873     //TODO: make a MPI_barrier here ?
874     requests = xbt_new(MPI_Request, size - 1);
875     tmpbufs = xbt_new(void *, size - 1);
876     index = 0;
877     for(src = 0; src < size; src++) {
878       if(src != root) {
879         // FIXME: possibly overkill we we have contiguous/noncontiguous data
880         //  mapping...
881         tmpbufs[index] = xbt_malloc(count * dataext);
882         requests[index] =
883           smpi_irecv_init(tmpbufs[index], count, datatype, src,
884                           system_tag, comm);
885         index++;
886       }
887     }
888     // Wait for completion of irecv's.
889     smpi_mpi_startall(size - 1, requests);
890     for(src = 0; src < size - 1; src++) {
891       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
892       XBT_DEBUG("finished waiting any request with index %d", index);
893       if(index == MPI_UNDEFINED) {
894         break;
895       }
896       if(op) /* op can be MPI_OP_NULL that does nothing */
897         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
898     }
899     for(index = 0; index < size - 1; index++) {
900       xbt_free(tmpbufs[index]);
901     }
902     xbt_free(tmpbufs);
903     xbt_free(requests);
904   }
905 }
906
907 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
908                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
909 {
910   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
911   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
912 }
913
914 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
915                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
916 {
917   int system_tag = 666;
918   int rank, size, other, index;
919   MPI_Aint lb = 0, dataext = 0;
920   MPI_Request *requests;
921   void **tmpbufs;
922
923   rank = smpi_comm_rank(comm);
924   size = smpi_comm_size(comm);
925
926   // FIXME: check for errors
927   smpi_datatype_extent(datatype, &lb, &dataext);
928
929   // Local copy from self
930   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
931
932   // Send/Recv buffers to/from others;
933   requests = xbt_new(MPI_Request, size - 1);
934   tmpbufs = xbt_new(void *, rank);
935   index = 0;
936   for(other = 0; other < rank; other++) {
937     // FIXME: possibly overkill we we have contiguous/noncontiguous data
938     // mapping...
939     tmpbufs[index] = xbt_malloc(count * dataext);
940     requests[index] =
941       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
942                       comm);
943     index++;
944   }
945   for(other = rank + 1; other < size; other++) {
946     requests[index] =
947       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
948     index++;
949   }
950   // Wait for completion of all comms.
951   smpi_mpi_startall(size - 1, requests);
952   for(other = 0; other < size - 1; other++) {
953     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
954     if(index == MPI_UNDEFINED) {
955       break;
956     }
957     if(index < rank) {
958       // #Request is below rank: it's a irecv
959       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
960     }
961   }
962   for(index = 0; index < rank; index++) {
963     xbt_free(tmpbufs[index]);
964   }
965   xbt_free(tmpbufs);
966   xbt_free(requests);
967 }