Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[smpi] add a gestion of non-contignous data
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "surf/surf.h"
13
14
15 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
16
17
18 static int match_recv(void* a, void* b, smx_action_t ignored) {
19   MPI_Request ref = (MPI_Request)a;
20   MPI_Request req = (MPI_Request)b;
21
22   xbt_assert(ref, "Cannot match recv against null reference");
23   xbt_assert(req, "Cannot match recv against null request");
24   return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
25     && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
26 }
27
28 static int match_send(void* a, void* b,smx_action_t ignored) {
29   MPI_Request ref = (MPI_Request)a;
30   MPI_Request req = (MPI_Request)b;
31
32   xbt_assert(ref, "Cannot match send against null reference");
33   xbt_assert(req, "Cannot match send against null request");
34   return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
35     && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
36 }
37
38 static MPI_Request build_request(void *buf, int count,
39                                  MPI_Datatype datatype, int src, int dst,
40                                  int tag, MPI_Comm comm, unsigned flags)
41 {
42   MPI_Request request;
43
44   void *old_buf;
45
46   request = xbt_new(s_smpi_mpi_request_t, 1);
47
48   s_smpi_subtype_t *subtype = datatype->substruct;
49
50   if(datatype->has_subtype == 1){
51     // This part handles the problem of non-contignous memory
52     old_buf = buf;
53     buf = malloc(count*smpi_datatype_size(datatype));
54     if (flags & SEND) {
55       subtype->serialize(old_buf, buf, count, datatype->substruct);
56     }
57   }
58
59   request->buf = buf;
60   // This part handles the problem of non-contignous memory (for the
61   // unserialisation at the reception)
62   request->old_buf = old_buf;
63   request->old_type = datatype;
64
65   request->size = smpi_datatype_size(datatype) * count;
66   request->src = src;
67   request->dst = dst;
68   request->tag = tag;
69   request->comm = comm;
70   request->action = NULL;
71   request->flags = flags;
72 #ifdef HAVE_TRACING
73   request->send = 0;
74   request->recv = 0;
75 #endif
76   return request;
77 }
78
79 void smpi_action_trace_run(char *path)
80 {
81   char *name;
82   xbt_dynar_t todo;
83   xbt_dict_cursor_t cursor;
84
85   action_fp=NULL;
86   if (path) {
87     action_fp = fopen(path, "r");
88     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
89                strerror(errno));
90   }
91
92   if (!xbt_dict_is_empty(action_queues)) {
93     XBT_WARN
94       ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
95
96
97     xbt_dict_foreach(action_queues, cursor, name, todo) {
98       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
99     }
100   }
101
102   if (path)
103     fclose(action_fp);
104   xbt_dict_free(&action_queues);
105   action_queues = xbt_dict_new_homogeneous(NULL);
106 }
107
108 static void smpi_mpi_request_free_voidp(void* request)
109 {
110   MPI_Request req = request;
111   smpi_mpi_request_free(&req);
112 }
113
114 /* MPI Low level calls */
115 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
116                                int dst, int tag, MPI_Comm comm)
117 {
118   MPI_Request request =
119     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
120                   comm, PERSISTENT | SEND);
121
122   return request;
123 }
124
125 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
126                                int src, int tag, MPI_Comm comm)
127 {
128   MPI_Request request =
129     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
130                   comm, PERSISTENT | RECV);
131
132   return request;
133 }
134
135 void smpi_mpi_start(MPI_Request request)
136 {
137   smx_rdv_t mailbox;
138   int detached = 0;
139
140   xbt_assert(!request->action,
141              "Cannot (re)start a non-finished communication");
142   if(request->flags & RECV) {
143     print_request("New recv", request);
144     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
145       mailbox = smpi_process_mailbox_small();
146     else
147       mailbox = smpi_process_mailbox();
148
149     // FIXME: SIMIX does not yet support non-contiguous datatypes
150     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
151   } else {
152     print_request("New send", request);
153
154     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
155       mailbox = smpi_process_remote_mailbox_small(
156                                                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
157     }else{
158       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
159       mailbox = smpi_process_remote_mailbox(
160                                             smpi_group_index(smpi_comm_group(request->comm), request->dst));
161     }
162     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
163       void *oldbuf = request->buf;
164       detached = 1;
165       request->buf = malloc(request->size);
166       if (oldbuf)
167         memcpy(request->buf,oldbuf,request->size);
168       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
169     }
170
171     request->action =
172       simcall_comm_isend(mailbox, request->size, -1.0,
173                          request->buf, request->size,
174                          &match_send,
175                          &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
176                          request,
177                          // detach if msg size < eager/rdv switch limit
178                          detached);
179
180 #ifdef HAVE_TRACING
181     /* FIXME: detached sends are not traceable (request->action == NULL) */
182     if (request->action)
183       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
184 #endif
185
186   }
187
188 }
189
190 void smpi_mpi_startall(int count, MPI_Request * requests)
191 {
192   int i;
193
194   for(i = 0; i < count; i++) {
195     smpi_mpi_start(requests[i]);
196   }
197 }
198
199 void smpi_mpi_request_free(MPI_Request * request)
200 {
201   xbt_free(*request);
202   *request = MPI_REQUEST_NULL;
203 }
204
205 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
206                             int dst, int tag, MPI_Comm comm)
207 {
208   MPI_Request request =
209     build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
210                   comm, NON_PERSISTENT | SEND);
211
212   return request;
213 }
214
215 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
216                            int dst, int tag, MPI_Comm comm)
217 {
218   MPI_Request request =
219     smpi_isend_init(buf, count, datatype, dst, tag, comm);
220
221   smpi_mpi_start(request);
222   return request;
223 }
224
225 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
226                             int src, int tag, MPI_Comm comm)
227 {
228   MPI_Request request =
229     build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
230                   comm, NON_PERSISTENT | RECV);
231   return request;
232 }
233
234 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
235                            int src, int tag, MPI_Comm comm)
236 {
237   MPI_Request request =
238     smpi_irecv_init(buf, count, datatype, src, tag, comm);
239
240   smpi_mpi_start(request);
241   return request;
242 }
243
244 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
245                    int tag, MPI_Comm comm, MPI_Status * status)
246 {
247   MPI_Request request;
248   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
249   smpi_mpi_wait(&request, status);
250 }
251
252
253
254 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
255                    int tag, MPI_Comm comm)
256 {
257   MPI_Request request;
258
259   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
260   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
261 }
262
263 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
264                        int dst, int sendtag, void *recvbuf, int recvcount,
265                        MPI_Datatype recvtype, int src, int recvtag,
266                        MPI_Comm comm, MPI_Status * status)
267 {
268   MPI_Request requests[2];
269   MPI_Status stats[2];
270
271   requests[0] =
272     smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
273   requests[1] =
274     smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
275   smpi_mpi_startall(2, requests);
276   smpi_mpi_waitall(2, requests, stats);
277   if(status != MPI_STATUS_IGNORE) {
278     // Copy receive status
279     memcpy(status, &stats[1], sizeof(MPI_Status));
280   }
281 }
282
283 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
284 {
285   return status->count / smpi_datatype_size(datatype);
286 }
287
288 static void finish_wait(MPI_Request * request, MPI_Status * status)
289 {
290   MPI_Request req = *request;
291   // if we have a sender, we should use its data, and not the data from the receive
292   if((req->action)&&
293      (req->src==MPI_ANY_SOURCE || req->tag== MPI_ANY_TAG))
294     req = (MPI_Request)SIMIX_comm_get_src_data((*request)->action);
295
296   if(status != MPI_STATUS_IGNORE) {
297     status->MPI_SOURCE = req->src;
298     status->MPI_TAG = req->tag;
299     status->MPI_ERROR = MPI_SUCCESS;
300     // FIXME: really this should just contain the count of receive-type blocks,
301     // right?
302     status->count = req->size;
303   }
304   req = *request;
305
306   print_request("Finishing", req);
307   MPI_Datatype datatype = req->old_type;
308   if(datatype->has_subtype == 1){
309       // This part handles the problem of non-contignous memory
310       // the unserialization at the reception
311     s_smpi_subtype_t *subtype = datatype->substruct;
312     if(req->flags & RECV) {
313       subtype->unserialize(req->buf, req->old_buf, req->size/smpi_datatype_size(datatype) , datatype->substruct);
314     }
315     //FIXME: I am not sure that if the send is detached we have to free
316     //the sender buffer thus I do it only for the reciever
317     if(req->flags & RECV) free(req->buf);
318   }
319
320   if(req->flags & NON_PERSISTENT) {
321     smpi_mpi_request_free(request);
322   } else {
323     req->action = NULL;
324   }
325 }
326
327 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
328   int flag;
329
330   if ((*request)->action == NULL)
331     flag = 1;
332   else
333     flag = simcall_comm_test((*request)->action);
334   if(flag) {
335     finish_wait(request, status);
336   }
337   return flag;
338 }
339
340 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
341                      MPI_Status * status)
342 {
343   xbt_dynar_t comms;
344   int i, flag, size;
345   int* map;
346
347   *index = MPI_UNDEFINED;
348   flag = 0;
349   if(count > 0) {
350     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
351     map = xbt_new(int, count);
352     size = 0;
353     for(i = 0; i < count; i++) {
354       if(requests[i]->action) {
355         xbt_dynar_push(comms, &requests[i]->action);
356         map[size] = i;
357         size++;
358       }
359     }
360     if(size > 0) {
361       i = simcall_comm_testany(comms);
362       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
363       if(i != MPI_UNDEFINED) {
364         *index = map[i];
365         smpi_mpi_wait(&requests[*index], status);
366         flag = 1;
367       }
368     }
369     xbt_free(map);
370     xbt_dynar_free(&comms);
371   }
372   return flag;
373 }
374
375
376 int smpi_mpi_testall(int count, MPI_Request requests[],
377                      MPI_Status status[])
378 {
379   int flag=1;
380   int i;
381   for(i=0; i<count; i++)
382     if (smpi_mpi_test(&requests[i], &status[i])!=1)
383       flag=0;
384
385   return flag;
386 }
387
388 void smpi_mpi_probe(int source, int tag, MPI_Comm comm, MPI_Status* status){
389   int flag=0;
390   //FIXME find another wait to avoid busy waiting ?
391   // the issue here is that we have to wait on a nonexistent comm
392   MPI_Request request;
393   while(flag==0){
394     request = smpi_mpi_iprobe(source, tag, comm, &flag, status);
395     XBT_DEBUG("Busy Waiting on probing : %d", flag);
396     if(!flag) {
397       smpi_mpi_request_free(&request);
398       simcall_process_sleep(0.0001);
399     }
400   }
401 }
402
403 MPI_Request smpi_mpi_iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* status){
404   MPI_Request request = build_request(NULL, 0, MPI_CHAR, source, smpi_comm_rank(comm), tag,
405                                      comm, NON_PERSISTENT | RECV);
406   // behave like a receive, but don't do it
407   smx_rdv_t mailbox;
408
409   print_request("New iprobe", request);
410   // We have to test both mailboxes as we don't know if we will receive one one or another
411   if (xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")>0){
412     mailbox = smpi_process_mailbox_small();
413     request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
414
415   }
416   if (request->action==NULL){
417     mailbox = smpi_process_mailbox();
418     request->action = simcall_comm_iprobe(mailbox, request->src, request->tag, &match_recv, (void*)request);
419   }
420
421   if(request->action){
422     MPI_Request req = (MPI_Request)SIMIX_comm_get_src_data(request->action);
423     *flag=true;
424     if(status != MPI_STATUS_IGNORE) {
425       status->MPI_SOURCE = req->src;
426       status->MPI_TAG = req->tag;
427       status->MPI_ERROR = MPI_SUCCESS;
428       status->count = req->size;
429     }
430   }
431   else *flag=false;
432
433   return request;
434 }
435
436 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
437 {
438   print_request("Waiting", *request);
439   if ((*request)->action != NULL) { // this is not a detached send
440     simcall_comm_wait((*request)->action, -1.0);
441     finish_wait(request, status);
442   }
443   // FIXME for a detached send, finish_wait is not called:
444 }
445
446 int smpi_mpi_waitany(int count, MPI_Request requests[],
447                      MPI_Status * status)
448 {
449   xbt_dynar_t comms;
450   int i, size, index;
451   int *map;
452
453   index = MPI_UNDEFINED;
454   if(count > 0) {
455     // Wait for a request to complete
456     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
457     map = xbt_new(int, count);
458     size = 0;
459     XBT_DEBUG("Wait for one of");
460     for(i = 0; i < count; i++) {
461       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
462         print_request("Waiting any ", requests[i]);
463         xbt_dynar_push(comms, &requests[i]->action);
464         map[size] = i;
465         size++;
466       }
467     }
468     if(size > 0) {
469       i = simcall_comm_waitany(comms);
470
471       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
472       if (i != MPI_UNDEFINED) {
473         index = map[i];
474         finish_wait(&requests[index], status);
475
476       }
477     }
478     xbt_free(map);
479     xbt_dynar_free(&comms);
480   }
481   return index;
482 }
483
484 void smpi_mpi_waitall(int count, MPI_Request requests[],
485                       MPI_Status status[])
486 {
487   int index, c;
488   MPI_Status stat;
489   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
490
491   for(c = 0; c < count; c++) {
492     if(MC_IS_ENABLED) {
493       smpi_mpi_wait(&requests[c], pstat);
494       index = c;
495     } else {
496       index = smpi_mpi_waitany(count, requests, pstat);
497       if(index == MPI_UNDEFINED) {
498         break;
499       }
500     }
501     if(status != MPI_STATUS_IGNORE) {
502       memcpy(&status[index], pstat, sizeof(*pstat));
503     }
504   }
505 }
506
507 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
508                       MPI_Status status[])
509 {
510   int i, count, index;
511
512   count = 0;
513   for(i = 0; i < incount; i++) {
514     if(smpi_mpi_testany(incount, requests, &index, status)) {
515       indices[count] = index;
516       count++;
517     }
518   }
519   return count;
520 }
521
522 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
523                     MPI_Comm comm)
524 {
525   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
526   nary_tree_bcast(buf, count, datatype, root, comm, 4);
527 }
528
529 void smpi_mpi_barrier(MPI_Comm comm)
530 {
531   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
532   nary_tree_barrier(comm, 4);
533 }
534
535 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
536                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
537                      int root, MPI_Comm comm)
538 {
539   int system_tag = 666;
540   int rank, size, src, index;
541   MPI_Aint lb = 0, recvext = 0;
542   MPI_Request *requests;
543
544   rank = smpi_comm_rank(comm);
545   size = smpi_comm_size(comm);
546   if(rank != root) {
547     // Send buffer to root
548     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
549   } else {
550     // FIXME: check for errors
551     smpi_datatype_extent(recvtype, &lb, &recvext);
552     // Local copy from root
553     smpi_datatype_copy(sendbuf, sendcount, sendtype,
554                        (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
555     // Receive buffers from senders
556     requests = xbt_new(MPI_Request, size - 1);
557     index = 0;
558     for(src = 0; src < size; src++) {
559       if(src != root) {
560         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext,
561                                           recvcount, recvtype,
562                                           src, system_tag, comm);
563         index++;
564       }
565     }
566     // Wait for completion of irecv's.
567     smpi_mpi_startall(size - 1, requests);
568     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
569     xbt_free(requests);
570   }
571 }
572
573 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
574                       void *recvbuf, int *recvcounts, int *displs,
575                       MPI_Datatype recvtype, int root, MPI_Comm comm)
576 {
577   int system_tag = 666;
578   int rank, size, src, index;
579   MPI_Aint lb = 0, recvext = 0;
580   MPI_Request *requests;
581
582   rank = smpi_comm_rank(comm);
583   size = smpi_comm_size(comm);
584   if(rank != root) {
585     // Send buffer to root
586     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
587   } else {
588     // FIXME: check for errors
589     smpi_datatype_extent(recvtype, &lb, &recvext);
590     // Local copy from root
591     smpi_datatype_copy(sendbuf, sendcount, sendtype,
592                        (char *)recvbuf + displs[root] * recvext,
593                        recvcounts[root], recvtype);
594     // Receive buffers from senders
595     requests = xbt_new(MPI_Request, size - 1);
596     index = 0;
597     for(src = 0; src < size; src++) {
598       if(src != root) {
599         requests[index] =
600           smpi_irecv_init((char *)recvbuf + displs[src] * recvext,
601                           recvcounts[src], recvtype, src, system_tag, comm);
602         index++;
603       }
604     }
605     // Wait for completion of irecv's.
606     smpi_mpi_startall(size - 1, requests);
607     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
608     xbt_free(requests);
609   }
610 }
611
612 void smpi_mpi_allgather(void *sendbuf, int sendcount,
613                         MPI_Datatype sendtype, void *recvbuf,
614                         int recvcount, MPI_Datatype recvtype,
615                         MPI_Comm comm)
616 {
617   int system_tag = 666;
618   int rank, size, other, index;
619   MPI_Aint lb = 0, recvext = 0;
620   MPI_Request *requests;
621
622   rank = smpi_comm_rank(comm);
623   size = smpi_comm_size(comm);
624   // FIXME: check for errors
625   smpi_datatype_extent(recvtype, &lb, &recvext);
626   // Local copy from self
627   smpi_datatype_copy(sendbuf, sendcount, sendtype,
628                      (char *)recvbuf + rank * recvcount * recvext, recvcount,
629                      recvtype);
630   // Send/Recv buffers to/from others;
631   requests = xbt_new(MPI_Request, 2 * (size - 1));
632   index = 0;
633   for(other = 0; other < size; other++) {
634     if(other != rank) {
635       requests[index] =
636         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
637                         comm);
638       index++;
639       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext,
640                                         recvcount, recvtype, other,
641                                         system_tag, comm);
642       index++;
643     }
644   }
645   // Wait for completion of all comms.
646   smpi_mpi_startall(2 * (size - 1), requests);
647   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
648   xbt_free(requests);
649 }
650
651 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
652                          MPI_Datatype sendtype, void *recvbuf,
653                          int *recvcounts, int *displs,
654                          MPI_Datatype recvtype, MPI_Comm comm)
655 {
656   int system_tag = 666;
657   int rank, size, other, index;
658   MPI_Aint lb = 0, recvext = 0;
659   MPI_Request *requests;
660
661   rank = smpi_comm_rank(comm);
662   size = smpi_comm_size(comm);
663   // FIXME: check for errors
664   smpi_datatype_extent(recvtype, &lb, &recvext);
665   // Local copy from self
666   smpi_datatype_copy(sendbuf, sendcount, sendtype,
667                      (char *)recvbuf + displs[rank] * recvext,
668                      recvcounts[rank], recvtype);
669   // Send buffers to others;
670   requests = xbt_new(MPI_Request, 2 * (size - 1));
671   index = 0;
672   for(other = 0; other < size; other++) {
673     if(other != rank) {
674       requests[index] =
675         smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
676                         comm);
677       index++;
678       requests[index] =
679         smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
680                         recvtype, other, system_tag, comm);
681       index++;
682     }
683   }
684   // Wait for completion of all comms.
685   smpi_mpi_startall(2 * (size - 1), requests);
686   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
687   xbt_free(requests);
688 }
689
690 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
691                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
692                       int root, MPI_Comm comm)
693 {
694   int system_tag = 666;
695   int rank, size, dst, index;
696   MPI_Aint lb = 0, sendext = 0;
697   MPI_Request *requests;
698
699   rank = smpi_comm_rank(comm);
700   size = smpi_comm_size(comm);
701   if(rank != root) {
702     // Recv buffer from root
703     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
704                   MPI_STATUS_IGNORE);
705   } else {
706     // FIXME: check for errors
707     smpi_datatype_extent(sendtype, &lb, &sendext);
708     // Local copy from root
709     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
710                        sendcount, sendtype, recvbuf, recvcount, recvtype);
711     // Send buffers to receivers
712     requests = xbt_new(MPI_Request, size - 1);
713     index = 0;
714     for(dst = 0; dst < size; dst++) {
715       if(dst != root) {
716         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext,
717                                           sendcount, sendtype, dst,
718                                           system_tag, comm);
719         index++;
720       }
721     }
722     // Wait for completion of isend's.
723     smpi_mpi_startall(size - 1, requests);
724     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
725     xbt_free(requests);
726   }
727 }
728
729 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
730                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
731                        MPI_Datatype recvtype, int root, MPI_Comm comm)
732 {
733   int system_tag = 666;
734   int rank, size, dst, index;
735   MPI_Aint lb = 0, sendext = 0;
736   MPI_Request *requests;
737
738   rank = smpi_comm_rank(comm);
739   size = smpi_comm_size(comm);
740   if(rank != root) {
741     // Recv buffer from root
742     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
743                   MPI_STATUS_IGNORE);
744   } else {
745     // FIXME: check for errors
746     smpi_datatype_extent(sendtype, &lb, &sendext);
747     // Local copy from root
748     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root],
749                        sendtype, recvbuf, recvcount, recvtype);
750     // Send buffers to receivers
751     requests = xbt_new(MPI_Request, size - 1);
752     index = 0;
753     for(dst = 0; dst < size; dst++) {
754       if(dst != root) {
755         requests[index] =
756           smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst],
757                           sendtype, dst, system_tag, comm);
758         index++;
759       }
760     }
761     // Wait for completion of isend's.
762     smpi_mpi_startall(size - 1, requests);
763     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
764     xbt_free(requests);
765   }
766 }
767
768 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
769                      MPI_Datatype datatype, MPI_Op op, int root,
770                      MPI_Comm comm)
771 {
772   int system_tag = 666;
773   int rank, size, src, index;
774   MPI_Aint lb = 0, dataext = 0;
775   MPI_Request *requests;
776   void **tmpbufs;
777
778   rank = smpi_comm_rank(comm);
779   size = smpi_comm_size(comm);
780   if(rank != root) {
781     // Send buffer to root
782     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
783   } else {
784     // FIXME: check for errors
785     smpi_datatype_extent(datatype, &lb, &dataext);
786     // Local copy from root
787     if (sendbuf && recvbuf)
788       smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
789     // Receive buffers from senders
790     //TODO: make a MPI_barrier here ?
791     requests = xbt_new(MPI_Request, size - 1);
792     tmpbufs = xbt_new(void *, size - 1);
793     index = 0;
794     for(src = 0; src < size; src++) {
795       if(src != root) {
796         // FIXME: possibly overkill we we have contiguous/noncontiguous data
797         //  mapping...
798         tmpbufs[index] = xbt_malloc(count * dataext);
799         requests[index] =
800           smpi_irecv_init(tmpbufs[index], count, datatype, src,
801                           system_tag, comm);
802         index++;
803       }
804     }
805     // Wait for completion of irecv's.
806     smpi_mpi_startall(size - 1, requests);
807     for(src = 0; src < size - 1; src++) {
808       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
809       XBT_VERB("finished waiting any request with index %d", index);
810       if(index == MPI_UNDEFINED) {
811         break;
812       }
813       if(op) /* op can be MPI_OP_NULL that does nothing */
814         smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
815     }
816     for(index = 0; index < size - 1; index++) {
817       xbt_free(tmpbufs[index]);
818     }
819     xbt_free(tmpbufs);
820     xbt_free(requests);
821   }
822 }
823
824 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
825                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
826 {
827   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
828   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
829 }
830
831 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
832                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
833 {
834   int system_tag = 666;
835   int rank, size, other, index;
836   MPI_Aint lb = 0, dataext = 0;
837   MPI_Request *requests;
838   void **tmpbufs;
839
840   rank = smpi_comm_rank(comm);
841   size = smpi_comm_size(comm);
842
843   // FIXME: check for errors
844   smpi_datatype_extent(datatype, &lb, &dataext);
845
846   // Local copy from self
847   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
848
849   // Send/Recv buffers to/from others;
850   requests = xbt_new(MPI_Request, size - 1);
851   tmpbufs = xbt_new(void *, rank);
852   index = 0;
853   for(other = 0; other < rank; other++) {
854     // FIXME: possibly overkill we we have contiguous/noncontiguous data
855     // mapping...
856     tmpbufs[index] = xbt_malloc(count * dataext);
857     requests[index] =
858       smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
859                       comm);
860     index++;
861   }
862   for(other = rank + 1; other < size; other++) {
863     requests[index] =
864       smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
865     index++;
866   }
867   // Wait for completion of all comms.
868   smpi_mpi_startall(size - 1, requests);
869   for(other = 0; other < size - 1; other++) {
870     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
871     if(index == MPI_UNDEFINED) {
872       break;
873     }
874     if(index < rank) {
875       // #Request is below rank: it's a irecv
876       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
877     }
878   }
879   for(index = 0; index < rank; index++) {
880     xbt_free(tmpbufs[index]);
881   }
882   xbt_free(tmpbufs);
883   xbt_free(requests);
884 }