Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make internal collective flags negative (incorrect in MPI), to avoid confusion with...
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14 #include "simgrid/sg_config.h"
15 #include "colls/colls.h"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
18
19
20 static int match_recv(void* a, void* b, smx_action_t ignored) {
21    MPI_Request ref = (MPI_Request)a;
22    MPI_Request req = (MPI_Request)b;
23    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
24
25   xbt_assert(ref, "Cannot match recv against null reference");
26   xbt_assert(req, "Cannot match recv against null request");
27   if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
28     && ((ref->tag == MPI_ANY_TAG && req->tag >=0) || req->tag == ref->tag)){
29     //we match, we can transfer some values
30     // FIXME : move this to the copy function ?
31     if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
32     if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
33     if(ref->real_size < req->real_size) ref->truncated = 1;
34     if(req->detached==1){
35         ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
36     }
37     XBT_DEBUG("match succeeded");
38     return 1;
39   }else return 0;
40 }
41
42 static int match_send(void* a, void* b,smx_action_t ignored) {
43    MPI_Request ref = (MPI_Request)a;
44    MPI_Request req = (MPI_Request)b;
45    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
46    xbt_assert(ref, "Cannot match send against null reference");
47    xbt_assert(req, "Cannot match send against null request");
48
49    if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
50              && ((req->tag == MPI_ANY_TAG && ref->tag >=0)|| req->tag == ref->tag))
51    {
52      if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
53      if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
54      if(req->real_size < ref->real_size) req->truncated = 1;
55      if(ref->detached==1){
56          req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
57      }
58     XBT_DEBUG("match succeeded");
59      return 1;
60    } else return 0;
61 }
62
63
64 typedef struct s_smpi_factor *smpi_factor_t;
65 typedef struct s_smpi_factor {
66   long factor;
67   int nb_values;
68   double values[4];//arbitrary set to 4
69 } s_smpi_factor_t;
70 xbt_dynar_t smpi_os_values = NULL;
71 xbt_dynar_t smpi_or_values = NULL;
72 xbt_dynar_t smpi_ois_values = NULL;
73
74 // Methods used to parse and store the values for timing injections in smpi
75 // These are taken from surf/network.c and generalized to have more factors
76 // These methods should be merged with those in surf/network.c (moved somewhere in xbt ?)
77
78 static int factor_cmp(const void *pa, const void *pb)
79 {
80   return (((s_smpi_factor_t*)pa)->factor > ((s_smpi_factor_t*)pb)->factor);
81 }
82
83
84 static xbt_dynar_t parse_factor(const char *smpi_coef_string)
85 {
86   char *value = NULL;
87   unsigned int iter = 0;
88   s_smpi_factor_t fact;
89   int i=0;
90   xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
91
92   smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_t), NULL);
93   radical_elements = xbt_str_split(smpi_coef_string, ";");
94   xbt_dynar_foreach(radical_elements, iter, value) {
95     fact.nb_values=0;
96     radical_elements2 = xbt_str_split(value, ":");
97     if (xbt_dynar_length(radical_elements2) <2 || xbt_dynar_length(radical_elements2) > 5)
98       xbt_die("Malformed radical for smpi factor!");
99     for(i =0; i<xbt_dynar_length(radical_elements2);i++ ){
100         if (i==0){
101            fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
102         }else{
103            fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
104            fact.nb_values++;
105         }
106     }
107
108     xbt_dynar_push_as(smpi_factor, s_smpi_factor_t, fact);
109     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
110     xbt_dynar_free(&radical_elements2);
111   }
112   xbt_dynar_free(&radical_elements);
113   iter=0;
114   xbt_dynar_sort(smpi_factor, &factor_cmp);
115   xbt_dynar_foreach(smpi_factor, iter, fact) {
116     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
117   }
118   return smpi_factor;
119 }
120
121 static double smpi_os(double size)
122 {
123   if (!smpi_os_values) {
124     smpi_os_values = parse_factor(sg_cfg_get_string("smpi/os"));
125     smpi_register_static(smpi_os_values, xbt_dynar_free_voidp);
126   }
127   unsigned int iter = 0;
128   s_smpi_factor_t fact;
129   double current=0.0;
130   xbt_dynar_foreach(smpi_os_values, iter, fact) {
131     if (size <= fact.factor) {
132         XBT_DEBUG("os : %lf <= %ld return %f", size, fact.factor, current);
133       return current;
134     }else{
135       current=fact.values[0]+fact.values[1]*size;
136     }
137   }
138   XBT_DEBUG("os : %lf > %ld return %f", size, fact.factor, current);
139
140   return current;
141 }
142
143 static double smpi_ois(double size)
144 {
145   if (!smpi_ois_values) {
146     smpi_ois_values = parse_factor(sg_cfg_get_string("smpi/ois"));
147     smpi_register_static(smpi_ois_values, xbt_dynar_free_voidp);
148   }
149   unsigned int iter = 0;
150   s_smpi_factor_t fact;
151   double current=0.0;
152   xbt_dynar_foreach(smpi_ois_values, iter, fact) {
153     if (size <= fact.factor) {
154         XBT_DEBUG("ois : %lf <= %ld return %f", size, fact.factor, current);
155       return current;
156     }else{
157       current=fact.values[0]+fact.values[1]*size;
158     }
159   }
160   XBT_DEBUG("ois : %lf > %ld return %f", size, fact.factor, current);
161
162   return current;
163 }
164
165 static double smpi_or(double size)
166 {
167   if (!smpi_or_values) {
168     smpi_or_values = parse_factor(sg_cfg_get_string("smpi/or"));
169     smpi_register_static(smpi_or_values, xbt_dynar_free_voidp);
170   }
171   unsigned int iter = 0;
172   s_smpi_factor_t fact;
173   double current=0.0;
174   xbt_dynar_foreach(smpi_or_values, iter, fact) {
175     if (size <= fact.factor) {
176         XBT_DEBUG("or : %lf <= %ld return %f", size, fact.factor, current);
177       return current;
178     }else
179       current=fact.values[0]+fact.values[1]*size;
180   }
181   XBT_DEBUG("or : %lf > %ld return %f", size, fact.factor, current);
182
183   return current;
184 }
185
186 static MPI_Request build_request(void *buf, int count,
187                                  MPI_Datatype datatype, int src, int dst,
188                                  int tag, MPI_Comm comm, unsigned flags)
189 {
190   MPI_Request request;
191
192   void *old_buf = NULL;
193
194   request = xbt_new(s_smpi_mpi_request_t, 1);
195
196   s_smpi_subtype_t *subtype = datatype->substruct;
197
198   if(datatype->has_subtype == 1){
199     // This part handles the problem of non-contiguous memory
200     old_buf = buf;
201     buf = count==0 ? NULL : xbt_malloc(count*smpi_datatype_size(datatype));
202     if (flags & SEND) {
203       subtype->serialize(old_buf, buf, count, datatype->substruct);
204     }
205   }
206
207   request->buf = buf;
208   // This part handles the problem of non-contiguous memory (for the
209   // unserialisation at the reception)
210   request->old_buf = old_buf;
211   request->old_type = datatype;
212
213   request->size = smpi_datatype_size(datatype) * count;
214   request->src = src;
215   request->dst = dst;
216   request->tag = tag;
217   request->comm = comm;
218   request->action = NULL;
219   request->flags = flags;
220   request->detached = 0;
221   request->detached_sender = NULL;
222
223   request->truncated = 0;
224   request->real_size = 0;
225   request->real_tag = 0;
226
227   request->refcount=1;
228 #ifdef HAVE_TRACING
229   request->send = 0;
230   request->recv = 0;
231 #endif
232   if (flags & SEND) smpi_datatype_unuse(datatype);
233
234   return request;
235 }
236
237
238 void smpi_empty_status(MPI_Status * status)
239 {
240   if(status != MPI_STATUS_IGNORE) {
241     status->MPI_SOURCE = MPI_ANY_SOURCE;
242     status->MPI_TAG = MPI_ANY_TAG;
243     status->MPI_ERROR = MPI_SUCCESS;
244     status->count=0;
245   }
246 }
247
248 void smpi_action_trace_run(char *path)
249 {
250   char *name;
251   xbt_dynar_t todo;
252   xbt_dict_cursor_t cursor;
253
254   action_fp=NULL;
255   if (path) {
256     action_fp = fopen(path, "r");
257     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
258                strerror(errno));
259   }
260
261   if (!xbt_dict_is_empty(action_queues)) {
262     XBT_WARN
263       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
264
265
266     xbt_dict_foreach(action_queues, cursor, name, todo) {
267       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
268     }
269   }
270
271   if (path)
272     fclose(action_fp);
273   xbt_dict_free(&action_queues);
274   action_queues = xbt_dict_new_homogeneous(NULL);
275 }
276
277 static void smpi_mpi_request_free_voidp(void* request)
278 {
279   MPI_Request req = request;
280   smpi_mpi_request_free(&req);
281 }
282
283 /* MPI Low level calls */
284 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
285                                int dst, int tag, MPI_Comm comm)
286 {
287   MPI_Request request =
288     build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
289                   comm, PERSISTENT | SEND);
290   request->refcount++;
291   return request;
292 }
293
294 MPI_Request smpi_mpi_ssend_init(void *buf, int count, MPI_Datatype datatype,
295                                int dst, int tag, MPI_Comm comm)
296 {
297   MPI_Request request =
298     build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
299                   comm, PERSISTENT | SSEND | SEND);
300   request->refcount++;
301   return request;
302 }
303
304 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
305                                int src, int tag, MPI_Comm comm)
306 {
307   MPI_Request request =
308     build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
309                   comm, PERSISTENT | RECV);
310   request->refcount++;
311   return request;
312 }
313
314 void smpi_mpi_start(MPI_Request request)
315 {
316   smx_rdv_t mailbox;
317
318   xbt_assert(!request->action,
319              "Cannot (re)start a non-finished communication");
320   if(request->flags & RECV) {
321     print_request("New recv", request);
322     if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
323       mailbox = smpi_process_mailbox_small();
324     else
325       mailbox = smpi_process_mailbox();
326     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
327     request->real_size=request->size;
328     smpi_datatype_use(request->old_type);
329     request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
330
331     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
332     double sleeptime = request->detached ? smpi_or(request->size) : 0.0;
333     if(sleeptime!=0.0){
334         simcall_process_sleep(sleeptime);
335         XBT_DEBUG("receiving size of %zu : sleep %lf ", request->size, smpi_or(request->size));
336     }
337
338   } else {
339
340
341     int receiver = request->dst;//smpi_group_index(smpi_comm_group(request->comm), request->dst);
342
343     #ifdef HAVE_TRACING
344       int rank = smpi_process_index();
345       if (TRACE_smpi_view_internals()) {
346         TRACE_smpi_send(rank, rank, receiver);
347       }
348     #endif
349 /*    if(receiver == MPI_UNDEFINED) {*/
350 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
351 /*      return;*/
352 /*    }*/
353     print_request("New send", request);
354     if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
355       mailbox = smpi_process_remote_mailbox_small(receiver);
356     }else{
357       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
358       mailbox = smpi_process_remote_mailbox(receiver);
359     }
360     if ( (! (request->flags & SSEND)) && (request->size < sg_cfg_get_int("smpi/send_is_detached_thres"))) {
361       void *oldbuf = NULL;
362       request->detached = 1;
363       request->refcount++;
364       if(request->old_type->has_subtype == 0){
365         oldbuf = request->buf;
366         if (oldbuf && request->size!=0){
367           request->buf = xbt_malloc(request->size);
368           memcpy(request->buf,oldbuf,request->size);
369         }
370       }
371       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
372     }
373
374     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
375     request->real_size=request->size;
376     smpi_datatype_use(request->old_type);
377
378     //if we are giving back the control to the user without waiting for completion, we have to inject timings
379     double sleeptime =0.0;
380     if(request->detached || (request->flags & (ISEND|SSEND))){// issend should be treated as isend
381       //isend and send timings may be different
382       sleeptime = (request->flags & ISEND)? smpi_ois(request->size) : smpi_os(request->size);
383     }
384
385     if(sleeptime!=0.0){
386         simcall_process_sleep(sleeptime);
387         XBT_DEBUG("sending size of %zu : sleep %lf ", request->size, smpi_os(request->size));
388     }
389
390     request->action =
391       simcall_comm_isend(mailbox, request->size, -1.0,
392                          request->buf, request->real_size,
393                          &match_send,
394                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
395                          request,
396                          // detach if msg size < eager/rdv switch limit
397                          request->detached);
398
399 #ifdef HAVE_TRACING
400     /* FIXME: detached sends are not traceable (request->action == NULL) */
401     if (request->action)
402       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
403
404 #endif
405
406   }
407
408 }
409
410 void smpi_mpi_startall(int count, MPI_Request * requests)
411 {
412   int i;
413
414   for(i = 0; i < count; i++) {
415     smpi_mpi_start(requests[i]);
416   }
417 }
418
419 void smpi_mpi_request_free(MPI_Request * request)
420 {
421
422   if((*request) != MPI_REQUEST_NULL){
423     (*request)->refcount--;
424     if((*request)->refcount<0) xbt_die("wrong refcount");
425
426     if((*request)->refcount==0){
427         print_request("Destroying", (*request));
428         xbt_free(*request);
429         *request = MPI_REQUEST_NULL;
430     }else{
431         print_request("Decrementing", (*request));
432
433     }
434   }else{
435       xbt_die("freeing an already free request");
436   }
437 }
438
439 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
440                             int dst, int tag, MPI_Comm comm)
441 {
442   MPI_Request request =
443     build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
444                   comm, NON_PERSISTENT | SEND);
445
446   return request;
447 }
448
449 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
450                            int dst, int tag, MPI_Comm comm)
451 {
452   MPI_Request request =
453       build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
454                     comm, NON_PERSISTENT | ISEND | SEND);
455
456   smpi_mpi_start(request);
457   return request;
458 }
459
460 MPI_Request smpi_mpi_issend(void *buf, int count, MPI_Datatype datatype,
461                            int dst, int tag, MPI_Comm comm)
462 {
463   MPI_Request request =
464       build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
465                     comm, NON_PERSISTENT | ISEND | SSEND | SEND);
466   smpi_mpi_start(request);
467   return request;
468 }
469
470
471
472 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
473                             int src, int tag, MPI_Comm comm)
474 {
475   MPI_Request request =
476     build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
477                   comm, NON_PERSISTENT | RECV);
478   return request;
479 }
480
481 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
482                            int src, int tag, MPI_Comm comm)
483 {
484   MPI_Request request =
485       build_request(buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
486                     comm, NON_PERSISTENT | RECV);
487
488   smpi_mpi_start(request);
489   return request;
490 }
491
492 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
493                    int tag, MPI_Comm comm, MPI_Status * status)
494 {
495   MPI_Request request;
496   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
497   smpi_mpi_wait(&request, status);
498 }
499
500
501
502 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
503                    int tag, MPI_Comm comm)
504 {
505   MPI_Request request =
506       build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
507                     comm, NON_PERSISTENT | SEND);
508   smpi_mpi_start(request);
509   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
510
511 }
512
513 void smpi_mpi_ssend(void *buf, int count, MPI_Datatype datatype,
514                            int dst, int tag, MPI_Comm comm)
515 {
516   MPI_Request request =
517       build_request(buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
518                     comm, NON_PERSISTENT | SSEND | SEND);
519
520   smpi_mpi_start(request);
521   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
522 }
523
524 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
525                        int dst, int sendtag, void *recvbuf, int recvcount,
526                        MPI_Datatype recvtype, int src, int recvtag,
527                        MPI_Comm comm, MPI_Status * status)
528 {
529   MPI_Request requests[2];
530   MPI_Status stats[2];
531   int myid=smpi_process_index();
532   if ((smpi_group_index(smpi_comm_group(comm), dst) == myid) && (smpi_group_index(smpi_comm_group(comm), src) == myid)) {
533       smpi_datatype_copy(sendbuf, sendcount, sendtype,
534                                      recvbuf, recvcount, recvtype);
535       return;
536   }
537   requests[0] =
538     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
539   requests[1] =
540     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
541   smpi_mpi_startall(2, requests);
542   smpi_mpi_waitall(2, requests, stats);
543   if(status != MPI_STATUS_IGNORE) {
544     // Copy receive status
545     *status = stats[1];
546   }
547 }
548
549 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
550 {
551   return status->count / smpi_datatype_size(datatype);
552 }
553
554 static void finish_wait(MPI_Request * request, MPI_Status * status)
555 {
556   MPI_Request req = *request;
557   if(status != MPI_STATUS_IGNORE)
558     smpi_empty_status(status);
559
560   if(!(req->detached && req->flags & SEND)){
561     if(status != MPI_STATUS_IGNORE) {
562       int src = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
563       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(req->comm), src);
564       status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
565       status->MPI_ERROR = req->truncated ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
566       // this handles the case were size in receive differs from size in send
567       // FIXME: really this should just contain the count of receive-type blocks,
568       // right?
569       status->count = req->real_size;
570     }
571
572     print_request("Finishing", req);
573     MPI_Datatype datatype = req->old_type;
574
575     if(datatype->has_subtype == 1){
576         // This part handles the problem of non-contignous memory
577         // the unserialization at the reception
578       s_smpi_subtype_t *subtype = datatype->substruct;
579       if(req->flags & RECV) {
580         subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
581       }
582       if(req->detached == 0) free(req->buf);
583     }
584     smpi_datatype_unuse(datatype);
585
586   }
587
588 #ifdef HAVE_TRACING
589     if (TRACE_smpi_view_internals()) {
590       if(req->flags & RECV){
591         int rank = smpi_process_index();
592         int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
593         TRACE_smpi_recv(rank, src_traced, rank);
594       }
595     }
596 #endif
597
598   if(req->detached_sender!=NULL){
599     smpi_mpi_request_free(&(req->detached_sender));
600   }
601
602   if(req->flags & NON_PERSISTENT) {
603     smpi_mpi_request_free(request);
604   } else {
605     req->action = NULL;
606   }
607 }
608
609 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
610   int flag;
611
612   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
613   if ((*request)->action == NULL)
614     flag = 1;
615   else
616     flag = simcall_comm_test((*request)->action);
617   if(flag) {
618     finish_wait(request, status);
619     request=MPI_REQUEST_NULL;
620   }else{
621     smpi_empty_status(status);
622   }
623   return flag;
624 }
625
626 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
627                      MPI_Status * status)
628 {
629   xbt_dynar_t comms;
630   int i, flag, size;
631   int* map;
632
633   *index = MPI_UNDEFINED;
634   flag = 0;
635   if(count > 0) {
636     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
637     map = xbt_new(int, count);
638     size = 0;
639     for(i = 0; i < count; i++) {
640       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
641          xbt_dynar_push(comms, &requests[i]->action);
642          map[size] = i;
643          size++;
644       }
645     }
646     if(size > 0) {
647       i = simcall_comm_testany(comms);
648       // not MPI_UNDEFINED, as this is a simix return code
649       if(i != -1) {
650         *index = map[i];
651         finish_wait(&requests[*index], status);
652         flag = 1;
653       }
654     }else{
655         //all requests are null or inactive, return true
656         flag=1;
657         smpi_empty_status(status);
658     }
659     xbt_free(map);
660     xbt_dynar_free(&comms);
661   }
662
663   return flag;
664 }
665
666
667 int smpi_mpi_testall(int count, MPI_Request requests[],
668                      MPI_Status status[])
669 {
670   MPI_Status stat;
671   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
672   int flag=1;
673   int i;
674   for(i=0; i<count; i++){
675     if(requests[i]!= MPI_REQUEST_NULL){
676       if (smpi_mpi_test(&requests[i], pstat)!=1){
677         flag=0;
678       }
679     }else{
680       smpi_empty_status(pstat);
681     }
682     if(status != MPI_STATUSES_IGNORE) {
683       status[i] = *pstat;
684     }
685   }
686   return flag;
687 }
688
689 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
690   int flag=0;
691   //FIXME find another wait to avoid busy waiting ?
692   // the issue here is that we have to wait on a nonexistent comm
693   while(flag==0){
694     smpi_mpi_iprobe(source, tag, comm, &flag, status);
695     XBT_DEBUG("Busy Waiting on probing : %d", flag);
696   }
697 }
698
699 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
700
701   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), source), smpi_comm_rank(comm), tag,
702             comm, NON_PERSISTENT | RECV);
703
704   //to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
705   double sleeptime= sg_cfg_get_double("smpi/iprobe");
706   //multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
707   static int nsleeps = 1;
708
709   simcall_process_sleep(sleeptime);
710
711   // behave like a receive, but don't do it
712   smx_rdv_t mailbox;
713
714   print_request("New iprobe", request);
715   // We have to test both mailboxes as we don't know if we will receive one one or another
716     if (sg_cfg_get_int("smpi/async_small_thres")>0){
717         mailbox = smpi_process_mailbox_small();
718         XBT_DEBUG("trying to probe the perm recv mailbox");
719         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
720     }
721     if (request->action==NULL){
722         mailbox = smpi_process_mailbox();
723         XBT_DEBUG("trying to probe the other mailbox");
724         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
725     }
726
727   if(request->action){
728     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
729     *flag = 1;
730     if(status != MPI_STATUS_IGNORE) {
731       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(comm), req->src);
732       status->MPI_TAG = req->tag;
733       status->MPI_ERROR = MPI_SUCCESS;
734       status->count = req->real_size;
735     }
736     nsleeps=1;//reset the number of sleeps we will do next time
737   }
738   else {
739       *flag = 0;
740       nsleeps++;
741   }
742   smpi_mpi_request_free(&request);
743
744   return;
745 }
746
747 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
748 {
749   print_request("Waiting", *request);
750   if ((*request)->action != NULL) { // this is not a detached send
751     simcall_comm_wait((*request)->action, -1.0);
752   }
753   finish_wait(request, status);
754
755   // FIXME for a detached send, finish_wait is not called:
756 }
757
758 int smpi_mpi_waitany(int count, MPI_Request requests[],
759                      MPI_Status * status)
760 {
761   xbt_dynar_t comms;
762   int i, size, index;
763   int *map;
764
765   index = MPI_UNDEFINED;
766   if(count > 0) {
767     // Wait for a request to complete
768     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
769     map = xbt_new(int, count);
770     size = 0;
771     XBT_DEBUG("Wait for one of %d", count);
772     for(i = 0; i < count; i++) {
773       if(requests[i] != MPI_REQUEST_NULL) {
774         if (requests[i]->action != NULL) {
775           XBT_DEBUG("Waiting any %p ", requests[i]);
776           xbt_dynar_push(comms, &requests[i]->action);
777           map[size] = i;
778           size++;
779         }else{
780          //This is a finished detached request, let's return this one
781          size=0;//so we free the dynar but don't do the waitany call
782          index=i;
783          finish_wait(&requests[i], status);//cleanup if refcount = 0
784          requests[i]=MPI_REQUEST_NULL;//set to null
785          break;
786          }
787       }
788     }
789     if(size > 0) {
790       i = simcall_comm_waitany(comms);
791
792       // not MPI_UNDEFINED, as this is a simix return code
793       if (i != -1) {
794         index = map[i];
795         finish_wait(&requests[index], status);
796       }
797     }
798     xbt_free(map);
799     xbt_dynar_free(&comms);
800   }
801
802   if (index==MPI_UNDEFINED)
803     smpi_empty_status(status);
804
805   return index;
806 }
807
808 int smpi_mpi_waitall(int count, MPI_Request requests[],
809                       MPI_Status status[])
810 {
811   int  index, c;
812   MPI_Status stat;
813   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
814   int retvalue = MPI_SUCCESS;
815   //tag invalid requests in the set
816   if (status != MPI_STATUSES_IGNORE) {
817     for (c = 0; c < count; c++) {
818       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL) {
819         smpi_empty_status(&status[c]);
820       } else if (requests[c]->src == MPI_PROC_NULL) {
821         smpi_empty_status(&status[c]);
822         status[c].MPI_SOURCE = MPI_PROC_NULL;
823       }
824     }
825   }
826   for(c = 0; c < count; c++) {
827     if (MC_is_active()) {
828       smpi_mpi_wait(&requests[c], pstat);
829       index = c;
830     } else {
831       index = smpi_mpi_waitany(count, requests, pstat);
832       if (index == MPI_UNDEFINED)
833         break;
834       requests[index]=MPI_REQUEST_NULL;
835     }
836     if (status != MPI_STATUSES_IGNORE) {
837       status[index] = *pstat;
838       if (status[index].MPI_ERROR == MPI_ERR_TRUNCATE)
839         retvalue = MPI_ERR_IN_STATUS;
840     }
841   }
842
843   return retvalue;
844 }
845
846 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
847                       MPI_Status status[])
848 {
849   int i, count, index;
850   MPI_Status stat;
851   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
852
853   count = 0;
854   for(i = 0; i < incount; i++)
855   {
856     index=smpi_mpi_waitany(incount, requests, pstat);
857     if(index!=MPI_UNDEFINED){
858       indices[count] = index;
859       count++;
860       if(status != MPI_STATUSES_IGNORE) {
861         status[index] = *pstat;
862       }
863      requests[index]=MPI_REQUEST_NULL;
864     }else{
865       return MPI_UNDEFINED;
866     }
867   }
868   return count;
869 }
870
871 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
872                       MPI_Status status[])
873 {
874   int i, count, count_dead;
875   MPI_Status stat;
876   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
877
878   count = 0;
879   count_dead = 0;
880   for(i = 0; i < incount; i++) {
881     if((requests[i] != MPI_REQUEST_NULL)) {
882       if(smpi_mpi_test(&requests[i], pstat)) {
883          indices[count] = i;
884          count++;
885          if(status != MPI_STATUSES_IGNORE) {
886            status[i] = *pstat;
887          }
888          requests[i]=MPI_REQUEST_NULL;
889
890       }
891     }else{
892       count_dead++;
893     }
894   }
895   if(count_dead==incount)return MPI_UNDEFINED;
896   else return count;
897 }
898
899 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
900                     MPI_Comm comm)
901 {
902   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
903   nary_tree_bcast(buf, count, datatype, root, comm, 4);
904 }
905
906 void smpi_mpi_barrier(MPI_Comm comm)
907 {
908   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
909   nary_tree_barrier(comm, 4);
910 }
911
912 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
913                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
914                      int root, MPI_Comm comm)
915 {
916   int system_tag = COLL_TAG_GATHER;
917   int rank, size, src, index;
918   MPI_Aint lb = 0, recvext = 0;
919   MPI_Request *requests;
920
921   rank = smpi_comm_rank(comm);
922   size = smpi_comm_size(comm);
923   if(rank != root) {
924     // Send buffer to root
925     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
926   } else {
927     // FIXME: check for errors
928     smpi_datatype_extent(recvtype, &lb, &recvext);
929     // Local copy from root
930     smpi_datatype_copy(sendbuf, sendcount, sendtype,
931                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
932     // Receive buffers from senders
933     requests = xbt_new(MPI_Request, size - 1);
934     index = 0;
935     for(src = 0; src < size; src++) {
936       if(src != root) {
937         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
938                                           recvcount, recvtype,
939                                           src, system_tag, comm);
940         index++;
941       }
942     }
943     // Wait for completion of irecv's.
944     smpi_mpi_startall(size - 1, requests);
945     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
946     xbt_free(requests);
947   }
948 }
949
950
951 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts,
952                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
953 {
954     int i, size, count;
955     int *displs;
956     int rank = smpi_process_index();
957     void *tmpbuf;
958
959     /* arbitrarily choose root as rank 0 */
960     size = smpi_comm_size(comm);
961     count = 0;
962     displs = xbt_new(int, size);
963     for (i = 0; i < size; i++) {
964       displs[i] = count;
965       count += recvcounts[i];
966     }
967     tmpbuf=(void*)xbt_malloc(count*smpi_datatype_get_extent(datatype));
968     mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
969     smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf,
970                       recvcounts[rank], datatype, 0, comm);
971     xbt_free(displs);
972     xbt_free(tmpbuf);
973 }
974
975 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
976                       void *recvbuf, int *recvcounts, int *displs,
977                       MPI_Datatype recvtype, int root, MPI_Comm comm)
978 {
979   int system_tag = COLL_TAG_GATHERV;
980   int rank, size, src, index;
981   MPI_Aint lb = 0, recvext = 0;
982   MPI_Request *requests;
983
984   rank = smpi_comm_rank(comm);
985   size = smpi_comm_size(comm);
986   if(rank != root) {
987     // Send buffer to root
988     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
989   } else {
990     // FIXME: check for errors
991     smpi_datatype_extent(recvtype, &lb, &recvext);
992     // Local copy from root
993     smpi_datatype_copy(sendbuf, sendcount, sendtype,
994                        (char *)recvbuf + displs[root] * recvext,
995                        recvcounts[root], recvtype);
996     // Receive buffers from senders
997     requests = xbt_new(MPI_Request, size - 1);
998     index = 0;
999     for(src = 0; src < size; src++) {
1000       if(src != root) {
1001         requests[index] =
1002           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
1003                           recvcounts[src], recvtype, src, system_tag, comm);
1004         index++;
1005       }
1006     }
1007     // Wait for completion of irecv's.
1008     smpi_mpi_startall(size - 1, requests);
1009     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1010     xbt_free(requests);
1011   }
1012 }
1013
1014 void smpi_mpi_allgather(void *sendbuf, int sendcount,
1015                         MPI_Datatype sendtype, void *recvbuf,
1016                         int recvcount, MPI_Datatype recvtype,
1017                         MPI_Comm comm)
1018 {
1019   int system_tag = COLL_TAG_ALLGATHER;
1020   int rank, size, other, index;
1021   MPI_Aint lb = 0, recvext = 0;
1022   MPI_Request *requests;
1023
1024   rank = smpi_comm_rank(comm);
1025   size = smpi_comm_size(comm);
1026   // FIXME: check for errors
1027   smpi_datatype_extent(recvtype, &lb, &recvext);
1028   // Local copy from self
1029   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1030                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
1031                      recvtype);
1032   // Send/Recv buffers to/from others;
1033   requests = xbt_new(MPI_Request, 2 * (size - 1));
1034   index = 0;
1035   for(other = 0; other < size; other++) {
1036     if(other != rank) {
1037       requests[index] =
1038         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1039                         comm);
1040       index++;
1041       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
1042                                         recvcount, recvtype, other,
1043                                         system_tag, comm);
1044       index++;
1045     }
1046   }
1047   // Wait for completion of all comms.
1048   smpi_mpi_startall(2 * (size - 1), requests);
1049   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1050   xbt_free(requests);
1051 }
1052
1053 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
1054                          MPI_Datatype sendtype, void *recvbuf,
1055                          int *recvcounts, int *displs,
1056                          MPI_Datatype recvtype, MPI_Comm comm)
1057 {
1058   int system_tag = COLL_TAG_ALLGATHERV;
1059   int rank, size, other, index;
1060   MPI_Aint lb = 0, recvext = 0;
1061   MPI_Request *requests;
1062
1063   rank = smpi_comm_rank(comm);
1064   size = smpi_comm_size(comm);
1065   // FIXME: check for errors
1066   smpi_datatype_extent(recvtype, &lb, &recvext);
1067   // Local copy from self
1068   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1069                      (char *)recvbuf + displs[rank] * recvext,
1070                      recvcounts[rank], recvtype);
1071   // Send buffers to others;
1072   requests = xbt_new(MPI_Request, 2 * (size - 1));
1073   index = 0;
1074   for(other = 0; other < size; other++) {
1075     if(other != rank) {
1076       requests[index] =
1077         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1078                         comm);
1079       index++;
1080       requests[index] =
1081         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
1082                         recvtype, other, system_tag, comm);
1083       index++;
1084     }
1085   }
1086   // Wait for completion of all comms.
1087   smpi_mpi_startall(2 * (size - 1), requests);
1088   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1089   xbt_free(requests);
1090 }
1091
1092 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1093                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
1094                       int root, MPI_Comm comm)
1095 {
1096   int system_tag = COLL_TAG_SCATTER;
1097   int rank, size, dst, index;
1098   MPI_Aint lb = 0, sendext = 0;
1099   MPI_Request *requests;
1100
1101   rank = smpi_comm_rank(comm);
1102   size = smpi_comm_size(comm);
1103   if(rank != root) {
1104     // Recv buffer from root
1105     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1106                   MPI_STATUS_IGNORE);
1107   } else {
1108     // FIXME: check for errors
1109     smpi_datatype_extent(sendtype, &lb, &sendext);
1110     // Local copy from root
1111     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
1112                        sendcount, sendtype, recvbuf, recvcount, recvtype);
1113     // Send buffers to receivers
1114     requests = xbt_new(MPI_Request, size - 1);
1115     index = 0;
1116     for(dst = 0; dst < size; dst++) {
1117       if(dst != root) {
1118         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
1119                                           sendcount, sendtype, dst,
1120                                           system_tag, comm);
1121         index++;
1122       }
1123     }
1124     // Wait for completion of isend's.
1125     smpi_mpi_startall(size - 1, requests);
1126     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1127     xbt_free(requests);
1128   }
1129 }
1130
1131 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
1132                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
1133                        MPI_Datatype recvtype, int root, MPI_Comm comm)
1134 {
1135   int system_tag = COLL_TAG_SCATTERV;
1136   int rank, size, dst, index;
1137   MPI_Aint lb = 0, sendext = 0;
1138   MPI_Request *requests;
1139
1140   rank = smpi_comm_rank(comm);
1141   size = smpi_comm_size(comm);
1142   if(rank != root) {
1143     // Recv buffer from root
1144     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1145                   MPI_STATUS_IGNORE);
1146   } else {
1147     // FIXME: check for errors
1148     smpi_datatype_extent(sendtype, &lb, &sendext);
1149     // Local copy from root
1150     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
1151                        sendtype, recvbuf, recvcount, recvtype);
1152     // Send buffers to receivers
1153     requests = xbt_new(MPI_Request, size - 1);
1154     index = 0;
1155     for(dst = 0; dst < size; dst++) {
1156       if(dst != root) {
1157         requests[index] =
1158           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
1159                           sendtype, dst, system_tag, comm);
1160         index++;
1161       }
1162     }
1163     // Wait for completion of isend's.
1164     smpi_mpi_startall(size - 1, requests);
1165     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1166     xbt_free(requests);
1167   }
1168 }
1169
1170 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
1171                      MPI_Datatype datatype, MPI_Op op, int root,
1172                      MPI_Comm comm)
1173 {
1174   int system_tag = COLL_TAG_REDUCE;
1175   int rank, size, src, index;
1176   MPI_Aint lb = 0, dataext = 0;
1177   MPI_Request *requests;
1178   void **tmpbufs;
1179
1180   char* sendtmpbuf = (char*) sendbuf;
1181   if( sendbuf == MPI_IN_PLACE ) {
1182       sendtmpbuf = (char *)recvbuf;
1183   }
1184
1185   rank = smpi_comm_rank(comm);
1186   size = smpi_comm_size(comm);
1187   //non commutative case, use a working algo from openmpi
1188   if(!smpi_op_is_commute(op)){
1189     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count,
1190                      datatype, op, root, comm);
1191     return;
1192   }
1193   
1194   if(rank != root) {
1195     // Send buffer to root
1196     smpi_mpi_send(sendtmpbuf, count, datatype, root, system_tag, comm);
1197   } else {
1198     // FIXME: check for errors
1199     smpi_datatype_extent(datatype, &lb, &dataext);
1200     // Local copy from root
1201     if (sendtmpbuf && recvbuf)
1202       smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
1203     // Receive buffers from senders
1204     //TODO: make a MPI_barrier here ?
1205     requests = xbt_new(MPI_Request, size - 1);
1206     tmpbufs = xbt_new(void *, size - 1);
1207     index = 0;
1208     for(src = 0; src < size; src++) {
1209       if(src != root) {
1210         // FIXME: possibly overkill we we have contiguous/noncontiguous data
1211         //  mapping...
1212         tmpbufs[index] = xbt_malloc(count * dataext);
1213         requests[index] =
1214           smpi_irecv_init(tmpbufs[index], count, datatype, src,
1215                           system_tag, comm);
1216         index++;
1217       }
1218     }
1219     // Wait for completion of irecv's.
1220     smpi_mpi_startall(size - 1, requests);
1221     for(src = 0; src < size - 1; src++) {
1222       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1223       XBT_DEBUG("finished waiting any request with index %d", index);
1224       if(index == MPI_UNDEFINED) {
1225         break;
1226       }
1227       if(op) /* op can be MPI_OP_NULL that does nothing */
1228         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1229     }
1230     for(index = 0; index < size - 1; index++) {
1231       xbt_free(tmpbufs[index]);
1232     }
1233     xbt_free(tmpbufs);
1234     xbt_free(requests);
1235   }
1236 }
1237
1238 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
1239                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1240 {
1241   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
1242   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
1243 }
1244
1245 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
1246                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1247 {
1248   int system_tag = 888;
1249   int rank, size, other, index;
1250   MPI_Aint lb = 0, dataext = 0;
1251   MPI_Request *requests;
1252   void **tmpbufs;
1253
1254   rank = smpi_comm_rank(comm);
1255   size = smpi_comm_size(comm);
1256
1257   // FIXME: check for errors
1258   smpi_datatype_extent(datatype, &lb, &dataext);
1259
1260   // Local copy from self
1261   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
1262
1263   // Send/Recv buffers to/from others;
1264   requests = xbt_new(MPI_Request, size - 1);
1265   tmpbufs = xbt_new(void *, rank);
1266   index = 0;
1267   for(other = 0; other < rank; other++) {
1268     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1269     // mapping...
1270     tmpbufs[index] = xbt_malloc(count * dataext);
1271     requests[index] =
1272       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1273                       comm);
1274     index++;
1275   }
1276   for(other = rank + 1; other < size; other++) {
1277     requests[index] =
1278       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1279     index++;
1280   }
1281   // Wait for completion of all comms.
1282   smpi_mpi_startall(size - 1, requests);
1283   for(other = 0; other < size - 1; other++) {
1284     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1285     if(index == MPI_UNDEFINED) {
1286       break;
1287     }
1288     if(index < rank) {
1289       // #Request is below rank: it's a irecv
1290       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1291     }
1292   }
1293   for(index = 0; index < rank; index++) {
1294     xbt_free(tmpbufs[index]);
1295   }
1296   xbt_free(tmpbufs);
1297   xbt_free(requests);
1298 }