Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14 #include "simgrid/sg_config.h"
15 #include "colls/colls.h"
16
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
18
19
20 static int match_recv(void* a, void* b, smx_action_t ignored) {
21    MPI_Request ref = (MPI_Request)a;
22    MPI_Request req = (MPI_Request)b;
23    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
24
25   xbt_assert(ref, "Cannot match recv against null reference");
26   xbt_assert(req, "Cannot match recv against null request");
27   if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
28     && ((ref->tag == MPI_ANY_TAG && req->tag >=0) || req->tag == ref->tag)){
29     //we match, we can transfer some values
30     // FIXME : move this to the copy function ?
31     if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
32     if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
33     if(ref->real_size < req->real_size) ref->truncated = 1;
34     if(req->detached==1){
35         ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
36     }
37     XBT_DEBUG("match succeeded");
38     return 1;
39   }else return 0;
40 }
41
42 static int match_send(void* a, void* b,smx_action_t ignored) {
43    MPI_Request ref = (MPI_Request)a;
44    MPI_Request req = (MPI_Request)b;
45    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
46    xbt_assert(ref, "Cannot match send against null reference");
47    xbt_assert(req, "Cannot match send against null request");
48
49    if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
50              && ((req->tag == MPI_ANY_TAG && ref->tag >=0)|| req->tag == ref->tag))
51    {
52      if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
53      if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
54      if(req->real_size < ref->real_size) req->truncated = 1;
55      if(ref->detached==1){
56          req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
57      }
58     XBT_DEBUG("match succeeded");
59      return 1;
60    } else return 0;
61 }
62
63
64 typedef struct s_smpi_factor *smpi_factor_t;
65 typedef struct s_smpi_factor {
66   long factor;
67   int nb_values;
68   double values[4];//arbitrary set to 4
69 } s_smpi_factor_t;
70 xbt_dynar_t smpi_os_values = NULL;
71 xbt_dynar_t smpi_or_values = NULL;
72 xbt_dynar_t smpi_ois_values = NULL;
73
74 // Methods used to parse and store the values for timing injections in smpi
75 // These are taken from surf/network.c and generalized to have more factors
76 // These methods should be merged with those in surf/network.c (moved somewhere in xbt ?)
77
78 static int factor_cmp(const void *pa, const void *pb)
79 {
80   return (((s_smpi_factor_t*)pa)->factor > ((s_smpi_factor_t*)pb)->factor) ? 1 :
81          (((s_smpi_factor_t*)pa)->factor < ((s_smpi_factor_t*)pb)->factor) ? -1 : 0;
82 }
83
84
85 static xbt_dynar_t parse_factor(const char *smpi_coef_string)
86 {
87   char *value = NULL;
88   unsigned int iter = 0;
89   s_smpi_factor_t fact;
90   fact.nb_values=0;
91   int i=0;
92   xbt_dynar_t smpi_factor, radical_elements, radical_elements2 = NULL;
93
94   smpi_factor = xbt_dynar_new(sizeof(s_smpi_factor_t), NULL);
95   radical_elements = xbt_str_split(smpi_coef_string, ";");
96   xbt_dynar_foreach(radical_elements, iter, value) {
97     memset(&fact, 0, sizeof(s_smpi_factor_t));
98     radical_elements2 = xbt_str_split(value, ":");
99     if (xbt_dynar_length(radical_elements2) <2 || xbt_dynar_length(radical_elements2) > 5)
100       xbt_die("Malformed radical for smpi factor!");
101     for(i =0; i<xbt_dynar_length(radical_elements2);i++ ){
102         if (i==0){
103            fact.factor = atol(xbt_dynar_get_as(radical_elements2, i, char *));
104         }else{
105            fact.values[fact.nb_values] = atof(xbt_dynar_get_as(radical_elements2, i, char *));
106            fact.nb_values++;
107         }
108     }
109
110     xbt_dynar_push_as(smpi_factor, s_smpi_factor_t, fact);
111     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
112     xbt_dynar_free(&radical_elements2);
113   }
114   xbt_dynar_free(&radical_elements);
115   iter=0;
116   xbt_dynar_sort(smpi_factor, &factor_cmp);
117   xbt_dynar_foreach(smpi_factor, iter, fact) {
118     XBT_DEBUG("smpi_factor:\t%ld : %d values, first: %f", fact.factor, fact.nb_values ,fact.values[0]);
119   }
120   return smpi_factor;
121 }
122
123 static double smpi_os(double size)
124 {
125   if (!smpi_os_values) {
126     smpi_os_values = parse_factor(sg_cfg_get_string("smpi/os"));
127     smpi_register_static(smpi_os_values, xbt_dynar_free_voidp);
128   }
129   unsigned int iter = 0;
130   s_smpi_factor_t fact;
131   double current=0.0;
132   xbt_dynar_foreach(smpi_os_values, iter, fact) {
133     if (size <= fact.factor) {
134         XBT_DEBUG("os : %f <= %ld return %f", size, fact.factor, current);
135       return current;
136     }else{
137       current=fact.values[0]+fact.values[1]*size;
138     }
139   }
140   XBT_DEBUG("os : %f > %ld return %f", size, fact.factor, current);
141
142   return current;
143 }
144
145 static double smpi_ois(double size)
146 {
147   if (!smpi_ois_values) {
148     smpi_ois_values = parse_factor(sg_cfg_get_string("smpi/ois"));
149     smpi_register_static(smpi_ois_values, xbt_dynar_free_voidp);
150   }
151   unsigned int iter = 0;
152   s_smpi_factor_t fact;
153   double current=0.0;
154   xbt_dynar_foreach(smpi_ois_values, iter, fact) {
155     if (size <= fact.factor) {
156         XBT_DEBUG("ois : %f <= %ld return %f", size, fact.factor, current);
157       return current;
158     }else{
159       current=fact.values[0]+fact.values[1]*size;
160     }
161   }
162   XBT_DEBUG("ois : %f > %ld return %f", size, fact.factor, current);
163
164   return current;
165 }
166
167 static double smpi_or(double size)
168 {
169   if (!smpi_or_values) {
170     smpi_or_values = parse_factor(sg_cfg_get_string("smpi/or"));
171     smpi_register_static(smpi_or_values, xbt_dynar_free_voidp);
172   }
173   unsigned int iter = 0;
174   s_smpi_factor_t fact;
175   double current=0.0;
176   xbt_dynar_foreach(smpi_or_values, iter, fact) {
177     if (size <= fact.factor) {
178         XBT_DEBUG("or : %f <= %ld return %f", size, fact.factor, current);
179       return current;
180     }else
181       current=fact.values[0]+fact.values[1]*size;
182   }
183   XBT_DEBUG("or : %f > %ld return %f", size, fact.factor, current);
184
185   return current;
186 }
187
188 static MPI_Request build_request(void *buf, int count,
189                                  MPI_Datatype datatype, int src, int dst,
190                                  int tag, MPI_Comm comm, unsigned flags)
191 {
192   MPI_Request request = NULL;
193
194   void *old_buf = NULL;
195
196   request = xbt_new(s_smpi_mpi_request_t, 1);
197
198   s_smpi_subtype_t *subtype = datatype->substruct;
199
200   if(datatype->has_subtype == 1){
201     // This part handles the problem of non-contiguous memory
202     old_buf = buf;
203     buf = count==0 ? NULL : xbt_malloc(count*smpi_datatype_size(datatype));
204     if (flags & SEND) {
205       subtype->serialize(old_buf, buf, count, datatype->substruct);
206     }
207   }
208
209   request->buf = buf;
210   // This part handles the problem of non-contiguous memory (for the
211   // unserialisation at the reception)
212   request->old_buf = old_buf;
213   request->old_type = datatype;
214
215   request->size = smpi_datatype_size(datatype) * count;
216   request->src = src;
217   request->dst = dst;
218   request->tag = tag;
219   request->comm = comm;
220   request->action = NULL;
221   request->flags = flags;
222   request->detached = 0;
223   request->detached_sender = NULL;
224   request->real_src = 0;
225
226   request->truncated = 0;
227   request->real_size = 0;
228   request->real_tag = 0;
229   if(flags & PERSISTENT)
230     request->refcount = 1;
231   else
232     request->refcount = 0;
233
234 #ifdef HAVE_TRACING
235   request->send = 0;
236   request->recv = 0;
237 #endif
238   if (flags & SEND) smpi_datatype_unuse(datatype);
239
240   return request;
241 }
242
243
244 void smpi_empty_status(MPI_Status * status)
245 {
246   if(status != MPI_STATUS_IGNORE) {
247     status->MPI_SOURCE = MPI_ANY_SOURCE;
248     status->MPI_TAG = MPI_ANY_TAG;
249     status->MPI_ERROR = MPI_SUCCESS;
250     status->count=0;
251   }
252 }
253
254 void smpi_action_trace_run(char *path)
255 {
256   char *name;
257   xbt_dynar_t todo;
258   xbt_dict_cursor_t cursor;
259
260   action_fp=NULL;
261   if (path) {
262     action_fp = fopen(path, "r");
263     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
264                strerror(errno));
265   }
266
267   if (!xbt_dict_is_empty(action_queues)) {
268     XBT_WARN
269       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
270
271
272     xbt_dict_foreach(action_queues, cursor, name, todo) {
273       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
274     }
275   }
276
277   if (path)
278     fclose(action_fp);
279   xbt_dict_free(&action_queues);
280   action_queues = xbt_dict_new_homogeneous(NULL);
281 }
282
283 static void smpi_mpi_request_free_voidp(void* request)
284 {
285   MPI_Request req = request;
286   smpi_mpi_request_free(&req);
287 }
288
289 /* MPI Low level calls */
290 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
291                                int dst, int tag, MPI_Comm comm)
292 {
293   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
294   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
295                           comm, PERSISTENT | SEND | PREPARED);
296   return request;
297 }
298
299 MPI_Request smpi_mpi_ssend_init(void *buf, int count, MPI_Datatype datatype,
300                                int dst, int tag, MPI_Comm comm)
301 {
302   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
303   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
304                           comm, PERSISTENT | SSEND | SEND | PREPARED);
305   return request;
306 }
307
308 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
309                                int src, int tag, MPI_Comm comm)
310 {
311   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
312   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
313                           comm, PERSISTENT | RECV | PREPARED);
314   return request;
315 }
316
317 void smpi_mpi_start(MPI_Request request)
318 {
319   smx_rdv_t mailbox;
320
321   xbt_assert(!request->action, "Cannot (re)start a non-finished communication");
322   request->flags &= ~PREPARED;
323   request->flags &= ~FINISHED;
324   request->refcount++;
325
326   if (request->flags & RECV) {
327     print_request("New recv", request);
328     //FIXME: if receive is posted with a large size, but send is smaller, mailboxes may not match !
329     if (request->size < sg_cfg_get_int("smpi/async_small_thres"))
330       mailbox = smpi_process_mailbox_small();
331     else
332       mailbox = smpi_process_mailbox();
333     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
334     request->real_size=request->size;
335     smpi_datatype_use(request->old_type);
336     smpi_comm_use(request->comm);
337     request->action = simcall_comm_irecv(mailbox, request->buf,
338                                          &request->real_size, &match_recv,
339                                          request, -1.0);
340
341     //integrate pseudo-timing for buffering of small messages, do not bother to execute the simcall if 0
342     double sleeptime = request->detached ? smpi_or(request->size) : 0.0;
343     if(sleeptime!=0.0){
344         simcall_process_sleep(sleeptime);
345         XBT_DEBUG("receiving size of %zu : sleep %f ", request->size, smpi_or(request->size));
346     }
347
348   } else {
349
350
351     int receiver = request->dst;//smpi_group_index(smpi_comm_group(request->comm), request->dst);
352
353     #ifdef HAVE_TRACING
354       int rank = smpi_process_index();
355       if (TRACE_smpi_view_internals()) {
356         TRACE_smpi_send(rank, rank, receiver,request->size);
357       }
358     #endif
359 /*    if(receiver == MPI_UNDEFINED) {*/
360 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
361 /*      return;*/
362 /*    }*/
363     print_request("New send", request);
364     if (request->size < sg_cfg_get_int("smpi/async_small_thres")) { // eager mode
365       mailbox = smpi_process_remote_mailbox_small(receiver);
366     }else{
367       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
368       mailbox = smpi_process_remote_mailbox(receiver);
369     }
370
371     void* buf = request->buf;
372     if ( (! (request->flags & SSEND)) && (request->size < sg_cfg_get_int("smpi/send_is_detached_thres"))) {
373       void *oldbuf = NULL;
374       request->detached = 1;
375       request->refcount++;
376       if(request->old_type->has_subtype == 0){
377         oldbuf = request->buf;
378         if (!_xbt_replay_is_active() && oldbuf && request->size!=0){
379           if((smpi_privatize_global_variables)
380               && ((char*)request->buf >= start_data_exe)
381               && ((char*)request->buf < start_data_exe + size_data_exe )){
382             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
383             switch_data_segment(smpi_process_index());
384           }
385           buf = xbt_malloc(request->size);
386           memcpy(buf,oldbuf,request->size);
387         }
388       }
389       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
390     }
391
392     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
393     request->real_size=request->size;
394     smpi_datatype_use(request->old_type);
395     smpi_comm_use(request->comm);
396
397     //if we are giving back the control to the user without waiting for completion, we have to inject timings
398     double sleeptime = 0.0;
399     if(request->detached || (request->flags & (ISEND|SSEND))){// issend should be treated as isend
400       //isend and send timings may be different
401       sleeptime = (request->flags & ISEND)? smpi_ois(request->size) : smpi_os(request->size);
402     }
403
404     if(sleeptime != 0.0){
405         simcall_process_sleep(sleeptime);
406         XBT_DEBUG("sending size of %zu : sleep %f ", request->size, smpi_os(request->size));
407     }
408
409     request->action =
410       simcall_comm_isend(mailbox, request->size, -1.0,
411                          buf, request->real_size,
412                          &match_send,
413                          &xbt_free, // how to free the userdata if a detached send fails
414                          request,
415                          // detach if msg size < eager/rdv switch limit
416                          request->detached);
417
418 #ifdef HAVE_TRACING
419     /* FIXME: detached sends are not traceable (request->action == NULL) */
420     if (request->action)
421       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
422
423 #endif
424
425   }
426
427 }
428
429 void smpi_mpi_startall(int count, MPI_Request * requests)
430 {
431   int i;
432   if(requests==NULL) return;
433
434   for(i = 0; i < count; i++) {
435     smpi_mpi_start(requests[i]);
436   }
437 }
438
439 void smpi_mpi_request_free(MPI_Request * request)
440 {
441   if((*request) != MPI_REQUEST_NULL){
442     (*request)->refcount--;
443     if((*request)->refcount<0) xbt_die("wrong refcount");
444
445     if((*request)->refcount==0){
446         print_request("Destroying", (*request));
447         xbt_free(*request);
448         *request = MPI_REQUEST_NULL;
449     }else{
450         print_request("Decrementing", (*request));
451     }
452   }else{
453       xbt_die("freeing an already free request");
454   }
455 }
456
457 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
458                             int dst, int tag, MPI_Comm comm)
459 {
460   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
461   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf , count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
462                           comm, PERSISTENT | ISEND | SEND | PREPARED);
463   return request;
464 }
465
466 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
467                            int dst, int tag, MPI_Comm comm)
468 {
469   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
470   request =  build_request(buf==MPI_BOTTOM?(void*)0:buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
471                            comm, NON_PERSISTENT | ISEND | SEND);
472   smpi_mpi_start(request);
473   return request;
474 }
475
476 MPI_Request smpi_mpi_issend(void *buf, int count, MPI_Datatype datatype,
477                            int dst, int tag, MPI_Comm comm)
478 {
479   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
480   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
481                           comm, NON_PERSISTENT | ISEND | SSEND | SEND);
482   smpi_mpi_start(request);
483   return request;
484 }
485
486
487
488 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
489                             int src, int tag, MPI_Comm comm)
490 {
491   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
492   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
493                           comm, PERSISTENT | RECV | PREPARED);
494   return request;
495 }
496
497 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
498                            int src, int tag, MPI_Comm comm)
499 {
500   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
501   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, src == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), src), smpi_process_index(), tag,
502                           comm, NON_PERSISTENT | RECV);
503   smpi_mpi_start(request);
504   return request;
505 }
506
507 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
508                    int tag, MPI_Comm comm, MPI_Status * status)
509 {
510   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
511   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
512   smpi_mpi_wait(&request, status);
513   request = NULL;
514 }
515
516
517
518 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
519                    int tag, MPI_Comm comm)
520 {
521   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
522   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
523                           comm, NON_PERSISTENT | SEND);
524
525   smpi_mpi_start(request);
526   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
527   request = NULL;
528 }
529
530 void smpi_mpi_ssend(void *buf, int count, MPI_Datatype datatype,
531                            int dst, int tag, MPI_Comm comm)
532 {
533   MPI_Request request = NULL; /* MC needs the comm to be set to NULL during the call */
534   request = build_request(buf==MPI_BOTTOM ? (void*)0 : buf, count, datatype, smpi_process_index(), smpi_group_index(smpi_comm_group(comm), dst), tag,
535                 comm, NON_PERSISTENT | SSEND | SEND);
536
537   smpi_mpi_start(request);
538   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
539   request = NULL;
540 }
541
542 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
543                        int dst, int sendtag, void *recvbuf, int recvcount,
544                        MPI_Datatype recvtype, int src, int recvtag,
545                        MPI_Comm comm, MPI_Status * status)
546 {
547   MPI_Request requests[2];
548   MPI_Status stats[2];
549   int myid=smpi_process_index();
550   if ((smpi_group_index(smpi_comm_group(comm), dst) == myid) && (smpi_group_index(smpi_comm_group(comm), src) == myid)) {
551       smpi_datatype_copy(sendbuf, sendcount, sendtype,
552                                      recvbuf, recvcount, recvtype);
553       return;
554   }
555   requests[0] =
556     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
557   requests[1] =
558     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
559   smpi_mpi_startall(2, requests);
560   smpi_mpi_waitall(2, requests, stats);
561   smpi_mpi_request_free(&requests[0]);
562   smpi_mpi_request_free(&requests[1]);
563   if(status != MPI_STATUS_IGNORE) {
564     // Copy receive status
565     *status = stats[1];
566   }
567 }
568
569 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
570 {
571   return status->count / smpi_datatype_size(datatype);
572 }
573
574 static void finish_wait(MPI_Request * request, MPI_Status * status)
575 {
576   MPI_Request req = *request;
577   smpi_empty_status(status);
578
579   if(!(req->detached && req->flags & SEND)
580       && !(req->flags & PREPARED)){
581     if(status != MPI_STATUS_IGNORE) {
582       int src = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
583       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(req->comm), src);
584       status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
585       status->MPI_ERROR = req->truncated ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
586       // this handles the case were size in receive differs from size in send
587       // FIXME: really this should just contain the count of receive-type blocks,
588       // right?
589       status->count = req->real_size;
590     }
591
592     print_request("Finishing", req);
593     MPI_Datatype datatype = req->old_type;
594
595     if(datatype->has_subtype == 1){
596       if (!_xbt_replay_is_active()){
597         if( smpi_privatize_global_variables
598             && ((char*)req->old_buf >= start_data_exe)
599             && ((char*)req->old_buf < start_data_exe + size_data_exe )
600         ){
601             XBT_VERB("Privatization : We are unserializing to a zone in global memory - Switch data segment ");
602             switch_data_segment(smpi_process_index());
603         }
604       }
605       // This part handles the problem of non-contignous memory
606       // the unserialization at the reception
607       s_smpi_subtype_t *subtype = datatype->substruct;
608       if(req->flags & RECV) {
609         subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
610       }
611       if(req->detached == 0) free(req->buf);
612     }
613     smpi_comm_unuse(req->comm);
614     smpi_datatype_unuse(datatype);
615
616   }
617
618 #ifdef HAVE_TRACING
619   if (TRACE_smpi_view_internals()) {
620     if(req->flags & RECV){
621       int rank = smpi_process_index();
622       int src_traced = (req->src == MPI_ANY_SOURCE ? req->real_src : req->src);
623       TRACE_smpi_recv(rank, src_traced, rank);
624     }
625   }
626 #endif
627
628   if(req->detached_sender!=NULL){
629     smpi_mpi_request_free(&(req->detached_sender));
630   }
631   if(req->flags & PERSISTENT)
632     req->action = NULL;
633   req->flags |= FINISHED;
634
635   smpi_mpi_request_free(request);
636
637 }
638
639 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
640   int flag;
641
642   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
643   smpi_empty_status(status);
644   flag = 1;
645   if (!((*request)->flags & PREPARED)) {
646     if ((*request)->action != NULL)
647       flag = simcall_comm_test((*request)->action);
648     if (flag) {
649       finish_wait(request, status);
650       if (*request != MPI_REQUEST_NULL && !((*request)->flags & PERSISTENT))
651       *request = MPI_REQUEST_NULL;
652     }
653   }
654   return flag;
655 }
656
657 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
658                      MPI_Status * status)
659 {
660   xbt_dynar_t comms;
661   int i, flag, size;
662   int* map;
663
664   *index = MPI_UNDEFINED;
665   flag = 0;
666   comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
667   map = xbt_new(int, count);
668   size = 0;
669   for(i = 0; i < count; i++) {
670     if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->action &&
671         !(requests[i]->flags & PREPARED)) {
672        xbt_dynar_push(comms, &requests[i]->action);
673        map[size] = i;
674        size++;
675     }
676   }
677   if(size > 0) {
678     i = simcall_comm_testany(comms);
679     // not MPI_UNDEFINED, as this is a simix return code
680     if(i != -1) {
681       *index = map[i];
682       finish_wait(&requests[*index], status);
683       if (requests[*index] != MPI_REQUEST_NULL && (requests[*index]->flags & NON_PERSISTENT))
684       requests[*index] = MPI_REQUEST_NULL;
685       flag = 1;
686     }
687   }else{
688       //all requests are null or inactive, return true
689       flag=1;
690       smpi_empty_status(status);
691   }
692   xbt_free(map);
693   xbt_dynar_free(&comms);
694
695   return flag;
696 }
697
698
699 int smpi_mpi_testall(int count, MPI_Request requests[],
700                      MPI_Status status[])
701 {
702   MPI_Status stat;
703   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
704   int flag=1;
705   int i;
706   for(i=0; i<count; i++){
707     if (requests[i] != MPI_REQUEST_NULL && !(requests[i]->flags & PREPARED)) {
708       if (smpi_mpi_test(&requests[i], pstat)!=1){
709         flag=0;
710       }else{
711           requests[i]=MPI_REQUEST_NULL;
712       }
713     }else{
714       smpi_empty_status(pstat);
715     }
716     if(status != MPI_STATUSES_IGNORE) {
717       status[i] = *pstat;
718     }
719   }
720   return flag;
721 }
722
723 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
724   int flag=0;
725   //FIXME find another wait to avoid busy waiting ?
726   // the issue here is that we have to wait on a nonexistent comm
727   while(flag==0){
728     smpi_mpi_iprobe(source, tag, comm, &flag, status);
729     XBT_DEBUG("Busy Waiting on probing : %d", flag);
730   }
731 }
732
733 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
734
735   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : smpi_group_index(smpi_comm_group(comm), source), smpi_comm_rank(comm), tag,
736             comm, PERSISTENT | RECV);
737
738   //to avoid deadlock, we have to sleep some time here, or the timer won't advance and we will only do iprobe simcalls
739   double sleeptime= sg_cfg_get_double("smpi/iprobe");
740   //multiplier to the sleeptime, to increase speed of execution, each failed iprobe will increase it
741   static int nsleeps = 1;
742
743   simcall_process_sleep(sleeptime);
744
745   // behave like a receive, but don't do it
746   smx_rdv_t mailbox;
747
748   print_request("New iprobe", request);
749   // We have to test both mailboxes as we don't know if we will receive one one or another
750     if (sg_cfg_get_int("smpi/async_small_thres")>0){
751         mailbox = smpi_process_mailbox_small();
752         XBT_DEBUG("trying to probe the perm recv mailbox");
753         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
754     }
755     if (request->action==NULL){
756         mailbox = smpi_process_mailbox();
757         XBT_DEBUG("trying to probe the other mailbox");
758         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
759     }
760
761   if(request->action){
762     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
763     *flag = 1;
764     if(status != MPI_STATUS_IGNORE && !(req->flags & PREPARED)) {
765       status->MPI_SOURCE = smpi_group_rank(smpi_comm_group(comm), req->src);
766       status->MPI_TAG = req->tag;
767       status->MPI_ERROR = MPI_SUCCESS;
768       status->count = req->real_size;
769     }
770     nsleeps=1;//reset the number of sleeps we will do next time
771   }
772   else {
773       *flag = 0;
774       nsleeps++;
775   }
776   smpi_mpi_request_free(&request);
777
778   return;
779 }
780
781 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
782 {
783   print_request("Waiting", *request);
784   if ((*request)->flags & PREPARED) {
785     smpi_empty_status(status);
786     return;
787   }
788
789   if ((*request)->action != NULL) { // this is not a detached send
790     simcall_comm_wait((*request)->action, -1.0);
791 #ifdef HAVE_MC
792   if(MC_is_active() && (*request)->action)
793     (*request)->action->comm.dst_data = NULL; // dangling pointer : dst_data is freed with a wait, need to set it to NULL for system state comparison
794 #endif
795   }
796
797   finish_wait(request, status);
798   if (*request != MPI_REQUEST_NULL && ((*request)->flags & NON_PERSISTENT))
799       *request = MPI_REQUEST_NULL;
800   // FIXME for a detached send, finish_wait is not called:
801 }
802
803 int smpi_mpi_waitany(int count, MPI_Request requests[],
804                      MPI_Status * status)
805 {
806   xbt_dynar_t comms;
807   int i, size, index;
808   int *map;
809
810   index = MPI_UNDEFINED;
811   if(count > 0) {
812     // Wait for a request to complete
813     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
814     map = xbt_new(int, count);
815     size = 0;
816     XBT_DEBUG("Wait for one of %d", count);
817     for(i = 0; i < count; i++) {
818       if (requests[i] != MPI_REQUEST_NULL
819           && !(requests[i]->flags & PREPARED)
820           && !(requests[i]->flags & FINISHED)) {
821         if (requests[i]->action != NULL) {
822           XBT_DEBUG("Waiting any %p ", requests[i]);
823           xbt_dynar_push(comms, &requests[i]->action);
824           map[size] = i;
825           size++;
826         }else{
827          //This is a finished detached request, let's return this one
828          size=0;//so we free the dynar but don't do the waitany call
829          index=i;
830          finish_wait(&requests[i], status);//cleanup if refcount = 0
831          if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
832          requests[i]=MPI_REQUEST_NULL;//set to null
833          break;
834          }
835       }
836     }
837     if(size > 0) {
838       i = simcall_comm_waitany(comms);
839
840       // not MPI_UNDEFINED, as this is a simix return code
841       if (i != -1) {
842         index = map[i];
843         finish_wait(&requests[index], status);
844         if (requests[i] != MPI_REQUEST_NULL && (requests[i]->flags & NON_PERSISTENT))
845         requests[index] = MPI_REQUEST_NULL;
846       }
847     }
848     xbt_free(map);
849     xbt_dynar_free(&comms);
850   }
851
852   if (index==MPI_UNDEFINED)
853     smpi_empty_status(status);
854
855   return index;
856 }
857
858 int smpi_mpi_waitall(int count, MPI_Request requests[],
859                       MPI_Status status[])
860 {
861   int  index, c;
862   MPI_Status stat;
863   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
864   int retvalue = MPI_SUCCESS;
865   //tag invalid requests in the set
866   if (status != MPI_STATUSES_IGNORE) {
867     for (c = 0; c < count; c++) {
868       if (requests[c] == MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ||
869           (requests[c]->flags & PREPARED)) {
870         smpi_empty_status(&status[c]);
871       } else if (requests[c]->src == MPI_PROC_NULL) {
872         smpi_empty_status(&status[c]);
873         status[c].MPI_SOURCE = MPI_PROC_NULL;
874       }
875     }
876   }
877   for(c = 0; c < count; c++) {
878
879     if (MC_is_active()) {
880       smpi_mpi_wait(&requests[c], pstat);
881       index = c;
882     } else {
883       index = smpi_mpi_waitany(count, requests, pstat);
884       if (index == MPI_UNDEFINED)
885         break;
886       if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
887       requests[index]=MPI_REQUEST_NULL;
888     }
889     if (status != MPI_STATUSES_IGNORE) {
890       status[index] = *pstat;
891       if (status[index].MPI_ERROR == MPI_ERR_TRUNCATE)
892         retvalue = MPI_ERR_IN_STATUS;
893     }
894   }
895
896   return retvalue;
897 }
898
899 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
900                       MPI_Status status[])
901 {
902   int i, count, index;
903   MPI_Status stat;
904   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
905
906   count = 0;
907   for(i = 0; i < incount; i++)
908   {
909     index=smpi_mpi_waitany(incount, requests, pstat);
910     if(index!=MPI_UNDEFINED){
911       indices[count] = index;
912       count++;
913       if(status != MPI_STATUSES_IGNORE) {
914         status[index] = *pstat;
915       }
916      if (requests[index] != MPI_REQUEST_NULL && (requests[index]->flags & NON_PERSISTENT))
917      requests[index]=MPI_REQUEST_NULL;
918     }else{
919       return MPI_UNDEFINED;
920     }
921   }
922   return count;
923 }
924
925 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
926                       MPI_Status status[])
927 {
928   int i, count, count_dead;
929   MPI_Status stat;
930   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
931
932   count = 0;
933   count_dead = 0;
934   for(i = 0; i < incount; i++) {
935     if((requests[i] != MPI_REQUEST_NULL)) {
936       if(smpi_mpi_test(&requests[i], pstat)) {
937          indices[i] = 1;
938          count++;
939          if(status != MPI_STATUSES_IGNORE) {
940            status[i] = *pstat;
941          }
942          if ((requests[i] != MPI_REQUEST_NULL) && requests[i]->flags & NON_PERSISTENT)
943          requests[i]=MPI_REQUEST_NULL;
944       }
945     }else{
946       count_dead++;
947     }
948   }
949   if(count_dead==incount)return MPI_UNDEFINED;
950   else return count;
951 }
952
953 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
954                     MPI_Comm comm)
955 {
956   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
957   nary_tree_bcast(buf, count, datatype, root, comm, 4);
958 }
959
960 void smpi_mpi_barrier(MPI_Comm comm)
961 {
962   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
963   nary_tree_barrier(comm, 4);
964 }
965
966 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
967                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
968                      int root, MPI_Comm comm)
969 {
970   int system_tag = COLL_TAG_GATHER;
971   int rank, size, src, index;
972   MPI_Aint lb = 0, recvext = 0;
973   MPI_Request *requests;
974
975   rank = smpi_comm_rank(comm);
976   size = smpi_comm_size(comm);
977   if(rank != root) {
978     // Send buffer to root
979     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
980   } else {
981     // FIXME: check for errors
982     smpi_datatype_extent(recvtype, &lb, &recvext);
983     // Local copy from root
984     smpi_datatype_copy(sendbuf, sendcount, sendtype,
985                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
986     // Receive buffers from senders
987     requests = xbt_new(MPI_Request, size - 1);
988     index = 0;
989     for(src = 0; src < size; src++) {
990       if(src != root) {
991         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
992                                           recvcount, recvtype,
993                                           src, system_tag, comm);
994         index++;
995       }
996     }
997     // Wait for completion of irecv's.
998     smpi_mpi_startall(size - 1, requests);
999     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1000     for(src = 0; src < size-1; src++) {
1001       smpi_mpi_request_free(&requests[src]);
1002     }
1003     xbt_free(requests);
1004   }
1005 }
1006
1007
1008 void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts,
1009                        MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1010 {
1011     int i, size, count;
1012     int *displs;
1013     int rank = smpi_process_index();
1014     void *tmpbuf;
1015
1016     /* arbitrarily choose root as rank 0 */
1017     size = smpi_comm_size(comm);
1018     count = 0;
1019     displs = xbt_new(int, size);
1020     for (i = 0; i < size; i++) {
1021       displs[i] = count;
1022       count += recvcounts[i];
1023     }
1024     tmpbuf=(void*)xbt_malloc(count*smpi_datatype_get_extent(datatype));
1025     mpi_coll_reduce_fun(sendbuf, tmpbuf, count, datatype, op, 0, comm);
1026     smpi_mpi_scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf,
1027                       recvcounts[rank], datatype, 0, comm);
1028     xbt_free(displs);
1029     xbt_free(tmpbuf);
1030 }
1031
1032 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1033                       void *recvbuf, int *recvcounts, int *displs,
1034                       MPI_Datatype recvtype, int root, MPI_Comm comm)
1035 {
1036   int system_tag = COLL_TAG_GATHERV;
1037   int rank, size, src, index;
1038   MPI_Aint lb = 0, recvext = 0;
1039   MPI_Request *requests;
1040
1041   rank = smpi_comm_rank(comm);
1042   size = smpi_comm_size(comm);
1043   if(rank != root) {
1044     // Send buffer to root
1045     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
1046   } else {
1047     // FIXME: check for errors
1048     smpi_datatype_extent(recvtype, &lb, &recvext);
1049     // Local copy from root
1050     smpi_datatype_copy(sendbuf, sendcount, sendtype,
1051                        (char *)recvbuf + displs[root] * recvext,
1052                        recvcounts[root], recvtype);
1053     // Receive buffers from senders
1054     requests = xbt_new(MPI_Request, size - 1);
1055     index = 0;
1056     for(src = 0; src < size; src++) {
1057       if(src != root) {
1058         requests[index] =
1059           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
1060                           recvcounts[src], recvtype, src, system_tag, comm);
1061         index++;
1062       }
1063     }
1064     // Wait for completion of irecv's.
1065     smpi_mpi_startall(size - 1, requests);
1066     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1067     for(src = 0; src < size-1; src++) {
1068       smpi_mpi_request_free(&requests[src]);
1069     }
1070     xbt_free(requests);
1071   }
1072 }
1073
1074 void smpi_mpi_allgather(void *sendbuf, int sendcount,
1075                         MPI_Datatype sendtype, void *recvbuf,
1076                         int recvcount, MPI_Datatype recvtype,
1077                         MPI_Comm comm)
1078 {
1079   int system_tag = COLL_TAG_ALLGATHER;
1080   int rank, size, other, index;
1081   MPI_Aint lb = 0, recvext = 0;
1082   MPI_Request *requests;
1083
1084   rank = smpi_comm_rank(comm);
1085   size = smpi_comm_size(comm);
1086   // FIXME: check for errors
1087   smpi_datatype_extent(recvtype, &lb, &recvext);
1088   // Local copy from self
1089   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1090                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
1091                      recvtype);
1092   // Send/Recv buffers to/from others;
1093   requests = xbt_new(MPI_Request, 2 * (size - 1));
1094   index = 0;
1095   for(other = 0; other < size; other++) {
1096     if(other != rank) {
1097       requests[index] =
1098         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1099                         comm);
1100       index++;
1101       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
1102                                         recvcount, recvtype, other,
1103                                         system_tag, comm);
1104       index++;
1105     }
1106   }
1107   // Wait for completion of all comms.
1108   smpi_mpi_startall(2 * (size - 1), requests);
1109   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1110   for(other = 0; other < 2*(size-1); other++) {
1111     smpi_mpi_request_free(&requests[other]);
1112   }
1113   xbt_free(requests);
1114 }
1115
1116 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
1117                          MPI_Datatype sendtype, void *recvbuf,
1118                          int *recvcounts, int *displs,
1119                          MPI_Datatype recvtype, MPI_Comm comm)
1120 {
1121   int system_tag = COLL_TAG_ALLGATHERV;
1122   int rank, size, other, index;
1123   MPI_Aint lb = 0, recvext = 0;
1124   MPI_Request *requests;
1125
1126   rank = smpi_comm_rank(comm);
1127   size = smpi_comm_size(comm);
1128   // FIXME: check for errors
1129   smpi_datatype_extent(recvtype, &lb, &recvext);
1130   // Local copy from self
1131   smpi_datatype_copy(sendbuf, sendcount, sendtype,
1132                      (char *)recvbuf + displs[rank] * recvext,
1133                      recvcounts[rank], recvtype);
1134   // Send buffers to others;
1135   requests = xbt_new(MPI_Request, 2 * (size - 1));
1136   index = 0;
1137   for(other = 0; other < size; other++) {
1138     if(other != rank) {
1139       requests[index] =
1140         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
1141                         comm);
1142       index++;
1143       requests[index] =
1144         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
1145                         recvtype, other, system_tag, comm);
1146       index++;
1147     }
1148   }
1149   // Wait for completion of all comms.
1150   smpi_mpi_startall(2 * (size - 1), requests);
1151   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
1152   for(other = 0; other < 2*(size-1); other++) {
1153     smpi_mpi_request_free(&requests[other]);
1154   }
1155   xbt_free(requests);
1156 }
1157
1158 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
1159                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
1160                       int root, MPI_Comm comm)
1161 {
1162   int system_tag = COLL_TAG_SCATTER;
1163   int rank, size, dst, index;
1164   MPI_Aint lb = 0, sendext = 0;
1165   MPI_Request *requests;
1166
1167   rank = smpi_comm_rank(comm);
1168   size = smpi_comm_size(comm);
1169   if(rank != root) {
1170     // Recv buffer from root
1171     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1172                   MPI_STATUS_IGNORE);
1173   } else {
1174     // FIXME: check for errors
1175     smpi_datatype_extent(sendtype, &lb, &sendext);
1176     // Local copy from root
1177     if(recvbuf!=MPI_IN_PLACE){
1178         smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
1179                            sendcount, sendtype, recvbuf, recvcount, recvtype);
1180     }
1181     // Send buffers to receivers
1182     requests = xbt_new(MPI_Request, size - 1);
1183     index = 0;
1184     for(dst = 0; dst < size; dst++) {
1185       if(dst != root) {
1186         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
1187                                           sendcount, sendtype, dst,
1188                                           system_tag, comm);
1189         index++;
1190       }
1191     }
1192     // Wait for completion of isend's.
1193     smpi_mpi_startall(size - 1, requests);
1194     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1195     for(dst = 0; dst < size-1; dst++) {
1196       smpi_mpi_request_free(&requests[dst]);
1197     }
1198     xbt_free(requests);
1199   }
1200 }
1201
1202 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
1203                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
1204                        MPI_Datatype recvtype, int root, MPI_Comm comm)
1205 {
1206   int system_tag = COLL_TAG_SCATTERV;
1207   int rank, size, dst, index;
1208   MPI_Aint lb = 0, sendext = 0;
1209   MPI_Request *requests;
1210
1211   rank = smpi_comm_rank(comm);
1212   size = smpi_comm_size(comm);
1213   if(rank != root) {
1214     // Recv buffer from root
1215     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
1216                   MPI_STATUS_IGNORE);
1217   } else {
1218     // FIXME: check for errors
1219     smpi_datatype_extent(sendtype, &lb, &sendext);
1220     // Local copy from root
1221     if(recvbuf!=MPI_IN_PLACE){
1222       smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
1223                        sendtype, recvbuf, recvcount, recvtype);
1224     }
1225     // Send buffers to receivers
1226     requests = xbt_new(MPI_Request, size - 1);
1227     index = 0;
1228     for(dst = 0; dst < size; dst++) {
1229       if(dst != root) {
1230         requests[index] =
1231           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
1232                           sendtype, dst, system_tag, comm);
1233         index++;
1234       }
1235     }
1236     // Wait for completion of isend's.
1237     smpi_mpi_startall(size - 1, requests);
1238     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
1239     for(dst = 0; dst < size-1; dst++) {
1240       smpi_mpi_request_free(&requests[dst]);
1241     }
1242     xbt_free(requests);
1243   }
1244 }
1245
1246 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
1247                      MPI_Datatype datatype, MPI_Op op, int root,
1248                      MPI_Comm comm)
1249 {
1250   int system_tag = COLL_TAG_REDUCE;
1251   int rank, size, src, index;
1252   MPI_Aint lb = 0, dataext = 0;
1253   MPI_Request *requests;
1254   void **tmpbufs;
1255
1256
1257   char* sendtmpbuf = (char*) sendbuf;
1258   if( sendbuf == MPI_IN_PLACE ) {
1259     sendtmpbuf = (char *)xbt_malloc(count*smpi_datatype_get_extent(datatype));
1260     smpi_datatype_copy(recvbuf, count, datatype,sendtmpbuf, count, datatype);
1261   }
1262
1263   rank = smpi_comm_rank(comm);
1264   size = smpi_comm_size(comm);
1265   //non commutative case, use a working algo from openmpi
1266   if(!smpi_op_is_commute(op)){
1267     smpi_coll_tuned_reduce_ompi_basic_linear(sendtmpbuf, recvbuf, count,
1268                      datatype, op, root, comm);
1269     return;
1270   }
1271   
1272   if(rank != root) {
1273     // Send buffer to root
1274     smpi_mpi_send(sendtmpbuf, count, datatype, root, system_tag, comm);
1275   } else {
1276     // FIXME: check for errors
1277     smpi_datatype_extent(datatype, &lb, &dataext);
1278     // Local copy from root
1279     if (sendtmpbuf && recvbuf)
1280       smpi_datatype_copy(sendtmpbuf, count, datatype, recvbuf, count, datatype);
1281     // Receive buffers from senders
1282     //TODO: make a MPI_barrier here ?
1283     requests = xbt_new(MPI_Request, size - 1);
1284     tmpbufs = xbt_new(void *, size - 1);
1285     index = 0;
1286     for(src = 0; src < size; src++) {
1287       if(src != root) {
1288         // FIXME: possibly overkill we we have contiguous/noncontiguous data
1289         //  mapping...
1290         tmpbufs[index] = xbt_malloc(count * dataext);
1291         requests[index] =
1292           smpi_irecv_init(tmpbufs[index], count, datatype, src,
1293                           system_tag, comm);
1294         index++;
1295       }
1296     }
1297     // Wait for completion of irecv's.
1298     smpi_mpi_startall(size - 1, requests);
1299     for(src = 0; src < size - 1; src++) {
1300       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1301       XBT_DEBUG("finished waiting any request with index %d", index);
1302       if(index == MPI_UNDEFINED) {
1303         break;
1304       }else{
1305         smpi_mpi_request_free(&requests[index]);
1306       }
1307       if(op) /* op can be MPI_OP_NULL that does nothing */
1308         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1309     }
1310     for(index = 0; index < size - 1; index++) {
1311       xbt_free(tmpbufs[index]);
1312     }
1313     xbt_free(tmpbufs);
1314     xbt_free(requests);
1315
1316     if( sendbuf == MPI_IN_PLACE ) {
1317       xbt_free(sendtmpbuf);
1318     }
1319   }
1320 }
1321
1322 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
1323                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1324 {
1325   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
1326   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
1327 }
1328
1329 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
1330                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1331 {
1332   int system_tag = -888;
1333   int rank, size, other, index;
1334   MPI_Aint lb = 0, dataext = 0;
1335   MPI_Request *requests;
1336   void **tmpbufs;
1337
1338   rank = smpi_comm_rank(comm);
1339   size = smpi_comm_size(comm);
1340
1341   // FIXME: check for errors
1342   smpi_datatype_extent(datatype, &lb, &dataext);
1343
1344   // Local copy from self
1345   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
1346
1347   // Send/Recv buffers to/from others;
1348   requests = xbt_new(MPI_Request, size - 1);
1349   tmpbufs = xbt_new(void *, rank);
1350   index = 0;
1351   for(other = 0; other < rank; other++) {
1352     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1353     // mapping...
1354     tmpbufs[index] = xbt_malloc(count * dataext);
1355     requests[index] =
1356       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1357                       comm);
1358     index++;
1359   }
1360   for(other = rank + 1; other < size; other++) {
1361     requests[index] =
1362       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1363     index++;
1364   }
1365   // Wait for completion of all comms.
1366   smpi_mpi_startall(size - 1, requests);
1367
1368   if(smpi_op_is_commute(op)){
1369     for(other = 0; other < size - 1; other++) {
1370       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1371       if(index == MPI_UNDEFINED) {
1372         break;
1373       }
1374       if(index < rank) {
1375         // #Request is below rank: it's a irecv
1376         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1377       }
1378     }
1379   }else{
1380     //non commutative case, wait in order
1381     for(other = 0; other < size - 1; other++) {
1382       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
1383       if(index < rank) {
1384         smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
1385       }
1386     }
1387   }
1388   for(index = 0; index < rank; index++) {
1389     xbt_free(tmpbufs[index]);
1390   }
1391   for(index = 0; index < size-1; index++) {
1392     smpi_mpi_request_free(&requests[index]);
1393   }
1394   xbt_free(tmpbufs);
1395   xbt_free(requests);
1396 }
1397
1398 void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count,
1399                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
1400 {
1401   int system_tag = -888;
1402   int rank, size, other, index;
1403   MPI_Aint lb = 0, dataext = 0;
1404   MPI_Request *requests;
1405   void **tmpbufs;
1406   int recvbuf_is_empty=1;
1407   rank = smpi_comm_rank(comm);
1408   size = smpi_comm_size(comm);
1409
1410   // FIXME: check for errors
1411   smpi_datatype_extent(datatype, &lb, &dataext);
1412
1413   // Send/Recv buffers to/from others;
1414   requests = xbt_new(MPI_Request, size - 1);
1415   tmpbufs = xbt_new(void *, rank);
1416   index = 0;
1417   for(other = 0; other < rank; other++) {
1418     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1419     // mapping...
1420     tmpbufs[index] = xbt_malloc(count * dataext);
1421     requests[index] =
1422       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1423                       comm);
1424     index++;
1425   }
1426   for(other = rank + 1; other < size; other++) {
1427     requests[index] =
1428       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1429     index++;
1430   }
1431   // Wait for completion of all comms.
1432   smpi_mpi_startall(size - 1, requests);
1433   if(smpi_op_is_commute(op)){
1434     for(other = 0; other < size - 1; other++) {
1435       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1436       if(index == MPI_UNDEFINED) {
1437         break;
1438       }
1439       if(index < rank) {
1440         if(recvbuf_is_empty){
1441           smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype);
1442           recvbuf_is_empty=0;
1443         }else
1444         // #Request is below rank: it's a irecv
1445         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1446       }
1447     }
1448   }else{
1449     //non commutative case, wait in order
1450     for(other = 0; other < size - 1; other++) {
1451       smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE);
1452       if(index < rank) {
1453           if(recvbuf_is_empty){
1454             smpi_datatype_copy(tmpbufs[other], count, datatype, recvbuf, count, datatype);
1455             recvbuf_is_empty=0;
1456           }else smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype);
1457       }
1458     }
1459   }
1460   for(index = 0; index < rank; index++) {
1461     xbt_free(tmpbufs[index]);
1462   }
1463   for(index = 0; index < size-1; index++) {
1464     smpi_mpi_request_free(&requests[index]);
1465   }
1466   xbt_free(tmpbufs);
1467   xbt_free(requests);
1468 }