Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
avoid breaking the shiny new and working functionality from jean-noel
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "surf/surf.h"
13
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
16
17
18 static int match_recv(void* a, void* b, smx_action_t ignored) {
19    MPI_Request ref = (MPI_Request)a;
20    MPI_Request req = (MPI_Request)b;
21    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
22
23   xbt_assert(ref, "Cannot match recv against null reference");
24   xbt_assert(req, "Cannot match recv against null request");
25   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
26     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
27 }
28
29 static int match_send(void* a, void* b,smx_action_t ignored) {
30    MPI_Request ref = (MPI_Request)a;
31    MPI_Request req = (MPI_Request)b;
32    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
33    xbt_assert(ref, "Cannot match send against null reference");
34    xbt_assert(req, "Cannot match send against null request");
35    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
36           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
37 }
38
39 static MPI_Request build_request(void *buf, int count,
40                                  MPI_Datatype datatype, int src, int dst,
41                                  int tag, MPI_Comm comm, unsigned flags)
42 {
43   MPI_Request request;
44
45   void *old_buf;
46
47   request = xbt_new(s_smpi_mpi_request_t, 1);
48
49   s_smpi_subtype_t *subtype = datatype->substruct;
50
51   if(datatype->has_subtype == 1){
52     // This part handles the problem of non-contignous memory
53     old_buf = buf;
54     buf = malloc(count*smpi_datatype_size(datatype));
55     if (flags & SEND) {
56       subtype->serialize(old_buf, buf, count, datatype->substruct);
57     }
58   }
59
60   request->buf = buf;
61   // This part handles the problem of non-contignous memory (for the
62   // unserialisation at the reception)
63   request->old_buf = old_buf;
64   request->old_type = datatype;
65
66   request->size = smpi_datatype_size(datatype) * count;
67   request->src = src;
68   request->dst = dst;
69   request->tag = tag;
70   request->comm = comm;
71   request->action = NULL;
72   request->flags = flags;
73 #ifdef HAVE_TRACING
74   request->send = 0;
75   request->recv = 0;
76 #endif
77   return request;
78 }
79
80
81 void smpi_empty_status(MPI_Status * status) {
82   if(status != MPI_STATUS_IGNORE) {
83       status->MPI_SOURCE=MPI_ANY_SOURCE;
84       status->MPI_TAG=MPI_ANY_TAG;
85       status->count=0;
86   }
87 }
88
89 void smpi_action_trace_run(char *path)
90 {
91   char *name;
92   xbt_dynar_t todo;
93   xbt_dict_cursor_t cursor;
94
95   action_fp=NULL;
96   if (path) {
97     action_fp = fopen(path, "r");
98     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
99                strerror(errno));
100   }
101
102   if (!xbt_dict_is_empty(action_queues)) {
103     XBT_WARN
104       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
105
106
107     xbt_dict_foreach(action_queues, cursor, name, todo) {
108       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
109     }
110   }
111
112   if (path)
113     fclose(action_fp);
114   xbt_dict_free(&action_queues);
115   action_queues = xbt_dict_new_homogeneous(NULL);
116 }
117
118 static void smpi_mpi_request_free_voidp(void* request)
119 {
120   MPI_Request req = request;
121   smpi_mpi_request_free(&req);
122 }
123
124 /* MPI Low level calls */
125 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
126                                int dst, int tag, MPI_Comm comm)
127 {
128   MPI_Request request =
129     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
130                   comm, PERSISTENT | SEND);
131
132   return request;
133 }
134
135 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
136                                int src, int tag, MPI_Comm comm)
137 {
138   MPI_Request request =
139     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
140                   comm, PERSISTENT | RECV);
141
142   return request;
143 }
144
145 void smpi_mpi_start(MPI_Request request)
146 {
147   smx_rdv_t mailbox;
148   int detached = 0;
149
150   xbt_assert(!request->action,
151              "Cannot (re)start a non-finished communication");
152   if(request->flags & RECV) {
153     print_request("New recv", request);
154     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
155       mailbox = smpi_process_mailbox_small();
156     else
157       mailbox = smpi_process_mailbox();
158
159     // FIXME: SIMIX does not yet support non-contiguous datatypes
160     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
161   } else {
162
163     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
164     if(receiver == MPI_UNDEFINED) {
165       XBT_WARN("Trying to send a message to a wrong rank");
166       return;
167     }
168     print_request("New send", request);
169     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode
170       mailbox = smpi_process_remote_mailbox_small(receiver);
171     }else{
172       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
173       mailbox = smpi_process_remote_mailbox(receiver);
174     }
175     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
176       void *oldbuf = request->buf;
177       detached = 1;
178       request->buf = malloc(request->size);
179       if (oldbuf)
180         memcpy(request->buf,oldbuf,request->size);
181       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
182     }
183
184     request->action =
185       simcall_comm_isend(mailbox, request->size, -1.0,
186                          request->buf, request->size,
187                          &match_send,
188                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
189                          request,
190                          // detach if msg size < eager/rdv switch limit
191                          detached);
192
193 #ifdef HAVE_TRACING
194     /* FIXME: detached sends are not traceable (request->action == NULL) */
195     if (request->action)
196       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
197 #endif
198
199   }
200
201 }
202
203 void smpi_mpi_startall(int count, MPI_Request * requests)
204 {
205   int i;
206
207   for(i = 0; i < count; i++) {
208     smpi_mpi_start(requests[i]);
209   }
210 }
211
212 void smpi_mpi_request_free(MPI_Request * request)
213 {
214   xbt_free(*request);
215   *request = MPI_REQUEST_NULL;
216 }
217
218 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
219                             int dst, int tag, MPI_Comm comm)
220 {
221   MPI_Request request =
222     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
223                   comm, NON_PERSISTENT | SEND);
224
225   return request;
226 }
227
228 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
229                            int dst, int tag, MPI_Comm comm)
230 {
231   MPI_Request request =
232     smpi_isend_init(buf, count, datatype, dst, tag, comm);
233
234   smpi_mpi_start(request);
235   return request;
236 }
237
238 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
239                             int src, int tag, MPI_Comm comm)
240 {
241   MPI_Request request =
242     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
243                   comm, NON_PERSISTENT | RECV);
244   return request;
245 }
246
247 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
248                            int src, int tag, MPI_Comm comm)
249 {
250   MPI_Request request =
251     smpi_irecv_init(buf, count, datatype, src, tag, comm);
252
253   smpi_mpi_start(request);
254   return request;
255 }
256
257 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
258                    int tag, MPI_Comm comm, MPI_Status * status)
259 {
260   MPI_Request request;
261   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
262   smpi_mpi_wait(&request, status);
263 }
264
265
266
267 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
268                    int tag, MPI_Comm comm)
269 {
270   MPI_Request request;
271
272   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
273   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
274 }
275
276 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
277                        int dst, int sendtag, void *recvbuf, int recvcount,
278                        MPI_Datatype recvtype, int src, int recvtag,
279                        MPI_Comm comm, MPI_Status * status)
280 {
281   MPI_Request requests[2];
282   MPI_Status stats[2];
283
284   requests[0] =
285     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
286   requests[1] =
287     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
288   smpi_mpi_startall(2, requests);
289   smpi_mpi_waitall(2, requests, stats);
290   if(status != MPI_STATUS_IGNORE) {
291     // Copy receive status
292     memcpy(status, &stats[1], sizeof(MPI_Status));
293   }
294 }
295
296 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
297 {
298   return status->count / smpi_datatype_size(datatype);
299 }
300
301 static void finish_wait(MPI_Request * request, MPI_Status * status)
302 {
303   MPI_Request req = *request;
304   // if we have a sender, we should use its data, and not the data from the receive
305   if((req->action)&&
306      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
307     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
308
309   if(status != MPI_STATUS_IGNORE) {
310     status->MPI_SOURCE = req->src;
311     status->MPI_TAG = req->tag;
312     status->MPI_ERROR = MPI_SUCCESS;
313     // FIXME: really this should just contain the count of receive-type blocks,
314     // right?
315     status->count = req->size;
316   }
317   req = *request;
318
319   print_request("Finishing", req);
320   MPI_Datatype datatype = req->old_type;
321   if(datatype->has_subtype == 1){
322       // This part handles the problem of non-contignous memory
323       // the unserialization at the reception
324     s_smpi_subtype_t *subtype = datatype->substruct;
325     if(req->flags & RECV) {
326       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
327     }
328     //FIXME: I am not sure that if the send is detached we have to free
329     //the sender buffer thus I do it only for the reciever
330     if(req->flags & RECV) free(req->buf);
331   }
332
333   if(req->flags & NON_PERSISTENT) {
334     smpi_mpi_request_free(request);
335   } else {
336     req->action = NULL;
337   }
338 }
339
340 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
341   int flag;
342
343   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
344   if ((*request)->action == NULL)
345     flag = 1;
346   else
347     flag = simcall_comm_test((*request)->action);
348   if(flag) {
349     finish_wait(request, status);
350   }else{
351     smpi_empty_status(status);
352   }
353   return flag;
354 }
355
356 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
357                      MPI_Status * status)
358 {
359   xbt_dynar_t comms;
360   int i, flag, size;
361   int* map;
362
363   *index = MPI_UNDEFINED;
364   flag = 0;
365   if(count > 0) {
366     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
367     map = xbt_new(int, count);
368     size = 0;
369     for(i = 0; i < count; i++) {
370       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
371          xbt_dynar_push(comms, &requests[i]->action);
372          map[size] = i;
373          size++;
374       }
375     }
376     if(size > 0) {
377       i = simcall_comm_testany(comms);
378       // not MPI_UNDEFINED, as this is a simix return code
379       if(i != -1) {
380         *index = map[i];
381         finish_wait(&requests[*index], status);
382         flag = 1;
383       }
384     }else{
385         //all requests are null or inactive, return true
386         flag=1;
387         smpi_empty_status(status);
388     }
389     xbt_free(map);
390     xbt_dynar_free(&comms);
391   }
392
393   return flag;
394 }
395
396
397 int smpi_mpi_testall(int count, MPI_Request requests[],
398                      MPI_Status status[])
399 {
400   MPI_Status stat;
401   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
402   int flag=1;
403   int i;
404   for(i=0; i<count; i++){
405     if(requests[i]!= MPI_REQUEST_NULL){
406       if (smpi_mpi_test(&requests[i], pstat)!=1){
407         flag=0;
408       }
409     }else{
410       smpi_empty_status(pstat);
411     }
412     if(status != MPI_STATUSES_IGNORE) {
413       memcpy(&status[i], pstat, sizeof(*pstat));
414     }
415   }
416   return flag;
417 }
418
419 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
420   int flag=0;
421   //FIXME find another wait to avoid busy waiting ?
422   // the issue here is that we have to wait on a nonexistent comm
423   while(flag==0){
424     smpi_mpi_iprobe(source, tag, comm, &flag, status);
425     XBT_DEBUG("Busy Waiting on probing : %d", flag);
426     if(!flag) {
427       simcall_process_sleep(0.0001);
428     }
429   }
430 }
431
432 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
433   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
434             comm, NON_PERSISTENT | RECV);
435
436   // behave like a receive, but don't do it
437   smx_rdv_t mailbox;
438
439   print_request("New iprobe", request);
440   // We have to test both mailboxes as we don't know if we will receive one one or another
441     if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
442         mailbox = smpi_process_mailbox_small();
443         XBT_DEBUG("trying to probe the perm recv mailbox");
444         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
445     }
446     if (request->action==NULL){
447         mailbox = smpi_process_mailbox();
448         XBT_DEBUG("trying to probe the other mailbox");
449         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
450     }
451
452   if(request->action){
453     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
454     *flag=true;
455     if(status != MPI_STATUS_IGNORE) {
456       status->MPI_SOURCE = req->src;
457       status->MPI_TAG = req->tag;
458       status->MPI_ERROR = MPI_SUCCESS;
459       status->count = req->size;
460     }
461   }
462   else *flag=false;
463   smpi_mpi_request_free(&request);
464
465   return;
466 }
467
468 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
469 {
470   print_request("Waiting", *request);
471   if ((*request)->action != NULL) { // this is not a detached send
472     simcall_comm_wait((*request)->action, -1.0);
473     finish_wait(request, status);
474   }
475   // FIXME for a detached send, finish_wait is not called:
476 }
477
478 int smpi_mpi_waitany(int count, MPI_Request requests[],
479                      MPI_Status * status)
480 {
481   xbt_dynar_t comms;
482   int i, size, index;
483   int *map;
484
485   index = MPI_UNDEFINED;
486   if(count > 0) {
487     // Wait for a request to complete
488     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
489     map = xbt_new(int, count);
490     size = 0;
491     XBT_DEBUG("Wait for one of");
492     for(i = 0; i < count; i++) {
493       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
494         print_request("Waiting any ", requests[i]);
495         xbt_dynar_push(comms, &requests[i]->action);
496         map[size] = i;
497         size++;
498       }
499     }
500     if(size > 0) {
501       i = simcall_comm_waitany(comms);
502
503       // not MPI_UNDEFINED, as this is a simix return code
504       if (i != -1) {
505         index = map[i];
506         finish_wait(&requests[index], status);
507       }
508     }
509     xbt_free(map);
510     xbt_dynar_free(&comms);
511   }
512
513   if (index==MPI_UNDEFINED)
514     smpi_empty_status(status);
515
516   return index;
517 }
518
519 void smpi_mpi_waitall(int count, MPI_Request requests[],
520                       MPI_Status status[])
521 {
522   int  index, c;
523   MPI_Status stat;
524   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
525   //tag invalid requests in the set
526   for(c = 0; c < count; c++) {
527     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
528       if(status != MPI_STATUSES_IGNORE)
529         smpi_empty_status(&status[c]);
530     }else if(requests[c]->src == MPI_PROC_NULL ){
531       if(status != MPI_STATUSES_IGNORE) {
532         smpi_empty_status(&status[c]);
533         status[c].MPI_SOURCE=MPI_PROC_NULL;
534       }
535     }
536   }
537
538   for(c = 0; c < count; c++) {
539       if(MC_IS_ENABLED) {
540         smpi_mpi_wait(&requests[c], pstat);
541         index = c;
542       } else {
543         index = smpi_mpi_waitany(count, requests, pstat);
544         if(index == MPI_UNDEFINED) {
545           break;
546        }
547       if(status != MPI_STATUSES_IGNORE) {
548         memcpy(&status[index], pstat, sizeof(*pstat));
549
550       }
551     }
552   }
553 }
554
555 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
556                       MPI_Status status[])
557 {
558   int i, count, index;
559   MPI_Status stat;
560   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
561
562   count = 0;
563   for(i = 0; i < incount; i++)
564   {
565     index=smpi_mpi_waitany(incount, requests, pstat);
566     if(index!=MPI_UNDEFINED){
567       indices[count] = index;
568       count++;
569       if(status != MPI_STATUSES_IGNORE) {
570         memcpy(&status[index], pstat, sizeof(*pstat));
571       }
572     }else{
573       return MPI_UNDEFINED;
574     }
575   }
576   return count;
577 }
578
579 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
580                       MPI_Status status[])
581 {
582   int i, count, count_dead;
583   MPI_Status stat;
584   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
585
586   count = 0;
587   for(i = 0; i < incount; i++) {
588     if((requests[i] != MPI_REQUEST_NULL)) {
589       if(smpi_mpi_test(&requests[i], pstat)) {
590          indices[count] = i;
591          count++;
592          if(status != MPI_STATUSES_IGNORE) {
593             memcpy(&status[i], pstat, sizeof(*pstat));
594          }
595       }
596     }else{
597       count_dead++;
598     }
599   }
600   if(count_dead==incount)return MPI_UNDEFINED;
601   else return count;
602 }
603
604 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
605                     MPI_Comm comm)
606 {
607   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
608   nary_tree_bcast(buf, count, datatype, root, comm, 4);
609 }
610
611 void smpi_mpi_barrier(MPI_Comm comm)
612 {
613   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
614   nary_tree_barrier(comm, 4);
615 }
616
617 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
618                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
619                      int root, MPI_Comm comm)
620 {
621   int system_tag = 666;
622   int rank, size, src, index;
623   MPI_Aint lb = 0, recvext = 0;
624   MPI_Request *requests;
625
626   rank = smpi_comm_rank(comm);
627   size = smpi_comm_size(comm);
628   if(rank != root) {
629     // Send buffer to root
630     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
631   } else {
632     // FIXME: check for errors
633     smpi_datatype_extent(recvtype, &lb, &recvext);
634     // Local copy from root
635     smpi_datatype_copy(sendbuf, sendcount, sendtype,
636                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
637     // Receive buffers from senders
638     requests = xbt_new(MPI_Request, size - 1);
639     index = 0;
640     for(src = 0; src < size; src++) {
641       if(src != root) {
642         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
643                                           recvcount, recvtype,
644                                           src, system_tag, comm);
645         index++;
646       }
647     }
648     // Wait for completion of irecv's.
649     smpi_mpi_startall(size - 1, requests);
650     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
651     xbt_free(requests);
652   }
653 }
654
655 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
656                       void *recvbuf, int *recvcounts, int *displs,
657                       MPI_Datatype recvtype, int root, MPI_Comm comm)
658 {
659   int system_tag = 666;
660   int rank, size, src, index;
661   MPI_Aint lb = 0, recvext = 0;
662   MPI_Request *requests;
663
664   rank = smpi_comm_rank(comm);
665   size = smpi_comm_size(comm);
666   if(rank != root) {
667     // Send buffer to root
668     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
669   } else {
670     // FIXME: check for errors
671     smpi_datatype_extent(recvtype, &lb, &recvext);
672     // Local copy from root
673     smpi_datatype_copy(sendbuf, sendcount, sendtype,
674                        (char *)recvbuf + displs[root] * recvext,
675                        recvcounts[root], recvtype);
676     // Receive buffers from senders
677     requests = xbt_new(MPI_Request, size - 1);
678     index = 0;
679     for(src = 0; src < size; src++) {
680       if(src != root) {
681         requests[index] =
682           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
683                           recvcounts[src], recvtype, src, system_tag, comm);
684         index++;
685       }
686     }
687     // Wait for completion of irecv's.
688     smpi_mpi_startall(size - 1, requests);
689     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
690     xbt_free(requests);
691   }
692 }
693
694 void smpi_mpi_allgather(void *sendbuf, int sendcount,
695                         MPI_Datatype sendtype, void *recvbuf,
696                         int recvcount, MPI_Datatype recvtype,
697                         MPI_Comm comm)
698 {
699   int system_tag = 666;
700   int rank, size, other, index;
701   MPI_Aint lb = 0, recvext = 0;
702   MPI_Request *requests;
703
704   rank = smpi_comm_rank(comm);
705   size = smpi_comm_size(comm);
706   // FIXME: check for errors
707   smpi_datatype_extent(recvtype, &lb, &recvext);
708   // Local copy from self
709   smpi_datatype_copy(sendbuf, sendcount, sendtype,
710                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
711                      recvtype);
712   // Send/Recv buffers to/from others;
713   requests = xbt_new(MPI_Request, 2 * (size - 1));
714   index = 0;
715   for(other = 0; other < size; other++) {
716     if(other != rank) {
717       requests[index] =
718         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
719                         comm);
720       index++;
721       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
722                                         recvcount, recvtype, other,
723                                         system_tag, comm);
724       index++;
725     }
726   }
727   // Wait for completion of all comms.
728   smpi_mpi_startall(2 * (size - 1), requests);
729   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
730   xbt_free(requests);
731 }
732
733 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
734                          MPI_Datatype sendtype, void *recvbuf,
735                          int *recvcounts, int *displs,
736                          MPI_Datatype recvtype, MPI_Comm comm)
737 {
738   int system_tag = 666;
739   int rank, size, other, index;
740   MPI_Aint lb = 0, recvext = 0;
741   MPI_Request *requests;
742
743   rank = smpi_comm_rank(comm);
744   size = smpi_comm_size(comm);
745   // FIXME: check for errors
746   smpi_datatype_extent(recvtype, &lb, &recvext);
747   // Local copy from self
748   smpi_datatype_copy(sendbuf, sendcount, sendtype,
749                      (char *)recvbuf + displs[rank] * recvext,
750                      recvcounts[rank], recvtype);
751   // Send buffers to others;
752   requests = xbt_new(MPI_Request, 2 * (size - 1));
753   index = 0;
754   for(other = 0; other < size; other++) {
755     if(other != rank) {
756       requests[index] =
757         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
758                         comm);
759       index++;
760       requests[index] =
761         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
762                         recvtype, other, system_tag, comm);
763       index++;
764     }
765   }
766   // Wait for completion of all comms.
767   smpi_mpi_startall(2 * (size - 1), requests);
768   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
769   xbt_free(requests);
770 }
771
772 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
773                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
774                       int root, MPI_Comm comm)
775 {
776   int system_tag = 666;
777   int rank, size, dst, index;
778   MPI_Aint lb = 0, sendext = 0;
779   MPI_Request *requests;
780
781   rank = smpi_comm_rank(comm);
782   size = smpi_comm_size(comm);
783   if(rank != root) {
784     // Recv buffer from root
785     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
786                   MPI_STATUS_IGNORE);
787   } else {
788     // FIXME: check for errors
789     smpi_datatype_extent(sendtype, &lb, &sendext);
790     // Local copy from root
791     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
792                        sendcount, sendtype, recvbuf, recvcount, recvtype);
793     // Send buffers to receivers
794     requests = xbt_new(MPI_Request, size - 1);
795     index = 0;
796     for(dst = 0; dst < size; dst++) {
797       if(dst != root) {
798         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
799                                           sendcount, sendtype, dst,
800                                           system_tag, comm);
801         index++;
802       }
803     }
804     // Wait for completion of isend's.
805     smpi_mpi_startall(size - 1, requests);
806     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
807     xbt_free(requests);
808   }
809 }
810
811 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
812                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
813                        MPI_Datatype recvtype, int root, MPI_Comm comm)
814 {
815   int system_tag = 666;
816   int rank, size, dst, index;
817   MPI_Aint lb = 0, sendext = 0;
818   MPI_Request *requests;
819
820   rank = smpi_comm_rank(comm);
821   size = smpi_comm_size(comm);
822   if(rank != root) {
823     // Recv buffer from root
824     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
825                   MPI_STATUS_IGNORE);
826   } else {
827     // FIXME: check for errors
828     smpi_datatype_extent(sendtype, &lb, &sendext);
829     // Local copy from root
830     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
831                        sendtype, recvbuf, recvcount, recvtype);
832     // Send buffers to receivers
833     requests = xbt_new(MPI_Request, size - 1);
834     index = 0;
835     for(dst = 0; dst < size; dst++) {
836       if(dst != root) {
837         requests[index] =
838           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
839                           sendtype, dst, system_tag, comm);
840         index++;
841       }
842     }
843     // Wait for completion of isend's.
844     smpi_mpi_startall(size - 1, requests);
845     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
846     xbt_free(requests);
847   }
848 }
849
850 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
851                      MPI_Datatype datatype, MPI_Op op, int root,
852                      MPI_Comm comm)
853 {
854   int system_tag = 666;
855   int rank, size, src, index;
856   MPI_Aint lb = 0, dataext = 0;
857   MPI_Request *requests;
858   void **tmpbufs;
859
860   rank = smpi_comm_rank(comm);
861   size = smpi_comm_size(comm);
862   if(rank != root) {
863     // Send buffer to root
864     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
865   } else {
866     // FIXME: check for errors
867     smpi_datatype_extent(datatype, &lb, &dataext);
868     // Local copy from root
869     if (sendbuf && recvbuf)
870       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
871     // Receive buffers from senders
872     //TODO: make a MPI_barrier here ?
873     requests = xbt_new(MPI_Request, size - 1);
874     tmpbufs = xbt_new(void *, size - 1);
875     index = 0;
876     for(src = 0; src < size; src++) {
877       if(src != root) {
878         // FIXME: possibly overkill we we have contiguous/noncontiguous data
879         //  mapping...
880         tmpbufs[index] = xbt_malloc(count * dataext);
881         requests[index] =
882           smpi_irecv_init(tmpbufs[index], count, datatype, src,
883                           system_tag, comm);
884         index++;
885       }
886     }
887     // Wait for completion of irecv's.
888     smpi_mpi_startall(size - 1, requests);
889     for(src = 0; src < size - 1; src++) {
890       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
891       XBT_DEBUG("finished waiting any request with index %d", index);
892       if(index == MPI_UNDEFINED) {
893         break;
894       }
895       if(op) /* op can be MPI_OP_NULL that does nothing */
896         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
897     }
898     for(index = 0; index < size - 1; index++) {
899       xbt_free(tmpbufs[index]);
900     }
901     xbt_free(tmpbufs);
902     xbt_free(requests);
903   }
904 }
905
906 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
907                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
908 {
909   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
910   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
911 }
912
913 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
914                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
915 {
916   int system_tag = 666;
917   int rank, size, other, index;
918   MPI_Aint lb = 0, dataext = 0;
919   MPI_Request *requests;
920   void **tmpbufs;
921
922   rank = smpi_comm_rank(comm);
923   size = smpi_comm_size(comm);
924
925   // FIXME: check for errors
926   smpi_datatype_extent(datatype, &lb, &dataext);
927
928   // Local copy from self
929   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
930
931   // Send/Recv buffers to/from others;
932   requests = xbt_new(MPI_Request, size - 1);
933   tmpbufs = xbt_new(void *, rank);
934   index = 0;
935   for(other = 0; other < rank; other++) {
936     // FIXME: possibly overkill we we have contiguous/noncontiguous data
937     // mapping...
938     tmpbufs[index] = xbt_malloc(count * dataext);
939     requests[index] =
940       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
941                       comm);
942     index++;
943   }
944   for(other = rank + 1; other < size; other++) {
945     requests[index] =
946       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
947     index++;
948   }
949   // Wait for completion of all comms.
950   smpi_mpi_startall(size - 1, requests);
951   for(other = 0; other < size - 1; other++) {
952     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
953     if(index == MPI_UNDEFINED) {
954       break;
955     }
956     if(index < rank) {
957       // #Request is below rank: it's a irecv
958       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
959     }
960   }
961   for(index = 0; index < rank; index++) {
962     xbt_free(tmpbufs[index]);
963   }
964   xbt_free(tmpbufs);
965   xbt_free(requests);
966 }