Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
better name for that struct, and add comments
[simgrid.git] / src / smpi / smpi_base.cpp
1 /* Copyright (c) 2007-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "src/mc/mc_replay.h"
11 #include "xbt/replay.h"
12 #include <errno.h>
13 #include "src/simix/smx_private.h"
14 #include "surf/surf.h"
15 #include "simgrid/sg_config.h"
16 #include "colls/colls.h"
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
19
20
21 static int match_recv(void* a, void* b, smx_synchro_t ignored) {
22    MPI_Request ref = (MPI_Request)a;
23    MPI_Request req = (MPI_Request)b;
24    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
25
26   xbt_assert(ref, "Cannot match recv against null reference");
27   xbt_assert(req, "Cannot match recv against null request");
28   if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
29     && ((ref->tag == MPI_ANY_TAG && req->tag >=0) || req->tag == ref->tag)){
30     //we match, we can transfer some values
31     // FIXME : move this to the copy function ?
32     if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
33     if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
34     if(ref->real_size < req->real_size) ref->truncated = 1;
35     if(req->detached==1){
36         ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
37     }
38     XBT_DEBUG("match succeeded");
39     return 1;
40   }else return 0;
41 }
42
43 static int match_send(void* a, void* b,smx_synchro_t ignored) {
44    MPI_Request ref = (MPI_Request)a;
45    MPI_Request req = (MPI_Request)b;
46    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
47    xbt_assert(ref, "Cannot match send against null reference");
48    xbt_assert(req, "Cannot match send against null request");
49
50    if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
51              && ((req->tag == MPI_ANY_TAG && ref->tag >=0)|| req->tag == ref->tag))
52    {
53      if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
54      if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
55      if(req->real_size < ref->real_size) req->truncated = 1;
56      if(ref->detached==1){
57          req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
58      }
59     XBT_DEBUG("match succeeded");
60      return 1;
61    } else return 0;
62 }
63
64
65 // Methods used to parse and store the values for timing injections in smpi
66 // These are taken from surf/network.c and generalized to have more values for each factor
67 typedef struct s_smpi_factor_multival *smpi_os_factor_multival_t;
68 typedef struct s_smpi_factor_multival { // FIXME: this should be merged (deduplicated) with s_smpi_factor defined in network_smpi.c
69   long factor;
70   int nb_values;
71   double values[4];//arbitrary set to 4
72 } s_smpi_factor_multival_t;
73 xbt_dynar_t smpi_os_values = NULL;
74 xbt_dynar_t smpi_or_values = NULL;
75 xbt_dynar_t smpi_ois_values = NULL;
76
77 double smpi_wtime_sleep = 0.0;
78 double smpi_iprobe_sleep = 1e-4;
79 double smpi_test_sleep = 1e-4;
80
81 static int factor_cmp(const void *pa, const void *pb)
82 {
83   return (((s_smpi_factor_multival_t*)pa)->factor > ((s_smpi_factor_multival_t*)pb)->factor) ? 1 :
84          (((s_smpi_factor_multival_t*)pa)->factor < ((s_smpi_factor_multival_t*)pb)->factor) ? -1 : 0;
85 }
86
87
88 static xbt_dynar_t parse_factor(const char *smpi_coef_string)
89 {
90   char *value = NULL;
91   unsigned int iter = 0;
92   s_smpi_factor_multival_t fact;
93   fact.nb_values=0;
94   unsigned int i=0;
95   xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
96
97   smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_multival_t), NULL);
98   radical_elements = xbt_str_split(smpi_coef_string, ";");
99   xbt_dynar_foreach(radical_elements, iter, value) {
100     memset(&fact, 0, sizeof(s_smpi_factor_multival_t));
101     radical_elements2 = xbt_str_split(value, ":");
102     if (xbt_dynar_length(radical_elements2) <2 || xbt_dynar_length(radical_elements2) > 5)
103       xbt_die("Malformed radical for smpi factor: '%s'", smpi_coef_string);
104     for(i =0; i<xbt_dynar_length(radical_elements2);i++ ){
105         if (i==0){
106            fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
107         }else{
108            fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
109            fact.nb_values++;
110         }
111     }
112
113     xbt_dynar_push_as(smpi_factor, s_smpi_factor_multival_t, fact);
114     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
115     xbt_dynar_free(&radical_elements2);
116   }
117   xbt_dynar_free(&radical_elements);
118   iter=0;
119   xbt_dynar_sort(smpi_factor, &factor_cmp);
120   xbt_dynar_foreach(smpi_factor, iter, fact) {
121     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
122   }
123   return smpi_factor;
124 }
125
126 static double smpi_os(double size)
127 {
128   if (!smpi_os_values) {
129     smpi_os_values = parse_factor(sg_cfg_get_string("smpi/os"));
130     smpi_register_static(smpi_os_values, xbt_dynar_free_voidp);
131   }
132   unsigned int iter = 0;
133   s_smpi_factor_multival_t fact;
134   double current=0.0;
135   // Iterate over all the sections that were specified and find the right
136   // value. (fact.factor represents the interval sizes; we want to find the
137   // section that has fact.factor <= size and no other such fact.factor <= size)
138   // Note: parse_factor() (used before) already sorts the dynar we iterate over!
139   xbt_dynar_foreach(smpi_os_values, iter, fact) {
140     if (size <= fact.factor) { // Values already too large, use the previously
141                                // computed value of current!
142         XBT_DEBUG("os : %f <= %ld return %f", size, fact.factor, current);
143       return current;
144     }else{
145       // If the next section is too large, the current section must be used.
146       // Hence, save the cost, as we might have to use it.
147       current = fact.values[0]+fact.values[1]*size;
148     }
149   }
150   XBT_DEBUG("os : %f > %ld return %f", size, fact.factor, current);
151
152   return current;
153 }
154
155 static double smpi_ois(double size)
156 {
157   if (!smpi_ois_values) {
158     smpi_ois_values = parse_factor(sg_cfg_get_string("smpi/ois"));
159     smpi_register_static(smpi_ois_values, xbt_dynar_free_voidp);
160   }
161   unsigned int iter = 0;
162   s_smpi_factor_multival_t fact;
163   double current=0.0;
164   // Iterate over all the sections that were specified and find the right
165   // value. (fact.factor represents the interval sizes; we want to find the
166   // section that has fact.factor <= size and no other such fact.factor <= size)
167   // Note: parse_factor() (used before) already sorts the dynar we iterate over!
168   xbt_dynar_foreach(smpi_ois_values, iter, fact) {
169     if (size <= fact.factor) { // Values already too large, use the previously
170                                // computed value of current!
171         XBT_DEBUG("ois : %f <= %ld return %f", size, fact.factor, current);
172       return current;
173     }else{
174       // If the next section is too large, the current section must be used.
175       // Hence, save the cost, as we might have to use it.
176       current = fact.values[0]+fact.values[1]*size;
177     }
178   }
179   XBT_DEBUG("ois : %f > %ld return %f", size, fact.factor, current);
180
181   return current;
182 }
183
184 static double smpi_or(double size)
185 {
186   if (!smpi_or_values) {
187     smpi_or_values = parse_factor(sg_cfg_get_string("smpi/or"));
188     smpi_register_static(smpi_or_values, xbt_dynar_free_voidp);
189   }
190   unsigned int iter = 0;
191   s_smpi_factor_multival_t fact;
192   double current=0.0;
193   // Iterate over all the sections that were specified and find the right
194   // value. (fact.factor represents the interval sizes; we want to find the
195   // section that has fact.factor <= size and no other such fact.factor <= size)
196   // Note: parse_factor() (used before) already sorts the dynar we iterate over!
197   xbt_dynar_foreach(smpi_or_values, iter, fact) {
198     if (size <= fact.factor) { // Values already too large, use the previously
199                                // computed value of current!
200         XBT_DEBUG("or : %f <= %ld return %f", size, fact.factor, current);
201       return current;
202     } else {
203       // If the next section is too large, the current section must be used.
204       // Hence, save the cost, as we might have to use it.
205       current=fact.values[0]+fact.values[1]*size;
206     }
207   }
208   XBT_DEBUG("or : %f > %ld return %f", size, fact.factor, current);
209
210   return current;
211 }
212
213 double smpi_mpi_wtime(){
214   double time;
215   if (smpi_process_initialized() && !smpi_process_finalized() && !smpi_process_get_sampling()) {
216     smpi_bench_end();
217     time = SIMIX_get_clock();
218     // to avoid deadlocks if used as a break condition, such as
219     //     while (MPI_Wtime(...) < time_limit) {
220     //       ....
221     //     }
222     // because the time will not normally advance when only calls to MPI_Wtime
223     // are made -> deadlock (MPI_Wtime never reaches the time limit)
224     if(smpi_wtime_sleep > 0) simcall_process_sleep(smpi_wtime_sleep);
225     smpi_bench_begin();
226   } else {
227     time = SIMIX_get_clock();
228   }
229   return time;
230 }
231
232 static MPI_Request build_request(void *buf, int count,
233                                  MPI_Datatype datatype, int src, int dst,
234                                  int tag, MPI_Comm comm, unsigned flags)
235 {
236   MPI_Request request = NULL;
237
238   void *old_buf = NULL;
239
240   request = xbt_new(s_smpi_mpi_request_t, 1);
241
242   s_smpi_subtype_t *subtype = static_cast<s_smpi_subtype_t*>(datatype->substruct);
243
244   if(((flags & RECV) && (flags & ACCUMULATE)) || (datatype->has_subtype == 1)){
245     // This part handles the problem of non-contiguous memory
246     old_buf = buf;
247     buf = count==0 ? NULL : xbt_malloc(count*smpi_datatype_size(datatype));
248     if ((datatype->has_subtype == 1) && (flags & SEND)) {
249       subtype->serialize(old_buf, buf, count, datatype->substruct);
250     }
251   }
252
253   request->buf = buf;
254   // This part handles the problem of non-contiguous memory (for the
255   // unserialisation at the reception)
256   request->old_buf = old_buf;
257   request->old_type = datatype;
258
259   request->size = smpi_datatype_size(datatype) * count;
260   request->src = src;
261   request->dst = dst;
262   request->tag = tag;
263   request->comm = comm;
264   request->action = NULL;
265   request->flags = flags;
266   request->detached = 0;
267   request->detached_sender = NULL;
268   request->real_src = 0;
269
270   request->truncated = 0;
271   request->real_size = 0;
272   request->real_tag = 0;
273   if(flags & PERSISTENT)
274     request->refcount = 1;
275   else
276     request->refcount = 0;
277   request->op = MPI_REPLACE;
278   request->send = 0;
279   request->recv = 0;
280   if (flags & SEND) smpi_datatype_unuse(datatype);
281
282   return request;
283 }
284
285
286 void smpi_empty_status(MPI_Status * status)
287 {
288   if(status != MPI_STATUS_IGNORE) {
289     status->MPI_SOURCE = MPI_ANY_SOURCE;
290     status->MPI_TAG = MPI_ANY_TAG;
291     status->MPI_ERROR = MPI_SUCCESS;
292     status->count=0;
293   }
294 }
295
296 static void smpi_mpi_request_free_voidp(void* request)
297 {
298   MPI_Request req = static_cast<MPI_Request>(request);
299   smpi_mpi_request_free(&req);
300 }
301
302 /* MPI Low level calls */
303 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
304                                int dst, int tag, MPI_Comm comm)
305 {
306   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
307   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
308                           comm, PERSISTENT | SEND | PREPARED);
309   return request;
310 }
311
312 MPI_Request smpi_mpi_ssend_init(void *buf, int count, MPI_Datatype datatype,
313                                int dst, int tag, MPI_Comm comm)
314 {
315   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
316   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
317                           comm, PERSISTENT | SSEND | SEND | PREPARED);
318   return request;
319 }
320
321 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
322                                int src, int tag, MPI_Comm comm)
323 {
324   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
325   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
326                           comm, PERSISTENT | RECV | PREPARED);
327   return request;
328 }
329
330 void smpi_mpi_start(MPI_Request request)
331 {
332   smx_rdv_t mailbox;
333   
334   xbt_assert(!request->action, "Cannot (re)start a non-finished communication");
335   request->flags &= ~PREPARED;
336   request->flags &= ~FINISHED;
337   request->refcount++;
338
339   if (request->flags & RECV) {
340     print_request("New recv", request);
341
342     int async_small_thresh = sg_cfg_get_int("smpi/async_small_thresh");
343
344     xbt_mutex_t mut = smpi_process_mailboxes_mutex();
345     if (async_small_thresh != 0 ||request->flags & RMA)
346       xbt_mutex_acquire(mut);
347
348     if (async_small_thresh == 0 && !(request->flags & RMA)) {
349       mailbox = smpi_process_mailbox();
350     }
351     else if (request->flags & RMA || static_cast<int>(request->size) < async_small_thresh){
352     //We have to check both mailboxes (because SSEND messages are sent to the large mbox). begin with the more appropriate one : the small one.
353       mailbox = smpi_process_mailbox_small();
354       XBT_DEBUG("Is there a corresponding send already posted in the small mailbox %p (in case of SSEND)?", mailbox);
355       smx_synchro_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
356     
357       if(action ==NULL){
358         mailbox = smpi_process_mailbox();
359         XBT_DEBUG("No, nothing in the small mailbox test the other one : %p", mailbox);
360         action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
361         if(action ==NULL){
362           XBT_DEBUG("Still nothing, switch back to the small mailbox : %p", mailbox);
363           mailbox = smpi_process_mailbox_small();
364           }
365       }else{
366         XBT_DEBUG("yes there was something for us in the large mailbox");
367       }
368     }else{
369       mailbox = smpi_process_mailbox_small();
370       XBT_DEBUG("Is there a corresponding send already posted the small mailbox?");
371     smx_synchro_t action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
372     
373       if(action ==NULL){
374         XBT_DEBUG("No, nothing in the permanent receive mailbox");
375         mailbox = smpi_process_mailbox();
376       }else{
377         XBT_DEBUG("yes there was something for us in the small mailbox");
378       }
379     }
380
381     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
382     double sleeptime = request->detached ? smpi_or(request->size) : 0.0;
383     if(sleeptime!=0.0){
384         simcall_process_sleep(sleeptime);
385         XBT_DEBUG("receiving size of %zu : sleep %f ", request->size, smpi_or(request->size));
386     }
387     
388     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
389     request->real_size=request->size;
390     smpi_datatype_use(request->old_type);
391     smpi_comm_use(request->comm);
392     request->action = simcall_comm_irecv(SIMIX_process_self(), mailbox, request->buf,
393                                          &request->real_size, &match_recv,
394                                          !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
395                                          : &smpi_comm_null_copy_buffer_callback,
396                                          request, -1.0);
397         XBT_DEBUG("recv simcall posted");
398
399     if (async_small_thresh != 0 || (request->flags & RMA))
400       xbt_mutex_release(mut);
401   } else {
402
403
404     int receiver = request->dst;
405
406     int rank = request->src;
407     if (TRACE_smpi_view_internals()) {
408       TRACE_smpi_send(rank, rank, receiver,request->size);
409     }
410     print_request("New send", request);
411     
412         //if we are giving back the control to the user without waiting for completion, we have to inject timings
413     double sleeptime = 0.0;
414     if(request->detached || (request->flags & (ISEND|SSEND))){// issend should be treated as isend
415       //isend and send timings may be different
416       sleeptime = (request->flags & ISEND)? smpi_ois(request->size) : smpi_os(request->size);
417     }
418
419     if(sleeptime != 0.0){
420         simcall_process_sleep(sleeptime);
421         XBT_DEBUG("sending size of %zu : sleep %f ", request->size, smpi_os(request->size));
422     }
423
424     int async_small_thresh = sg_cfg_get_int("smpi/async_small_thresh");
425
426     xbt_mutex_t mut=smpi_process_remote_mailboxes_mutex(receiver);
427
428     if (async_small_thresh != 0 || (request->flags & RMA))
429       xbt_mutex_acquire(mut);
430
431     if (!(async_small_thresh != 0 || (request->flags & RMA))) {
432       mailbox = smpi_process_remote_mailbox(receiver);
433     }
434     else if (request->flags & RMA || static_cast<int>(request->size) < async_small_thresh) { // eager mode
435       mailbox = smpi_process_remote_mailbox(receiver);
436       XBT_DEBUG("Is there a corresponding recv already posted in the large mailbox %p?", mailbox);
437       smx_synchro_t action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
438       if(action ==NULL){
439        if (! (request->flags & SSEND)){
440          mailbox = smpi_process_remote_mailbox_small(receiver);
441          XBT_DEBUG("No, nothing in the large mailbox, message is to be sent on the small one %p", mailbox);
442        } else{
443          mailbox = smpi_process_remote_mailbox_small(receiver);
444          XBT_DEBUG("SSEND : Is there a corresponding recv already posted in the small mailbox %p?", mailbox);
445          action = simcall_comm_iprobe(mailbox, 1,request->dst, request->tag, &match_send, (void*)request);
446          if(action ==NULL){
447            XBT_DEBUG("No, we are first, send to large mailbox");
448            mailbox = smpi_process_remote_mailbox(receiver);
449          }
450        }
451       }else{
452         XBT_DEBUG("Yes there was something for us in the large mailbox");
453       }
454     }else{
455       mailbox = smpi_process_remote_mailbox(receiver);
456       XBT_DEBUG("Send request %p is in the large mailbox %p (buf: %p)",mailbox, request,request->buf);
457     }
458
459     void* buf = request->buf;
460     if ( (! (request->flags & SSEND)) && (static_cast<int>(request->size) < sg_cfg_get_int("smpi/send_is_detached_thresh"))) {
461       void *oldbuf = NULL;
462       request->detached = 1;
463       XBT_DEBUG("Send request %p is detached", request);
464       request->refcount++;
465       if(request->old_type->has_subtype == 0){
466         oldbuf = request->buf;
467         if (!smpi_process_get_replaying() && oldbuf && request->size!=0){
468           if((smpi_privatize_global_variables)
469             && ((char*) request->buf >= smpi_start_data_exe)
470             && ((char*)request->buf < smpi_start_data_exe + smpi_size_data_exe )){
471             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
472             smpi_switch_data_segment(request->src);
473           }
474           buf = xbt_malloc(request->size);
475           memcpy(buf,oldbuf,request->size);
476           XBT_DEBUG("buf %p copied into %p",oldbuf,buf);
477         }
478       }
479     }
480
481     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
482     request->real_size=request->size;
483     smpi_datatype_use(request->old_type);
484     smpi_comm_use(request->comm);
485     request->action =
486       simcall_comm_isend(SIMIX_process_from_PID(request->src+1), mailbox, request->size, -1.0,
487                          buf, request->real_size,
488                          &match_send,
489                          &xbt_free_f, // how to free the userdata if a detached send fails
490                          !smpi_process_get_replaying()? &smpi_comm_copy_buffer_callback
491                          : &smpi_comm_null_copy_buffer_callback,
492                          request,
493                          // detach if msg size < eager/rdv switch limit
494                          request->detached);
495     XBT_DEBUG("send simcall posted");
496
497
498
499     /* FIXME: detached sends are not traceable (request->action == NULL) */
500     if (request->action)
501       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
502
503     if (async_small_thresh != 0 || request->flags & RMA)
504       xbt_mutex_release(mut);
505   }
506
507 }
508
509 void smpi_mpi_startall(int count, MPI_Request * requests)
510 {
511   int i;
512   if(requests==NULL) return;
513
514   for(i = 0; i < count; i++) {
515     smpi_mpi_start(requests[i]);
516   }
517 }
518
519 void smpi_mpi_request_free(MPI_Request * request)
520 {
521   if((*request) != MPI_REQUEST_NULL){
522     (*request)->refcount--;
523     if((*request)->refcount<0) xbt_die("wrong refcount");
524
525     if((*request)->refcount==0){
526         print_request("Destroying", (*request));
527         xbt_free(*request);
528         *request = MPI_REQUEST_NULL;
529     }else{
530         print_request("Decrementing", (*request));
531     }
532   }else{
533       xbt_die("freeing an already free request");
534   }
535 }
536
537
538 MPI_Request smpi_rma_send_init(void *buf, int count, MPI_Datatype datatype,
539                             int src, int dst, int tag, MPI_Comm comm, MPI_Op op)
540 {
541   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
542   if(op==MPI_OP_NULL){
543     request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, src, dst, tag,
544                             comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED);
545   }else{
546     request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
547                             comm, RMA | NON_PERSISTENT | ISEND | SEND | PREPARED | ACCUMULATE);
548     request->op = op;
549   }
550   return request;
551 }
552
553 MPI_Request smpi_rma_recv_init(void *buf, int count, MPI_Datatype datatype,
554                             int src, int dst, int tag, MPI_Comm comm, MPI_Op op)
555 {
556   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
557   if(op==MPI_OP_NULL){
558     request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
559                             comm, RMA | NON_PERSISTENT | RECV | PREPARED);
560   }else{
561     request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype,  src, dst, tag,
562                             comm, RMA | NON_PERSISTENT | RECV | PREPARED | ACCUMULATE);
563     request->op = op;
564   }
565   return request;
566 }
567
568
569 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
570                             int dst, int tag, MPI_Comm comm)
571 {
572   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
573   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
574                           comm, PERSISTENT | ISEND | SEND | PREPARED);
575   return request;
576 }
577
578 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
579                            int dst, int tag, MPI_Comm comm)
580 {
581   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
582   request =  build_request(buf==MPI_BOTTOM?(void*)0:buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
583                            comm, NON_PERSISTENT | ISEND | SEND);
584   smpi_mpi_start(request);
585   return request;
586 }
587
588 MPI_Request smpi_mpi_issend(void *buf, int count, MPI_Datatype datatype,
589                            int dst, int tag, MPI_Comm comm)
590 {
591   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
592   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
593                           comm, NON_PERSISTENT | ISEND | SSEND | SEND);
594   smpi_mpi_start(request);
595   return request;
596 }
597
598 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
599                             int src, int tag, MPI_Comm comm)
600 {
601   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
602   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
603                           comm, PERSISTENT | RECV | PREPARED);
604   return request;
605 }
606
607 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
608                            int src, int tag, MPI_Comm comm)
609 {
610   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
611   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
612                           comm, NON_PERSISTENT | RECV);
613   smpi_mpi_start(request);
614   return request;
615 }
616
617 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
618                    int tag, MPI_Comm comm, MPI_Status * status)
619 {
620   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
621   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
622   smpi_mpi_wait(&request, status);
623   request = NULL;
624 }
625
626
627
628 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
629                    int tag, MPI_Comm comm)
630 {
631   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
632   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
633                           comm, NON_PERSISTENT | SEND);
634
635   smpi_mpi_start(request);
636   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
637   request = NULL;
638 }
639
640 void smpi_mpi_ssend(void *buf, int count, MPI_Datatype datatype,
641                            int dst, int tag, MPI_Comm comm)
642 {
643   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
644   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
645                 comm, NON_PERSISTENT | SSEND | SEND);
646
647   smpi_mpi_start(request);
648   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
649   request = NULL;
650 }
651
652 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
653                        int dst, int sendtag, void *recvbuf, int recvcount,
654                        MPI_Datatype recvtype, int src, int recvtag,
655                        MPI_Comm comm, MPI_Status * status)
656 {
657   MPI_Request requests[2];
658   MPI_Status stats[2];
659   int myid=smpi_process_index();
660   if ((smpi_group_index(smpi_comm_group(comm), dst) == myid) && (smpi_group_index(smpi_comm_group(comm), src) == myid)) {
661       smpi_datatype_copy(sendbuf, sendcount, sendtype,
662                                      recvbuf, recvcount, recvtype);
663       return;
664   }
665   requests[0] =
666     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
667   requests[1] =
668     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
669   smpi_mpi_startall(2, requests);
670   smpi_mpi_waitall(2, requests, stats);
671   smpi_mpi_request_free(&requests[0]);
672   smpi_mpi_request_free(&requests[1]);
673   if(status != MPI_STATUS_IGNORE) {
674     // Copy receive status
675     *status = stats[1];
676   }
677 }
678
679 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
680 {
681   return status->count / smpi_datatype_size(datatype);
682 }
683
684 static void finish_wait(MPI_Request * request, MPI_Status * status)
685 {
686   MPI_Request req = *request;
687   smpi_empty_status(status);
688
689   if(!(req->detached && req->flags & SEND)
690       && !(req->flags & PREPARED)){
691     if(status != MPI_STATUS_IGNORE) {
692       int src = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
693       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(req->comm), src);
694       status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
695       status->MPI_ERROR = req->truncated ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
696       // this handles the case were size in receive differs from size in send
697       // FIXME: really this should just contain the count of receive-type blocks,
698       // right?
699       status->count = req->real_size;
700     }
701
702     print_request("Finishing", req);
703     MPI_Datatype datatype = req->old_type;
704
705     if((req->flags & ACCUMULATE) || (datatype->has_subtype == 1)){
706       if (!smpi_process_get_replaying()){
707         if( smpi_privatize_global_variables
708             && ((char*)req->old_buf >= smpi_start_data_exe)
709             && ((char*)req->old_buf < smpi_start_data_exe + smpi_size_data_exe )
710         ){
711             XBT_VERB("Privatization : We are unserializing to a zone in global memory - Switch data segment ");
712             smpi_switch_data_segment(smpi_process_index());
713         }
714       }
715
716       if(datatype->has_subtype == 1){
717         // This part handles the problem of non-contignous memory
718         // the unserialization at the reception
719         s_smpi_subtype_t *subtype = static_cast<s_smpi_subtype_t*>(datatype->substruct);
720         if(req->flags & RECV)
721           subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct, req->op);
722         if(req->detached == 0) free(req->buf);
723       }else if(req->flags & RECV){//apply op on contiguous buffer for accumulate
724           int n =req->real_size/smpi_datatype_size(datatype);
725           smpi_op_apply(req->op, req->buf, req->old_buf, &n, &datatype);
726       }
727     }
728     smpi_comm_unuse(req->comm);
729     smpi_datatype_unuse(datatype);
730
731   }
732
733   if (TRACE_smpi_view_internals()) {
734     if(req->flags & RECV){
735       int rank = smpi_process_index();
736       int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
737       TRACE_smpi_recv(rank, src_traced, rank);
738     }
739   }
740
741   if(req->detached_sender!=NULL){
742     smpi_mpi_request_free(&(req->detached_sender));
743   }
744   if(req->flags & PERSISTENT)
745     req->action = NULL;
746   req->flags |= FINISHED;
747
748   smpi_mpi_request_free(request);
749
750 }
751
752 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
753   int flag;
754
755   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
756
757   // to avoid deadlocks if used as a break condition, such as
758   //     while (MPI_Test(request, flag, status) && flag) {
759   //     }
760   // because the time will not normally advance when only calls to MPI_Test
761   // are made -> deadlock
762   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
763   static int nsleeps = 1;
764   if(smpi_test_sleep > 0)  simcall_process_sleep(nsleeps*smpi_test_sleep);
765
766   smpi_empty_status(status);
767   flag = 1;
768   if (!((*request)->flags & PREPARED)) {
769     if ((*request)->action != NULL)
770       flag = simcall_comm_test((*request)->action);
771     if (flag) {
772       finish_wait(request, status);
773       nsleeps=1;//reset the number of sleeps we will do next time
774       if (*request != MPI_REQUEST_NULL && !((*request)->flags & PERSISTENT))
775       *request = MPI_REQUEST_NULL;
776     }else{
777       nsleeps++;
778     }
779   }
780   return flag;
781 }
782
783 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
784                      MPI_Status * status)
785 {
786   xbt_dynar_t comms;
787   int i, flag, size;
788   int* map;
789
790   *index = MPI_UNDEFINED;
791   flag = 0;
792   comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
793   map = xbt_new(int, count);
794   size = 0;
795   for(i = 0; i < count; i++) {
796     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action &&
797         !(requests[i]->flags & PREPARED)) {
798        xbt_dynar_push(comms, &requests[i]->action);
799        map[size] = i;
800        size++;
801     }
802   }
803   if(size > 0) {
804     //multiplier to the sleeptime, to increase speed of execution, each failed testany will increase it
805     static int nsleeps = 1;
806     if(smpi_test_sleep > 0) simcall_process_sleep(nsleeps*smpi_test_sleep);
807
808     i = simcall_comm_testany(comms);
809     // not MPI_UNDEFINED, as this is a simix return code
810     if(i != -1) {
811       *index = map[i];
812       finish_wait(&requests[*index], status);
813       if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags & NON_PERSISTENT))
814       requests[*index] = MPI_REQUEST_NULL;
815       flag = 1;
816       nsleeps=1;
817     }else{
818       nsleeps++;
819     }
820   }else{
821       //all requests are null or inactive, return true
822       flag=1;
823       smpi_empty_status(status);
824   }
825   xbt_free(map);
826   xbt_dynar_free(&comms);
827
828   return flag;
829 }
830
831
832 int smpi_mpi_testall(int count, MPI_Request requests[],
833                      MPI_Status status[])
834 {
835   MPI_Status stat;
836   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
837   int flag=1;
838   int i;
839   for(i=0; i<count; i++){
840     if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED)) {
841       if (smpi_mpi_test(&requests[i], pstat)!=1){
842         flag=0;
843       }else{
844           requests[i]=MPI_REQUEST_NULL;
845       }
846     }else{
847       smpi_empty_status(pstat);
848     }
849     if(status != MPI_STATUSES_IGNORE) {
850       status[i] = *pstat;
851     }
852   }
853   return flag;
854 }
855
856 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
857   int flag=0;
858   //FIXME find another wait to avoid busy waiting ?
859   // the issue here is that we have to wait on a nonexistent comm
860   while(flag==0){
861     smpi_mpi_iprobe(source, tag, comm, &flag, status);
862     XBT_DEBUG("Busy Waiting on probing : %d", flag);
863   }
864 }
865
866 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
867
868   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), source), smpi_comm_rank(comm), tag,
869             comm, PERSISTENT | RECV);
870
871   // to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
872   // (especially when used as a break condition, such as while(MPI_Iprobe(...)) ... )
873   // multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
874   static int nsleeps = 1;
875   if(smpi_iprobe_sleep > 0)  simcall_process_sleep(nsleeps*smpi_iprobe_sleep);
876   // behave like a receive, but don't do it
877   smx_rdv_t mailbox;
878
879   print_request("New iprobe", request);
880   // We have to test both mailboxes as we don't know if we will receive one one or another
881   if (sg_cfg_get_int("smpi/async_small_thresh")>0){
882       mailbox = smpi_process_mailbox_small();
883       XBT_DEBUG("trying to probe the perm recv mailbox");
884       request->action = simcall_comm_iprobe(mailbox, 0, request->src, request->tag, &match_recv, (void*)request);
885   }
886   if (request->action==NULL){
887   mailbox = smpi_process_mailbox();
888       XBT_DEBUG("trying to probe the other mailbox");
889       request->action = simcall_comm_iprobe(mailbox, 0, request->src,request->tag, &match_recv, (void*)request);
890   }
891
892   if (request->action){
893     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
894     *flag = 1;
895     if(status != MPI_STATUS_IGNORE && !(req->flags & PREPARED)) {
896       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(comm), req->src);
897       status->MPI_TAG    = req->tag;
898       status->MPI_ERROR  = MPI_SUCCESS;
899       status->count      = req->real_size;
900     }
901     nsleeps=1;//reset the number of sleeps we will do next time
902   }
903   else {
904     *flag = 0;
905     nsleeps++;
906   }
907   smpi_mpi_request_free(&request);
908
909   return;
910 }
911
912 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
913 {
914   print_request("Waiting", *request);
915   if ((*request)->flags & PREPARED) {
916     smpi_empty_status(status);
917     return;
918   }
919
920   if ((*request)->action != NULL) { // this is not a detached send
921     simcall_comm_wait((*request)->action, -1.0);
922
923   if((MC_is_active() || MC_record_replay_is_active()) && (*request)->action)
924     (*request)->action->comm.dst_data = NULL; // dangling pointer : dst_data is freed with a wait, need to set it to NULL for system state comparison
925   }
926
927   finish_wait(request, status);
928   if (*request != MPI_REQUEST_NULL && ((*request)->flags & NON_PERSISTENT))
929       *request = MPI_REQUEST_NULL;
930   // FIXME for a detached send, finish_wait is not called:
931 }
932
933 int smpi_mpi_waitany(int count, MPI_Request requests[],
934                      MPI_Status * status)
935 {
936   xbt_dynar_t comms;
937   int i, size, index;
938   int *map;
939
940   index = MPI_UNDEFINED;
941   if(count > 0) {
942     // Wait for a request to complete
943     comms = xbt_dynar_new(sizeof(smx_synchro_t), NULL);
944     map = xbt_new(int, count);
945     size = 0;
946     XBT_DEBUG("Wait for one of %d", count);
947     for(i = 0; i < count; i++) {
948       if (requests[i] != MPI_REQUEST_NULL
949           && !(requests[i]->flags & PREPARED)
950           && !(requests[i]->flags & FINISHED)) {
951         if (requests[i]->action != NULL) {
952           XBT_DEBUG("Waiting any %p ", requests[i]);
953           xbt_dynar_push(comms, &requests[i]->action);
954           map[size] = i;
955           size++;
956         }else{
957          //This is a finished detached request, let's return this one
958          size=0;//so we free the dynar but don't do the waitany call
959          index=i;
960          finish_wait(&requests[i], status);//cleanup if refcount = 0
961          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
962          requests[i]=MPI_REQUEST_NULL;//set to null
963          break;
964          }
965       }
966     }
967     if(size > 0) {
968       i = simcall_comm_waitany(comms);
969
970       // not MPI_UNDEFINED, as this is a simix return code
971       if (i != -1) {
972         index = map[i];
973         finish_wait(&requests[index], status);
974         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
975         requests[index] = MPI_REQUEST_NULL;
976       }
977     }
978     xbt_free(map);
979     xbt_dynar_free(&comms);
980   }
981
982   if (index==MPI_UNDEFINED)
983     smpi_empty_status(status);
984
985   return index;
986 }
987
988 int smpi_mpi_waitall(int count, MPI_Request requests[],
989                       MPI_Status status[])
990 {
991   int  index, c;
992   MPI_Status stat;
993   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
994   int retvalue = MPI_SUCCESS;
995   //tag invalid requests in the set
996   if (status != MPI_STATUSES_IGNORE) {
997     for (c = 0; c < count; c++) {
998       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ||
999           (requests[c]->flags & PREPARED)) {
1000         smpi_empty_status(&status[c]);
1001       } else if (requests[c]->src == MPI_PROC_NULL) {
1002         smpi_empty_status(&status[c]);
1003         status[c].MPI_SOURCE = MPI_PROC_NULL;
1004       }
1005     }
1006   }
1007   for(c = 0; c < count; c++) {
1008
1009     if (MC_is_active() || MC_record_replay_is_active()) {
1010       smpi_mpi_wait(&requests[c], pstat);
1011       index = c;
1012     } else {
1013       index = smpi_mpi_waitany(count, requests, pstat);
1014       if (index == MPI_UNDEFINED)
1015         break;
1016       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
1017       requests[index]=MPI_REQUEST_NULL;
1018     }
1019     if (status != MPI_STATUSES_IGNORE) {
1020       status[index] = *pstat;
1021       if (status[index].MPI_ERROR == MPI_ERR_TRUNCATE)
1022         retvalue = MPI_ERR_IN_STATUS;
1023     }
1024   }
1025
1026   return retvalue;
1027 }
1028
1029 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
1030                       MPI_Status status[])
1031 {
1032   int i, count, index;
1033   MPI_Status stat;
1034   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
1035
1036   count = 0;
1037   for(i = 0; i < incount; i++)
1038   {
1039     index=smpi_mpi_waitany(incount, requests, pstat);
1040     if(index!=MPI_UNDEFINED){
1041       indices[count] = index;
1042       count++;
1043       if(status != MPI_STATUSES_IGNORE) {
1044         status[index] = *pstat;
1045       }
1046      if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
1047      requests[index]=MPI_REQUEST_NULL;
1048     }else{
1049       return MPI_UNDEFINED;
1050     }
1051   }
1052   return count;
1053 }
1054
1055 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
1056                       MPI_Status status[])
1057 {
1058   int i, count, count_dead;
1059   MPI_Status stat;
1060   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
1061
1062   count = 0;
1063   count_dead = 0;
1064   for(i = 0; i < incount; i++) {
1065     if((requests[i] != MPI_REQUEST_NULL)) {
1066       if(smpi_mpi_test(&requests[i], pstat)) {
1067          indices[i] = 1;
1068          count++;
1069          if(status != MPI_STATUSES_IGNORE) {
1070            status[i] = *pstat;
1071          }
1072          if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags & NON_PERSISTENT)
1073          requests[i]=MPI_REQUEST_NULL;
1074       }
1075     }else{
1076       count_dead++;
1077     }
1078   }
1079   if(count_dead==incount)return MPI_UNDEFINED;
1080   else return count;
1081 }
1082
1083 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
1084                     MPI_Comm comm)
1085 {
1086     smpi_coll_tuned_bcast_binomial_tree(buf, count, datatype, root, comm);
1087 }
1088
1089 void smpi_mpi_barrier(MPI_Comm comm)
1090 {
1091     smpi_coll_tuned_barrier_ompi_basic_linear(comm);
1092 }
1093
1094 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1095                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
1096                      int root, MPI_Comm comm)
1097 {
1098   int system_tag = COLL_TAG_GATHER;
1099   int rank, size, src, index;
1100   MPI_Aint lb = 0, recvext = 0;
1101   MPI_Request *requests;
1102
1103   rank = smpi_comm_rank(comm);
1104   size = smpi_comm_size(comm);
1105   if(rank != root) {
1106     // Send buffer to root
1107     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
1108   } else {
1109     // FIXME: check for errors
1110     smpi_datatype_extent(recvtype, &lb, &recvext);
1111     // Local copy from root
1112     smpi_datatype_copy(sendbuf, sendcount, sendtype,
1113                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
1114     // Receive buffers from senders
1115     requests = xbt_new(MPI_Request, size - 1);
1116     index = 0;
1117     for(src = 0; src < size; src++) {
1118       if(src != root) {
1119         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
1120                                           recvcount, recvtype,
1121                                           src, system_tag, comm);
1122         index++;
1123       }
1124     }
1125     // Wait for completion of irecv's.
1126     smpi_mpi_startall(size - 1, requests);
1127     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1128     for(src = 0; src < size-1; src++) {
1129       smpi_mpi_request_free(&requests[src]);
1130     }
1131     xbt_free(requests);
1132   }
1133 }
1134
1135
1136 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts,
1137                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1138 {
1139     int i, size, count;
1140     int *displs;
1141     int rank = smpi_process_index();
1142     void *tmpbuf;
1143
1144     /* arbitrarily choose root as rank 0 */
1145     size = smpi_comm_size(comm);
1146     count = 0;
1147     displs = xbt_new(int, size);
1148     for (i = 0; i < size; i++) {
1149       displs[i] = count;
1150       count += recvcounts[i];
1151     }
1152     tmpbuf=(void*)smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype));
1153
1154     mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
1155     smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf,
1156                       recvcounts[rank], datatype, 0, comm);
1157     xbt_free(displs);
1158     smpi_free_tmp_buffer(tmpbuf);
1159 }
1160
1161 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1162                       void *recvbuf, int *recvcounts, int *displs,
1163                       MPI_Datatype recvtype, int root, MPI_Comm comm)
1164 {
1165   int system_tag = COLL_TAG_GATHERV;
1166   int rank, size, src, index;
1167   MPI_Aint lb = 0, recvext = 0;
1168   MPI_Request *requests;
1169
1170   rank = smpi_comm_rank(comm);
1171   size = smpi_comm_size(comm);
1172   if(rank != root) {
1173     // Send buffer to root
1174     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
1175   } else {
1176     // FIXME: check for errors
1177     smpi_datatype_extent(recvtype, &lb, &recvext);
1178     // Local copy from root
1179     smpi_datatype_copy(sendbuf, sendcount, sendtype,
1180                        (char *)recvbuf + displs[root] * recvext,
1181                        recvcounts[root], recvtype);
1182     // Receive buffers from senders
1183     requests = xbt_new(MPI_Request, size - 1);
1184     index = 0;
1185     for(src = 0; src < size; src++) {
1186       if(src != root) {
1187         requests[index] =
1188           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
1189                           recvcounts[src], recvtype, src, system_tag, comm);
1190         index++;
1191       }
1192     }
1193     // Wait for completion of irecv's.
1194     smpi_mpi_startall(size - 1, requests);
1195     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1196     for(src = 0; src < size-1; src++) {
1197       smpi_mpi_request_free(&requests[src]);
1198     }
1199     xbt_free(requests);
1200   }
1201 }
1202
1203 void smpi_mpi_allgather(void *sendbuf, int sendcount,
1204                         MPI_Datatype sendtype, void *recvbuf,
1205                         int recvcount, MPI_Datatype recvtype,
1206                         MPI_Comm comm)
1207 {
1208   int system_tag = COLL_TAG_ALLGATHER;
1209   int rank, size, other, index;
1210   MPI_Aint lb = 0, recvext = 0;
1211   MPI_Request *requests;
1212
1213   rank = smpi_comm_rank(comm);
1214   size = smpi_comm_size(comm);
1215   // FIXME: check for errors
1216   smpi_datatype_extent(recvtype, &lb, &recvext);
1217   // Local copy from self
1218   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1219                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
1220                      recvtype);
1221   // Send/Recv buffers to/from others;
1222   requests = xbt_new(MPI_Request, 2 * (size - 1));
1223   index = 0;
1224   for(other = 0; other < size; other++) {
1225     if(other != rank) {
1226       requests[index] =
1227         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1228                         comm);
1229       index++;
1230       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
1231                                         recvcount, recvtype, other,
1232                                         system_tag, comm);
1233       index++;
1234     }
1235   }
1236   // Wait for completion of all comms.
1237   smpi_mpi_startall(2 * (size - 1), requests);
1238   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1239   for(other = 0; other < 2*(size-1); other++) {
1240     smpi_mpi_request_free(&requests[other]);
1241   }
1242   xbt_free(requests);
1243 }
1244
1245 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
1246                          MPI_Datatype sendtype, void *recvbuf,
1247                          int *recvcounts, int *displs,
1248                          MPI_Datatype recvtype, MPI_Comm comm)
1249 {
1250   int system_tag = COLL_TAG_ALLGATHERV;
1251   int rank, size, other, index;
1252   MPI_Aint lb = 0, recvext = 0;
1253   MPI_Request *requests;
1254
1255   rank = smpi_comm_rank(comm);
1256   size = smpi_comm_size(comm);
1257   // FIXME: check for errors
1258   smpi_datatype_extent(recvtype, &lb, &recvext);
1259   // Local copy from self
1260   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1261                      (char *)recvbuf + displs[rank] * recvext,
1262                      recvcounts[rank], recvtype);
1263   // Send buffers to others;
1264   requests = xbt_new(MPI_Request, 2 * (size - 1));
1265   index = 0;
1266   for(other = 0; other < size; other++) {
1267     if(other != rank) {
1268       requests[index] =
1269         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1270                         comm);
1271       index++;
1272       requests[index] =
1273         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
1274                         recvtype, other, system_tag, comm);
1275       index++;
1276     }
1277   }
1278   // Wait for completion of all comms.
1279   smpi_mpi_startall(2 * (size - 1), requests);
1280   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1281   for(other = 0; other < 2*(size-1); other++) {
1282     smpi_mpi_request_free(&requests[other]);
1283   }
1284   xbt_free(requests);
1285 }
1286
1287 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1288                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
1289                       int root, MPI_Comm comm)
1290 {
1291   int system_tag = COLL_TAG_SCATTER;
1292   int rank, size, dst, index;
1293   MPI_Aint lb = 0, sendext = 0;
1294   MPI_Request *requests;
1295
1296   rank = smpi_comm_rank(comm);
1297   size = smpi_comm_size(comm);
1298   if(rank != root) {
1299     // Recv buffer from root
1300     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1301                   MPI_STATUS_IGNORE);
1302   } else {
1303     // FIXME: check for errors
1304     smpi_datatype_extent(sendtype, &lb, &sendext);
1305     // Local copy from root
1306     if(recvbuf!=MPI_IN_PLACE){
1307         smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
1308                            sendcount, sendtype, recvbuf, recvcount, recvtype);
1309     }
1310     // Send buffers to receivers
1311     requests = xbt_new(MPI_Request, size - 1);
1312     index = 0;
1313     for(dst = 0; dst < size; dst++) {
1314       if(dst != root) {
1315         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
1316                                           sendcount, sendtype, dst,
1317                                           system_tag, comm);
1318         index++;
1319       }
1320     }
1321     // Wait for completion of isend's.
1322     smpi_mpi_startall(size - 1, requests);
1323     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1324     for(dst = 0; dst < size-1; dst++) {
1325       smpi_mpi_request_free(&requests[dst]);
1326     }
1327     xbt_free(requests);
1328   }
1329 }
1330
1331 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
1332                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
1333                        MPI_Datatype recvtype, int root, MPI_Comm comm)
1334 {
1335   int system_tag = COLL_TAG_SCATTERV;
1336   int rank, size, dst, index;
1337   MPI_Aint lb = 0, sendext = 0;
1338   MPI_Request *requests;
1339
1340   rank = smpi_comm_rank(comm);
1341   size = smpi_comm_size(comm);
1342   if(rank != root) {
1343     // Recv buffer from root
1344     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1345                   MPI_STATUS_IGNORE);
1346   } else {
1347     // FIXME: check for errors
1348     smpi_datatype_extent(sendtype, &lb, &sendext);
1349     // Local copy from root
1350     if(recvbuf!=MPI_IN_PLACE){
1351       smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
1352                        sendtype, recvbuf, recvcount, recvtype);
1353     }
1354     // Send buffers to receivers
1355     requests = xbt_new(MPI_Request, size - 1);
1356     index = 0;
1357     for(dst = 0; dst < size; dst++) {
1358       if(dst != root) {
1359         requests[index] =
1360           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
1361                           sendtype, dst, system_tag, comm);
1362         index++;
1363       }
1364     }
1365     // Wait for completion of isend's.
1366     smpi_mpi_startall(size - 1, requests);
1367     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1368     for(dst = 0; dst < size-1; dst++) {
1369       smpi_mpi_request_free(&requests[dst]);
1370     }
1371     xbt_free(requests);
1372   }
1373 }
1374
1375 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
1376                      MPI_Datatype datatype, MPI_Op op, int root,
1377                      MPI_Comm comm)
1378 {
1379   int system_tag = COLL_TAG_REDUCE;
1380   int rank, size, src, index;
1381   MPI_Aint lb = 0, dataext = 0;
1382   MPI_Request *requests;
1383   void **tmpbufs;
1384
1385
1386   char* sendtmpbuf = (char*) sendbuf;
1387   if( sendbuf == MPI_IN_PLACE ) {
1388     sendtmpbuf = (char *)smpi_get_tmp_sendbuffer(count*smpi_datatype_get_extent(datatype));
1389     smpi_datatype_copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
1390   }
1391
1392   rank = smpi_comm_rank(comm);
1393   size = smpi_comm_size(comm);
1394   //non commutative case, use a working algo from openmpi
1395   if(!smpi_op_is_commute(op)){
1396     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count,
1397                      datatype, op, root, comm);
1398     return;
1399   }
1400   
1401   if(rank != root) {
1402     // Send buffer to root
1403     smpi_mpi_send(sendtmpbuf, count, datatype, root, system_tag, comm);
1404   } else {
1405     // FIXME: check for errors
1406     smpi_datatype_extent(datatype, &lb, &dataext);
1407     // Local copy from root
1408     if (sendtmpbuf && recvbuf)
1409       smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
1410     // Receive buffers from senders
1411     //TODO: make a MPI_barrier here ?
1412     requests = xbt_new(MPI_Request, size - 1);
1413     tmpbufs = xbt_new(void *, size - 1);
1414     index = 0;
1415     for(src = 0; src < size; src++) {
1416       if(src != root) {
1417         // FIXME: possibly overkill we we have contiguous/noncontiguous data
1418         //  mapping...
1419          if (!smpi_process_get_replaying())
1420           tmpbufs[index] = xbt_malloc(count * dataext);
1421          else
1422            tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
1423         requests[index] =
1424           smpi_irecv_init(tmpbufs[index], count, datatype, src,
1425                           system_tag, comm);
1426         index++;
1427       }
1428     }
1429     // Wait for completion of irecv's.
1430     smpi_mpi_startall(size - 1, requests);
1431     for(src = 0; src < size - 1; src++) {
1432       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1433       XBT_DEBUG("finished waiting any request with index %d", index);
1434       if(index == MPI_UNDEFINED) {
1435         break;
1436       }else{
1437         smpi_mpi_request_free(&requests[index]);
1438       }
1439       if(op) /* op can be MPI_OP_NULL that does nothing */
1440         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1441     }
1442       for(index = 0; index < size - 1; index++) {
1443         smpi_free_tmp_buffer(tmpbufs[index]);
1444       }
1445     xbt_free(tmpbufs);
1446     xbt_free(requests);
1447
1448     if( sendbuf == MPI_IN_PLACE ) {
1449       smpi_free_tmp_buffer(sendtmpbuf);
1450     }
1451   }
1452 }
1453
1454 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
1455                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1456 {
1457   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
1458   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
1459 }
1460
1461 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
1462                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1463 {
1464   int system_tag = -888;
1465   int rank, size, other, index;
1466   MPI_Aint lb = 0, dataext = 0;
1467   MPI_Request *requests;
1468   void **tmpbufs;
1469
1470   rank = smpi_comm_rank(comm);
1471   size = smpi_comm_size(comm);
1472
1473   // FIXME: check for errors
1474   smpi_datatype_extent(datatype, &lb, &dataext);
1475
1476   // Local copy from self
1477   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
1478
1479   // Send/Recv buffers to/from others;
1480   requests = xbt_new(MPI_Request, size - 1);
1481   tmpbufs = xbt_new(void *, rank);
1482   index = 0;
1483   for(other = 0; other < rank; other++) {
1484     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1485     // mapping...
1486     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
1487     requests[index] =
1488       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1489                       comm);
1490     index++;
1491   }
1492   for(other = rank + 1; other < size; other++) {
1493     requests[index] =
1494       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1495     index++;
1496   }
1497   // Wait for completion of all comms.
1498   smpi_mpi_startall(size - 1, requests);
1499
1500   if(smpi_op_is_commute(op)){
1501     for(other = 0; other < size - 1; other++) {
1502       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1503       if(index == MPI_UNDEFINED) {
1504         break;
1505       }
1506       if(index < rank) {
1507         // #Request is below rank: it's a irecv
1508         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1509       }
1510     }
1511   }else{
1512     //non commutative case, wait in order
1513     for(other = 0; other < size - 1; other++) {
1514       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
1515       if(index < rank) {
1516         smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
1517       }
1518     }
1519   }
1520   for(index = 0; index < rank; index++) {
1521     smpi_free_tmp_buffer(tmpbufs[index]);
1522   }
1523   for(index = 0; index < size-1; index++) {
1524     smpi_mpi_request_free(&requests[index]);
1525   }
1526   xbt_free(tmpbufs);
1527   xbt_free(requests);
1528 }
1529
1530 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count,
1531                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1532 {
1533   int system_tag = -888;
1534   int rank, size, other, index;
1535   MPI_Aint lb = 0, dataext = 0;
1536   MPI_Request *requests;
1537   void **tmpbufs;
1538   int recvbuf_is_empty=1;
1539   rank = smpi_comm_rank(comm);
1540   size = smpi_comm_size(comm);
1541
1542   // FIXME: check for errors
1543   smpi_datatype_extent(datatype, &lb, &dataext);
1544
1545   // Send/Recv buffers to/from others;
1546   requests = xbt_new(MPI_Request, size - 1);
1547   tmpbufs = xbt_new(void *, rank);
1548   index = 0;
1549   for(other = 0; other < rank; other++) {
1550     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1551     // mapping...
1552     tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext);
1553     requests[index] =
1554       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1555                       comm);
1556     index++;
1557   }
1558   for(other = rank + 1; other < size; other++) {
1559     requests[index] =
1560       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1561     index++;
1562   }
1563   // Wait for completion of all comms.
1564   smpi_mpi_startall(size - 1, requests);
1565   if(smpi_op_is_commute(op)){
1566     for(other = 0; other < size - 1; other++) {
1567       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1568       if(index == MPI_UNDEFINED) {
1569         break;
1570       }
1571       if(index < rank) {
1572         if(recvbuf_is_empty){
1573           smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
1574           recvbuf_is_empty=0;
1575         }else
1576         // #Request is below rank: it's a irecv
1577         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1578       }
1579     }
1580   }else{
1581     //non commutative case, wait in order
1582     for(other = 0; other < size - 1; other++) {
1583       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
1584       if(index < rank) {
1585           if(recvbuf_is_empty){
1586             smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
1587             recvbuf_is_empty=0;
1588           }else smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
1589       }
1590     }
1591   }
1592   for(index = 0; index < rank; index++) {
1593     smpi_free_tmp_buffer(tmpbufs[index]);
1594   }
1595   for(index = 0; index < size-1; index++) {
1596     smpi_mpi_request_free(&requests[index]);
1597   }
1598   xbt_free(tmpbufs);
1599   xbt_free(requests);
1600 }