Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
change the way we handle MPI_Request termination
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/virtu.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "simix/smx_private.h"
13 #include "surf/surf.h"
14
15
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
17
18
19 static int match_recv(void* a, void* b, smx_action_t ignored) {
20    MPI_Request ref = (MPI_Request)a;
21    MPI_Request req = (MPI_Request)b;
22    XBT_DEBUG("Trying to match a recv of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
23
24   xbt_assert(ref, "Cannot match recv against null reference");
25   xbt_assert(req, "Cannot match recv against null request");
26   if((ref->src == MPI_ANY_SOURCE || req->src == ref->src)
27     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag)){
28     //we match, we can transfer some values
29     // FIXME : move this to the copy function ?
30     if(ref->src == MPI_ANY_SOURCE)ref->real_src = req->src;
31     if(ref->tag == MPI_ANY_TAG)ref->real_tag = req->tag;
32     if(ref->real_size < req->real_size) ref->truncated = 1;
33     if(req->detached==1){
34         ref->detached_sender=req; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
35     }
36     return 1;
37   }else return 0;
38 }
39
40 static int match_send(void* a, void* b,smx_action_t ignored) {
41    MPI_Request ref = (MPI_Request)a;
42    MPI_Request req = (MPI_Request)b;
43    XBT_DEBUG("Trying to match a send of src %d against %d, tag %d against %d",ref->src,req->src, ref->tag, req->tag);
44    xbt_assert(ref, "Cannot match send against null reference");
45    xbt_assert(req, "Cannot match send against null request");
46
47    if((req->src == MPI_ANY_SOURCE || req->src == ref->src)
48              && (req->tag == MPI_ANY_TAG || req->tag == ref->tag))
49    {
50      if(req->src == MPI_ANY_SOURCE)req->real_src = ref->src;
51      if(req->tag == MPI_ANY_TAG)req->real_tag = ref->tag;
52      if(req->real_size < ref->real_size) req->truncated = 1;
53      if(ref->detached==1){
54          req->detached_sender=ref; //tie the sender to the receiver, as it is detached and has to be freed in the receiver
55      }
56
57      return 1;
58    } else return 0;
59 }
60
61 static MPI_Request build_request(void *buf, int count,
62                                  MPI_Datatype datatype, int src, int dst,
63                                  int tag, MPI_Comm comm, unsigned flags)
64 {
65   MPI_Request request;
66
67   void *old_buf = NULL;
68
69   request = xbt_new(s_smpi_mpi_request_t, 1);
70
71   s_smpi_subtype_t *subtype = datatype->substruct;
72
73   if(datatype->has_subtype == 1){
74     // This part handles the problem of non-contiguous memory
75     old_buf = buf;
76     buf = malloc(count*smpi_datatype_size(datatype));
77     if (flags & SEND) {
78       subtype->serialize(old_buf, buf, count, datatype->substruct);
79     }
80   }
81
82   request->buf = buf;
83   // This part handles the problem of non-contiguous memory (for the
84   // unserialisation at the reception)
85   request->old_buf = old_buf;
86   request->old_type = datatype;
87
88   request->size = smpi_datatype_size(datatype) * count;
89   request->src = src;
90   request->dst = dst;
91   request->tag = tag;
92   request->comm = comm;
93   request->action = NULL;
94   request->flags = flags;
95   request->detached = 0;
96   request->detached_sender = NULL;
97
98   request->truncated = 0;
99   request->real_size = 0;
100   request->real_tag = 0;
101
102   request->refcount=1;
103 #ifdef HAVE_TRACING
104   request->send = 0;
105   request->recv = 0;
106 #endif
107   if (flags & SEND) smpi_datatype_unuse(datatype);
108
109   return request;
110 }
111
112
113 void smpi_empty_status(MPI_Status * status) {
114   if(status != MPI_STATUS_IGNORE) {
115       status->MPI_SOURCE=MPI_ANY_SOURCE;
116       status->MPI_TAG=MPI_ANY_TAG;
117       status->count=0;
118   }
119 }
120
121 void smpi_action_trace_run(char *path)
122 {
123   char *name;
124   xbt_dynar_t todo;
125   xbt_dict_cursor_t cursor;
126
127   action_fp=NULL;
128   if (path) {
129     action_fp = fopen(path, "r");
130     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
131                strerror(errno));
132   }
133
134   if (!xbt_dict_is_empty(action_queues)) {
135     XBT_WARN
136       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
137
138
139     xbt_dict_foreach(action_queues, cursor, name, todo) {
140       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
141     }
142   }
143
144   if (path)
145     fclose(action_fp);
146   xbt_dict_free(&action_queues);
147   action_queues = xbt_dict_new_homogeneous(NULL);
148 }
149
150 static void smpi_mpi_request_free_voidp(void* request)
151 {
152   MPI_Request req = request;
153   smpi_mpi_request_free(&req);
154 }
155
156 /* MPI Low level calls */
157 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
158                                int dst, int tag, MPI_Comm comm)
159 {
160   MPI_Request request =
161     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
162                   comm, PERSISTENT | SEND);
163   request->refcount++;
164   return request;
165 }
166
167 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
168                                int src, int tag, MPI_Comm comm)
169 {
170   MPI_Request request =
171     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
172                   comm, PERSISTENT | RECV);
173   request->refcount++;
174   return request;
175 }
176
177 void smpi_mpi_start(MPI_Request request)
178 {
179   smx_rdv_t mailbox;
180
181   xbt_assert(!request->action,
182              "Cannot (re)start a non-finished communication");
183   if(request->flags & RECV) {
184     print_request("New recv", request);
185     if (request->size < surf_cfg_get_int("smpi/async_small_thres"))
186       mailbox = smpi_process_mailbox_small();
187     else
188       mailbox = smpi_process_mailbox();
189     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
190     request->real_size=request->size;
191     smpi_datatype_use(request->old_type);
192     request->action = simcall_comm_irecv(mailbox, request->buf, &request->real_size, &match_recv, request);
193   } else {
194
195     int receiver = smpi_group_index(smpi_comm_group(request->comm), request->dst);
196 /*    if(receiver == MPI_UNDEFINED) {*/
197 /*      XBT_WARN("Trying to send a message to a wrong rank");*/
198 /*      return;*/
199 /*    }*/
200     print_request("New send", request);
201     if (request->size < surf_cfg_get_int("smpi/async_small_thres")) { // eager mode
202       mailbox = smpi_process_remote_mailbox_small(receiver);
203     }else{
204       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
205       mailbox = smpi_process_remote_mailbox(receiver);
206     }
207     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
208       void *oldbuf = NULL;
209       request->detached = 1;
210       request->refcount++;
211       if(request->old_type->has_subtype == 0){
212         oldbuf = request->buf;
213         if (oldbuf){
214           request->buf = malloc(request->size);
215           memcpy(request->buf,oldbuf,request->size);
216         }
217       }
218       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
219     }
220     // we make a copy here, as the size is modified by simix, and we may reuse the request in another receive later
221     request->real_size=request->size;
222     smpi_datatype_use(request->old_type);
223
224     request->action =
225       simcall_comm_isend(mailbox, request->size, -1.0,
226                          request->buf, request->real_size,
227                          &match_send,
228                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
229                          request,
230                          // detach if msg size < eager/rdv switch limit
231                          request->detached);
232
233 #ifdef HAVE_TRACING
234     /* FIXME: detached sends are not traceable (request->action == NULL) */
235     if (request->action)
236       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
237 #endif
238
239   }
240
241 }
242
243 void smpi_mpi_startall(int count, MPI_Request * requests)
244 {
245   int i;
246
247   for(i = 0; i < count; i++) {
248     smpi_mpi_start(requests[i]);
249   }
250 }
251
252 void smpi_mpi_request_free(MPI_Request * request)
253 {
254
255   if((*request) != MPI_REQUEST_NULL){
256     (*request)->refcount--;
257     if((*request)->refcount<0) xbt_die("wrong refcount");
258
259     if((*request)->refcount==0){
260         xbt_free(*request);
261         *request = MPI_REQUEST_NULL;
262     }
263   }else{
264       xbt_die("freeing an already free request");
265   }
266 }
267
268 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
269                             int dst, int tag, MPI_Comm comm)
270 {
271   MPI_Request request =
272     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
273                   comm, NON_PERSISTENT | SEND);
274
275   return request;
276 }
277
278 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
279                            int dst, int tag, MPI_Comm comm)
280 {
281   MPI_Request request =
282       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
283                     comm, NON_PERSISTENT | SEND);
284
285   smpi_mpi_start(request);
286   return request;
287 }
288
289 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
290                             int src, int tag, MPI_Comm comm)
291 {
292   MPI_Request request =
293     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
294                   comm, NON_PERSISTENT | RECV);
295   return request;
296 }
297
298 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
299                            int src, int tag, MPI_Comm comm)
300 {
301   MPI_Request request =
302       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
303                     comm, NON_PERSISTENT | RECV);
304
305   smpi_mpi_start(request);
306   return request;
307 }
308
309 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
310                    int tag, MPI_Comm comm, MPI_Status * status)
311 {
312   MPI_Request request;
313   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
314   smpi_mpi_wait(&request, status);
315 }
316
317
318
319 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
320                    int tag, MPI_Comm comm)
321 {
322   MPI_Request request;
323   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
324   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
325
326 }
327
328 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
329                        int dst, int sendtag, void *recvbuf, int recvcount,
330                        MPI_Datatype recvtype, int src, int recvtag,
331                        MPI_Comm comm, MPI_Status * status)
332 {
333   MPI_Request requests[2];
334   MPI_Status stats[2];
335
336   requests[0] =
337     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
338   requests[1] =
339     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
340   smpi_mpi_startall(2, requests);
341   smpi_mpi_waitall(2, requests, stats);
342   if(status != MPI_STATUS_IGNORE) {
343     // Copy receive status
344     memcpy(status, &stats[1], sizeof(MPI_Status));
345   }
346 }
347
348 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
349 {
350   return status->count / smpi_datatype_size(datatype);
351 }
352
353 static void finish_wait(MPI_Request * request, MPI_Status * status)
354 {
355   MPI_Request req = *request;
356   if(!(req->detached && req->flags & SEND)){
357     if(status != MPI_STATUS_IGNORE) {
358       status->MPI_SOURCE = req->src == MPI_ANY_SOURCE ? req->real_src : req->src;
359       status->MPI_TAG = req->tag == MPI_ANY_TAG ? req->real_tag : req->tag;
360       if(req->truncated)
361       status->MPI_ERROR = MPI_ERR_TRUNCATE;
362       else status->MPI_ERROR = MPI_SUCCESS ;
363       // this handles the case were size in receive differs from size in send
364       // FIXME: really this should just contain the count of receive-type blocks,
365       // right?
366       status->count = req->real_size;
367     }
368
369     print_request("Finishing", req);
370     MPI_Datatype datatype = req->old_type;
371
372     if(datatype->has_subtype == 1){
373         // This part handles the problem of non-contignous memory
374         // the unserialization at the reception
375       s_smpi_subtype_t *subtype = datatype->substruct;
376       if(req->flags & RECV) {
377         subtype->unserialize(req->buf, req->old_buf, req->real_size/smpi_datatype_size(datatype) , datatype->substruct);
378       }
379       if(req->detached == 0) free(req->buf);
380     }
381     smpi_datatype_unuse(datatype);
382   }
383
384   if(req->detached_sender!=NULL){
385     smpi_mpi_request_free(&(req->detached_sender));
386   }
387
388   if(req->flags & NON_PERSISTENT) {
389     smpi_mpi_request_free(request);
390   } else {
391     req->action = NULL;
392   }
393 }
394
395 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
396   int flag;
397
398   //assume that request is not MPI_REQUEST_NULL (filtered in PMPI_Test or smpi_mpi_testall before)
399   if ((*request)->action == NULL)
400     flag = 1;
401   else
402     flag = simcall_comm_test((*request)->action);
403   if(flag) {
404     (*request)->refcount++;
405     finish_wait(request, status);
406   }else{
407     smpi_empty_status(status);
408   }
409   return flag;
410 }
411
412 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
413                      MPI_Status * status)
414 {
415   xbt_dynar_t comms;
416   int i, flag, size;
417   int* map;
418
419   *index = MPI_UNDEFINED;
420   flag = 0;
421   if(count > 0) {
422     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
423     map = xbt_new(int, count);
424     size = 0;
425     for(i = 0; i < count; i++) {
426       if((requests[i]!=MPI_REQUEST_NULL) && requests[i]->action) {
427          xbt_dynar_push(comms, &requests[i]->action);
428          map[size] = i;
429          size++;
430       }
431     }
432     if(size > 0) {
433       i = simcall_comm_testany(comms);
434       // not MPI_UNDEFINED, as this is a simix return code
435       if(i != -1) {
436         *index = map[i];
437         finish_wait(&requests[*index], status);
438         flag = 1;
439       }
440     }else{
441         //all requests are null or inactive, return true
442         flag=1;
443         smpi_empty_status(status);
444     }
445     xbt_free(map);
446     xbt_dynar_free(&comms);
447   }
448
449   return flag;
450 }
451
452
453 int smpi_mpi_testall(int count, MPI_Request requests[],
454                      MPI_Status status[])
455 {
456   MPI_Status stat;
457   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
458   int flag=1;
459   int i;
460   for(i=0; i<count; i++){
461     if(requests[i]!= MPI_REQUEST_NULL){
462       if (smpi_mpi_test(&requests[i], pstat)!=1){
463         flag=0;
464       }
465     }else{
466       smpi_empty_status(pstat);
467     }
468     if(status != MPI_STATUSES_IGNORE) {
469       memcpy(&status[i], pstat, sizeof(*pstat));
470     }
471   }
472   return flag;
473 }
474
475 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
476   int flag=0;
477   //FIXME find another wait to avoid busy waiting ?
478   // the issue here is that we have to wait on a nonexistent comm
479   while(flag==0){
480     smpi_mpi_iprobe(source, tag, comm, &flag, status);
481     XBT_DEBUG("Busy Waiting on probing : %d", flag);
482     if(!flag) {
483       simcall_process_sleep(0.0001);
484     }
485   }
486 }
487
488 void smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
489   MPI_Request request =build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
490             comm, NON_PERSISTENT | RECV);
491
492   // behave like a receive, but don't do it
493   smx_rdv_t mailbox;
494
495   print_request("New iprobe", request);
496   // We have to test both mailboxes as we don't know if we will receive one one or another
497     if (surf_cfg_get_int("smpi/async_small_thres")>0){
498         mailbox = smpi_process_mailbox_small();
499         XBT_DEBUG("trying to probe the perm recv mailbox");
500         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
501     }
502     if (request->action==NULL){
503         mailbox = smpi_process_mailbox();
504         XBT_DEBUG("trying to probe the other mailbox");
505         request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
506     }
507
508   if(request->action){
509     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
510     *flag = 1;
511     if(status != MPI_STATUS_IGNORE) {
512       status->MPI_SOURCE = req->src;
513       status->MPI_TAG = req->tag;
514       status->MPI_ERROR = MPI_SUCCESS;
515       status->count = req->real_size;
516     }
517   }
518   else *flag = 0;
519   smpi_mpi_request_free(&request);
520
521   return;
522 }
523
524 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
525 {
526   print_request("Waiting", *request);
527   if ((*request)->action != NULL) { // this is not a detached send
528     simcall_comm_wait((*request)->action, -1.0);
529   }
530   finish_wait(request, status);
531
532   // FIXME for a detached send, finish_wait is not called:
533 }
534
535 int smpi_mpi_waitany(int count, MPI_Request requests[],
536                      MPI_Status * status)
537 {
538   xbt_dynar_t comms;
539   int i, size, index;
540   int *map;
541
542   index = MPI_UNDEFINED;
543   if(count > 0) {
544     // Wait for a request to complete
545     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
546     map = xbt_new(int, count);
547     size = 0;
548     XBT_DEBUG("Wait for one of %d", count);
549     for(i = 0; i < count; i++) {
550       if(requests[i] != MPI_REQUEST_NULL) {
551         if (requests[i]->action != NULL) {
552           XBT_DEBUG("Waiting any %p ", requests[i]);
553           xbt_dynar_push(comms, &requests[i]->action);
554           map[size] = i;
555           size++;
556         }else{
557          //This is a finished detached request, let's return this one
558          size=0;//so we free the dynar but don't do the waitany call
559          index=i;
560          finish_wait(&requests[i], status);//cleanup if refcount = 0
561          requests[i]=MPI_REQUEST_NULL;//set to null
562          break;
563          }
564       }
565     }
566     if(size > 0) {
567       i = simcall_comm_waitany(comms);
568
569       // not MPI_UNDEFINED, as this is a simix return code
570       if (i != -1) {
571         index = map[i];
572         finish_wait(&requests[index], status);
573       }
574     }
575     xbt_free(map);
576     xbt_dynar_free(&comms);
577   }
578
579   if (index==MPI_UNDEFINED)
580     smpi_empty_status(status);
581
582   return index;
583 }
584
585 int smpi_mpi_waitall(int count, MPI_Request requests[],
586                       MPI_Status status[])
587 {
588   int  index, c;
589   MPI_Status stat;
590   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
591   int retvalue=MPI_SUCCESS;
592   //tag invalid requests in the set
593   for(c = 0; c < count; c++) {
594     if(requests[c]==MPI_REQUEST_NULL || requests[c]->dst == MPI_PROC_NULL ){
595       if(status != MPI_STATUSES_IGNORE)
596         smpi_empty_status(&status[c]);
597     }else if(requests[c]->src == MPI_PROC_NULL ){
598       if(status != MPI_STATUSES_IGNORE) {
599         smpi_empty_status(&status[c]);
600         status[c].MPI_SOURCE=MPI_PROC_NULL;
601       }
602     }
603   }
604   for(c = 0; c < count; c++) {
605       if(MC_is_active()) {
606         smpi_mpi_wait(&requests[c], pstat);
607         index = c;
608       } else {
609         index = smpi_mpi_waitany(count, requests, pstat);
610         if(index == MPI_UNDEFINED) {
611           break;
612        }
613       if(status != MPI_STATUSES_IGNORE) {
614         memcpy(&status[index], pstat, sizeof(*pstat));
615         if(status[index].MPI_ERROR==MPI_ERR_TRUNCATE)retvalue=MPI_ERR_IN_STATUS;
616
617       }
618     }
619   }
620
621   return retvalue;
622 }
623
624 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
625                       MPI_Status status[])
626 {
627   int i, count, index;
628   MPI_Status stat;
629   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
630
631   count = 0;
632   for(i = 0; i < incount; i++)
633   {
634     index=smpi_mpi_waitany(incount, requests, pstat);
635     if(index!=MPI_UNDEFINED){
636       indices[count] = index;
637       count++;
638       if(status != MPI_STATUSES_IGNORE) {
639         memcpy(&status[index], pstat, sizeof(*pstat));
640       }
641     }else{
642       return MPI_UNDEFINED;
643     }
644   }
645   return count;
646 }
647
648 int smpi_mpi_testsome(int incount, MPI_Request requests[], int *indices,
649                       MPI_Status status[])
650 {
651   int i, count, count_dead;
652   MPI_Status stat;
653   MPI_Status *pstat = status == MPI_STATUSES_IGNORE ? MPI_STATUS_IGNORE : &stat;
654
655   count = 0;
656   count_dead = 0;
657   for(i = 0; i < incount; i++) {
658     if((requests[i] != MPI_REQUEST_NULL)) {
659       if(smpi_mpi_test(&requests[i], pstat)) {
660          indices[count] = i;
661          count++;
662          if(status != MPI_STATUSES_IGNORE) {
663             memcpy(&status[i], pstat, sizeof(*pstat));
664          }
665       }
666     }else{
667       count_dead++;
668     }
669   }
670   if(count_dead==incount)return MPI_UNDEFINED;
671   else return count;
672 }
673
674 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
675                     MPI_Comm comm)
676 {
677   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
678   nary_tree_bcast(buf, count, datatype, root, comm, 4);
679 }
680
681 void smpi_mpi_barrier(MPI_Comm comm)
682 {
683   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
684   nary_tree_barrier(comm, 4);
685 }
686
687 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
688                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
689                      int root, MPI_Comm comm)
690 {
691   int system_tag = 666;
692   int rank, size, src, index;
693   MPI_Aint lb = 0, recvext = 0;
694   MPI_Request *requests;
695
696   rank = smpi_comm_rank(comm);
697   size = smpi_comm_size(comm);
698   if(rank != root) {
699     // Send buffer to root
700     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
701   } else {
702     // FIXME: check for errors
703     smpi_datatype_extent(recvtype, &lb, &recvext);
704     // Local copy from root
705     smpi_datatype_copy(sendbuf, sendcount, sendtype,
706                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
707     // Receive buffers from senders
708     requests = xbt_new(MPI_Request, size - 1);
709     index = 0;
710     for(src = 0; src < size; src++) {
711       if(src != root) {
712         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
713                                           recvcount, recvtype,
714                                           src, system_tag, comm);
715         index++;
716       }
717     }
718     // Wait for completion of irecv's.
719     smpi_mpi_startall(size - 1, requests);
720     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
721     xbt_free(requests);
722   }
723 }
724
725 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
726                       void *recvbuf, int *recvcounts, int *displs,
727                       MPI_Datatype recvtype, int root, MPI_Comm comm)
728 {
729   int system_tag = 666;
730   int rank, size, src, index;
731   MPI_Aint lb = 0, recvext = 0;
732   MPI_Request *requests;
733
734   rank = smpi_comm_rank(comm);
735   size = smpi_comm_size(comm);
736   if(rank != root) {
737     // Send buffer to root
738     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
739   } else {
740     // FIXME: check for errors
741     smpi_datatype_extent(recvtype, &lb, &recvext);
742     // Local copy from root
743     smpi_datatype_copy(sendbuf, sendcount, sendtype,
744                        (char *)recvbuf + displs[root] * recvext,
745                        recvcounts[root], recvtype);
746     // Receive buffers from senders
747     requests = xbt_new(MPI_Request, size - 1);
748     index = 0;
749     for(src = 0; src < size; src++) {
750       if(src != root) {
751         requests[index] =
752           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
753                           recvcounts[src], recvtype, src, system_tag, comm);
754         index++;
755       }
756     }
757     // Wait for completion of irecv's.
758     smpi_mpi_startall(size - 1, requests);
759     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
760     xbt_free(requests);
761   }
762 }
763
764 void smpi_mpi_allgather(void *sendbuf, int sendcount,
765                         MPI_Datatype sendtype, void *recvbuf,
766                         int recvcount, MPI_Datatype recvtype,
767                         MPI_Comm comm)
768 {
769   int system_tag = 666;
770   int rank, size, other, index;
771   MPI_Aint lb = 0, recvext = 0;
772   MPI_Request *requests;
773
774   rank = smpi_comm_rank(comm);
775   size = smpi_comm_size(comm);
776   // FIXME: check for errors
777   smpi_datatype_extent(recvtype, &lb, &recvext);
778   // Local copy from self
779   smpi_datatype_copy(sendbuf, sendcount, sendtype,
780                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
781                      recvtype);
782   // Send/Recv buffers to/from others;
783   requests = xbt_new(MPI_Request, 2 * (size - 1));
784   index = 0;
785   for(other = 0; other < size; other++) {
786     if(other != rank) {
787       requests[index] =
788         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
789                         comm);
790       index++;
791       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
792                                         recvcount, recvtype, other,
793                                         system_tag, comm);
794       index++;
795     }
796   }
797   // Wait for completion of all comms.
798   smpi_mpi_startall(2 * (size - 1), requests);
799   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
800   xbt_free(requests);
801 }
802
803 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
804                          MPI_Datatype sendtype, void *recvbuf,
805                          int *recvcounts, int *displs,
806                          MPI_Datatype recvtype, MPI_Comm comm)
807 {
808   int system_tag = 666;
809   int rank, size, other, index;
810   MPI_Aint lb = 0, recvext = 0;
811   MPI_Request *requests;
812
813   rank = smpi_comm_rank(comm);
814   size = smpi_comm_size(comm);
815   // FIXME: check for errors
816   smpi_datatype_extent(recvtype, &lb, &recvext);
817   // Local copy from self
818   smpi_datatype_copy(sendbuf, sendcount, sendtype,
819                      (char *)recvbuf + displs[rank] * recvext,
820                      recvcounts[rank], recvtype);
821   // Send buffers to others;
822   requests = xbt_new(MPI_Request, 2 * (size - 1));
823   index = 0;
824   for(other = 0; other < size; other++) {
825     if(other != rank) {
826       requests[index] =
827         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
828                         comm);
829       index++;
830       requests[index] =
831         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
832                         recvtype, other, system_tag, comm);
833       index++;
834     }
835   }
836   // Wait for completion of all comms.
837   smpi_mpi_startall(2 * (size - 1), requests);
838   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
839   xbt_free(requests);
840 }
841
842 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
843                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
844                       int root, MPI_Comm comm)
845 {
846   int system_tag = 666;
847   int rank, size, dst, index;
848   MPI_Aint lb = 0, sendext = 0;
849   MPI_Request *requests;
850
851   rank = smpi_comm_rank(comm);
852   size = smpi_comm_size(comm);
853   if(rank != root) {
854     // Recv buffer from root
855     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
856                   MPI_STATUS_IGNORE);
857   } else {
858     // FIXME: check for errors
859     smpi_datatype_extent(sendtype, &lb, &sendext);
860     // Local copy from root
861     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
862                        sendcount, sendtype, recvbuf, recvcount, recvtype);
863     // Send buffers to receivers
864     requests = xbt_new(MPI_Request, size - 1);
865     index = 0;
866     for(dst = 0; dst < size; dst++) {
867       if(dst != root) {
868         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
869                                           sendcount, sendtype, dst,
870                                           system_tag, comm);
871         index++;
872       }
873     }
874     // Wait for completion of isend's.
875     smpi_mpi_startall(size - 1, requests);
876     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
877     xbt_free(requests);
878   }
879 }
880
881 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
882                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
883                        MPI_Datatype recvtype, int root, MPI_Comm comm)
884 {
885   int system_tag = 666;
886   int rank, size, dst, index;
887   MPI_Aint lb = 0, sendext = 0;
888   MPI_Request *requests;
889
890   rank = smpi_comm_rank(comm);
891   size = smpi_comm_size(comm);
892   if(rank != root) {
893     // Recv buffer from root
894     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
895                   MPI_STATUS_IGNORE);
896   } else {
897     // FIXME: check for errors
898     smpi_datatype_extent(sendtype, &lb, &sendext);
899     // Local copy from root
900     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
901                        sendtype, recvbuf, recvcount, recvtype);
902     // Send buffers to receivers
903     requests = xbt_new(MPI_Request, size - 1);
904     index = 0;
905     for(dst = 0; dst < size; dst++) {
906       if(dst != root) {
907         requests[index] =
908           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
909                           sendtype, dst, system_tag, comm);
910         index++;
911       }
912     }
913     // Wait for completion of isend's.
914     smpi_mpi_startall(size - 1, requests);
915     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
916     xbt_free(requests);
917   }
918 }
919
920 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
921                      MPI_Datatype datatype, MPI_Op op, int root,
922                      MPI_Comm comm)
923 {
924   int system_tag = 666;
925   int rank, size, src, index;
926   MPI_Aint lb = 0, dataext = 0;
927   MPI_Request *requests;
928   void **tmpbufs;
929
930   rank = smpi_comm_rank(comm);
931   size = smpi_comm_size(comm);
932   if(rank != root) {
933     // Send buffer to root
934     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
935   } else {
936     // FIXME: check for errors
937     smpi_datatype_extent(datatype, &lb, &dataext);
938     // Local copy from root
939     if (sendbuf && recvbuf)
940       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
941     // Receive buffers from senders
942     //TODO: make a MPI_barrier here ?
943     requests = xbt_new(MPI_Request, size - 1);
944     tmpbufs = xbt_new(void *, size - 1);
945     index = 0;
946     for(src = 0; src < size; src++) {
947       if(src != root) {
948         // FIXME: possibly overkill we we have contiguous/noncontiguous data
949         //  mapping...
950         tmpbufs[index] = xbt_malloc(count * dataext);
951         requests[index] =
952           smpi_irecv_init(tmpbufs[index], count, datatype, src,
953                           system_tag, comm);
954         index++;
955       }
956     }
957     // Wait for completion of irecv's.
958     smpi_mpi_startall(size - 1, requests);
959     for(src = 0; src < size - 1; src++) {
960       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
961       XBT_DEBUG("finished waiting any request with index %d", index);
962       if(index == MPI_UNDEFINED) {
963         break;
964       }
965       if(op) /* op can be MPI_OP_NULL that does nothing */
966         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
967     }
968     for(index = 0; index < size - 1; index++) {
969       xbt_free(tmpbufs[index]);
970     }
971     xbt_free(tmpbufs);
972     xbt_free(requests);
973   }
974 }
975
976 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
977                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
978 {
979   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
980   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
981 }
982
983 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
984                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
985 {
986   int system_tag = 666;
987   int rank, size, other, index;
988   MPI_Aint lb = 0, dataext = 0;
989   MPI_Request *requests;
990   void **tmpbufs;
991
992   rank = smpi_comm_rank(comm);
993   size = smpi_comm_size(comm);
994
995   // FIXME: check for errors
996   smpi_datatype_extent(datatype, &lb, &dataext);
997
998   // Local copy from self
999   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
1000
1001   // Send/Recv buffers to/from others;
1002   requests = xbt_new(MPI_Request, size - 1);
1003   tmpbufs = xbt_new(void *, rank);
1004   index = 0;
1005   for(other = 0; other < rank; other++) {
1006     // FIXME: possibly overkill we we have contiguous/noncontiguous data
1007     // mapping...
1008     tmpbufs[index] = xbt_malloc(count * dataext);
1009     requests[index] =
1010       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
1011                       comm);
1012     index++;
1013   }
1014   for(other = rank + 1; other < size; other++) {
1015     requests[index] =
1016       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
1017     index++;
1018   }
1019   // Wait for completion of all comms.
1020   smpi_mpi_startall(size - 1, requests);
1021   for(other = 0; other < size - 1; other++) {
1022     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
1023     if(index == MPI_UNDEFINED) {
1024       break;
1025     }
1026     if(index < rank) {
1027       // #Request is below rank: it's a irecv
1028       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
1029     }
1030   }
1031   for(index = 0; index < rank; index++) {
1032     xbt_free(tmpbufs[index]);
1033   }
1034   xbt_free(tmpbufs);
1035   xbt_free(requests);
1036 }