Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2bc6b976de6471227c81e080384f42bd92e4d4a0
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
14                                 "Logging specific to SMPI (base)");
15
16 static int match_recv(void* a, void* b, smx_action_t ignored) {
17    MPI_Request ref = (MPI_Request)a;
18    MPI_Request req = (MPI_Request)b;
19
20    xbt_assert(ref, "Cannot match recv against null reference");
21    xbt_assert(req, "Cannot match recv against null request");
22    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
23           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
24 }
25
26 static int match_send(void* a, void* b,smx_action_t ignored) {
27    MPI_Request ref = (MPI_Request)a;
28    MPI_Request req = (MPI_Request)b;
29
30    xbt_assert(ref, "Cannot match send against null reference");
31    xbt_assert(req, "Cannot match send against null request");
32    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
33           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
34 }
35
36 static MPI_Request build_request(void *buf, int count,
37                                  MPI_Datatype datatype, int src, int dst,
38                                  int tag, MPI_Comm comm, unsigned flags)
39 {
40   MPI_Request request;
41
42   request = xbt_new(s_smpi_mpi_request_t, 1);
43   request->buf = buf;
44   // FIXME: this will have to be changed to support non-contiguous datatypes
45   request->size = smpi_datatype_size(datatype) * count;
46   request->src = src;
47   request->dst = dst;
48   request->tag = tag;
49   request->comm = comm;
50   request->action = NULL;
51   request->flags = flags;
52 #ifdef HAVE_TRACING
53   request->send = 0;
54   request->recv = 0;
55 #endif
56   return request;
57 }
58
59 void smpi_action_trace_run(char *path)
60 {
61   char *name;
62   xbt_dynar_t todo;
63   xbt_dict_cursor_t cursor;
64
65   action_fp=NULL;
66   if (path) {
67     action_fp = fopen(path, "r");
68     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
69                 strerror(errno));
70   }
71
72   if (!xbt_dict_is_empty(action_queues)) {
73     XBT_WARN
74         ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
75
76
77     xbt_dict_foreach(action_queues, cursor, name, todo) {
78       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
79     }
80   }
81
82   if (path)
83     fclose(action_fp);
84   xbt_dict_free(&action_queues);
85   action_queues = xbt_dict_new_homogeneous(NULL);
86 }
87
88 static void smpi_mpi_request_free_voidp(void* request)
89 {
90   MPI_Request req = request;
91   smpi_mpi_request_free(&req);
92 }
93
94 /* MPI Low level calls */
95 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
96                                int dst, int tag, MPI_Comm comm)
97 {
98   MPI_Request request =
99       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
100                     comm, PERSISTENT | SEND);
101
102   return request;
103 }
104
105 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
106                                int src, int tag, MPI_Comm comm)
107 {
108   MPI_Request request =
109       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
110                     comm, PERSISTENT | RECV);
111
112   return request;
113 }
114
115 void smpi_mpi_start(MPI_Request request)
116 {
117   smx_rdv_t mailbox;
118   int detached = 0;
119
120   xbt_assert(!request->action,
121               "Cannot (re)start a non-finished communication");
122   if(request->flags & RECV) {
123     print_request("New recv", request);
124     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
125     mailbox = smpi_process_mailbox_small();
126     else
127     mailbox = smpi_process_mailbox();
128
129     // FIXME: SIMIX does not yet support non-contiguous datatypes
130     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
131   } else {
132     print_request("New send", request);
133
134     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
135       mailbox = smpi_process_remote_mailbox_small(
136             smpi_group_index(smpi_comm_group(request->comm), request->dst));
137     }else{
138       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
139       mailbox = smpi_process_remote_mailbox(
140                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
141     }
142     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
143       void *oldbuf = request->buf;
144       detached = 1;
145       request->buf = malloc(request->size);
146       memcpy(request->buf,oldbuf,request->size);
147       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
148     }else{
149       XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
150       mailbox = smpi_process_remote_mailbox(
151                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
152     }
153
154       request->action =
155       simcall_comm_isend(mailbox, request->size, -1.0,
156               request->buf, request->size,
157               &match_send,
158               &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
159               request,
160               // detach if msg size < eager/rdv switch limit
161               detached);
162
163   #ifdef HAVE_TRACING
164       /* FIXME: detached sends are not traceable (request->action == NULL) */
165       if (request->action)
166         simcall_set_category(request->action, TRACE_internal_smpi_get_category());
167   #endif
168
169     }
170
171 }
172
173 void smpi_mpi_startall(int count, MPI_Request * requests)
174 {
175     int i;
176
177   for(i = 0; i < count; i++) {
178     smpi_mpi_start(requests[i]);
179   }
180 }
181
182 void smpi_mpi_request_free(MPI_Request * request)
183 {
184   xbt_free(*request);
185   *request = MPI_REQUEST_NULL;
186 }
187
188 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
189                             int dst, int tag, MPI_Comm comm)
190 {
191   MPI_Request request =
192       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
193                     comm, NON_PERSISTENT | SEND);
194
195   return request;
196 }
197
198 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
199                            int dst, int tag, MPI_Comm comm)
200 {
201   MPI_Request request =
202       smpi_isend_init(buf, count, datatype, dst, tag, comm);
203
204   smpi_mpi_start(request);
205   return request;
206 }
207
208 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
209                             int src, int tag, MPI_Comm comm)
210 {
211   MPI_Request request =
212       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
213                     comm, NON_PERSISTENT | RECV);
214
215   return request;
216 }
217
218 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
219                            int src, int tag, MPI_Comm comm)
220 {
221   MPI_Request request =
222       smpi_irecv_init(buf, count, datatype, src, tag, comm);
223
224   smpi_mpi_start(request);
225   return request;
226 }
227
228 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
229                    int tag, MPI_Comm comm, MPI_Status * status)
230 {
231   MPI_Request request;
232
233   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
234   smpi_mpi_wait(&request, status);
235 }
236
237
238
239 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
240                    int tag, MPI_Comm comm)
241 {
242           MPI_Request request;
243
244           request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
245           smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
246 }
247
248 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
249                        int dst, int sendtag, void *recvbuf, int recvcount,
250                        MPI_Datatype recvtype, int src, int recvtag,
251                        MPI_Comm comm, MPI_Status * status)
252 {
253   MPI_Request requests[2];
254   MPI_Status stats[2];
255
256   requests[0] =
257       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
258   requests[1] =
259       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
260   smpi_mpi_startall(2, requests);
261   smpi_mpi_waitall(2, requests, stats);
262   if(status != MPI_STATUS_IGNORE) {
263     // Copy receive status
264     memcpy(status, &stats[1], sizeof(MPI_Status));
265   }
266 }
267
268 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
269 {
270   return status->count / smpi_datatype_size(datatype);
271 }
272
273 static void finish_wait(MPI_Request * request, MPI_Status * status)
274 {
275   MPI_Request req = *request;
276
277   if(status != MPI_STATUS_IGNORE) {
278     status->MPI_SOURCE = req->src;
279     status->MPI_TAG = req->tag;
280     status->MPI_ERROR = MPI_SUCCESS;
281     // FIXME: really this should just contain the count of receive-type blocks,
282     // right?
283     status->count = req->size;
284   }
285   print_request("Finishing", req);
286   if(req->flags & NON_PERSISTENT) {
287     smpi_mpi_request_free(request);
288   } else {
289     req->action = NULL;
290   }
291 }
292
293 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
294 int flag;
295
296    if ((*request)->action == NULL)
297   flag = 1;
298    else 
299     flag = simcall_comm_test((*request)->action);
300    if(flag) {
301         smpi_mpi_wait(request, status);
302     }
303     return flag;
304 }
305
306 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
307                      MPI_Status * status)
308 {
309   xbt_dynar_t comms;
310   int i, flag, size;
311   int* map;
312
313   *index = MPI_UNDEFINED;
314   flag = 0;
315   if(count > 0) {
316     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
317     map = xbt_new(int, count);
318     size = 0;
319     for(i = 0; i < count; i++) {
320       if(requests[i]->action) {
321          xbt_dynar_push(comms, &requests[i]->action);
322          map[size] = i;
323          size++;
324       }
325     }
326     if(size > 0) {
327       i = simcall_comm_testany(comms);
328       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
329       if(i != MPI_UNDEFINED) {
330         *index = map[i];
331         smpi_mpi_wait(&requests[*index], status);
332         flag = 1;
333       }
334     }
335     xbt_free(map);
336     xbt_dynar_free(&comms);
337   }
338   return flag;
339 }
340
341 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
342 {
343   print_request("Waiting", *request);
344   if ((*request)->action != NULL) { // this is not a detached send
345     simcall_comm_wait((*request)->action, -1.0);
346     finish_wait(request, status);
347   }
348   // FIXME for a detached send, finish_wait is not called:
349 }
350
351 int smpi_mpi_waitany(int count, MPI_Request requests[],
352                      MPI_Status * status)
353 {
354   xbt_dynar_t comms;
355   int i, size, index;
356   int *map;
357
358   index = MPI_UNDEFINED;
359   if(count > 0) {
360     // Wait for a request to complete
361     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
362     map = xbt_new(int, count);
363     size = 0;
364     XBT_DEBUG("Wait for one of");
365     for(i = 0; i < count; i++) {
366       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
367         print_request("Waiting any ", requests[i]);
368         xbt_dynar_push(comms, &requests[i]->action);
369         map[size] = i;
370         size++;
371       }
372     }
373     if(size > 0) {
374       i = simcall_comm_waitany(comms);
375
376       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
377       if (i != MPI_UNDEFINED) {
378         index = map[i];
379         finish_wait(&requests[index], status);
380
381       }
382     }
383     xbt_free(map);
384     xbt_dynar_free(&comms);
385   }
386   return index;
387 }
388
389 void smpi_mpi_waitall(int count, MPI_Request requests[],
390                       MPI_Status status[])
391 {
392   int index, c;
393   MPI_Status stat;
394   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
395
396   for(c = 0; c < count; c++) {
397     if(MC_IS_ENABLED) {
398       smpi_mpi_wait(&requests[c], pstat);
399       index = c;
400     } else {
401       index = smpi_mpi_waitany(count, requests, pstat);
402       if(index == MPI_UNDEFINED) {
403         break;
404       }
405     }
406     if(status != MPI_STATUS_IGNORE) {
407       memcpy(&status[index], pstat, sizeof(*pstat));
408     }
409   }
410 }
411
412 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
413                       MPI_Status status[])
414 {
415   int i, count, index;
416
417   count = 0;
418   for(i = 0; i < incount; i++) {
419      if(smpi_mpi_testany(incount, requests, &index, status)) {
420        indices[count] = index;
421        count++;
422      }
423   }
424   return count;
425 }
426
427 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
428                     MPI_Comm comm)
429 {
430   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
431   nary_tree_bcast(buf, count, datatype, root, comm, 4);
432 }
433
434 void smpi_mpi_barrier(MPI_Comm comm)
435 {
436   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
437   nary_tree_barrier(comm, 4);
438 }
439
440 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
441                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
442                      int root, MPI_Comm comm)
443 {
444   int system_tag = 666;
445   int rank, size, src, index;
446   MPI_Aint lb = 0, recvext = 0;
447   MPI_Request *requests;
448
449   rank = smpi_comm_rank(comm);
450   size = smpi_comm_size(comm);
451   if(rank != root) {
452     // Send buffer to root
453     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
454   } else {
455     // FIXME: check for errors
456     smpi_datatype_extent(recvtype, &lb, &recvext);
457     // Local copy from root
458     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
459         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
460     // Receive buffers from senders
461     requests = xbt_new(MPI_Request, size - 1);
462     index = 0;
463     for(src = 0; src < size; src++) {
464       if(src != root) {
465         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
466                                           recvcount, recvtype, 
467                                           src, system_tag, comm);
468         index++;
469       }
470     }
471     // Wait for completion of irecv's.
472     smpi_mpi_startall(size - 1, requests);
473     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
474     xbt_free(requests);
475   }
476 }
477
478 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
479                       void *recvbuf, int *recvcounts, int *displs,
480                       MPI_Datatype recvtype, int root, MPI_Comm comm)
481 {
482   int system_tag = 666;
483   int rank, size, src, index;
484   MPI_Aint lb = 0, recvext = 0;
485   MPI_Request *requests;
486
487   rank = smpi_comm_rank(comm);
488   size = smpi_comm_size(comm);
489   if(rank != root) {
490     // Send buffer to root
491     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
492   } else {
493     // FIXME: check for errors
494     smpi_datatype_extent(recvtype, &lb, &recvext);
495     // Local copy from root
496     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
497                        (char *)recvbuf + displs[root] * recvext, 
498                        recvcounts[root], recvtype);
499     // Receive buffers from senders
500     requests = xbt_new(MPI_Request, size - 1);
501     index = 0;
502     for(src = 0; src < size; src++) {
503       if(src != root) {
504         requests[index] =
505             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
506                             recvcounts[src], recvtype, src, system_tag, comm);
507         index++;
508       }
509     }
510     // Wait for completion of irecv's.
511     smpi_mpi_startall(size - 1, requests);
512     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
513     xbt_free(requests);
514   }
515 }
516
517 void smpi_mpi_allgather(void *sendbuf, int sendcount,
518                         MPI_Datatype sendtype, void *recvbuf,
519                         int recvcount, MPI_Datatype recvtype,
520                         MPI_Comm comm)
521 {
522   int system_tag = 666;
523   int rank, size, other, index;
524   MPI_Aint lb = 0, recvext = 0;
525   MPI_Request *requests;
526
527   rank = smpi_comm_rank(comm);
528   size = smpi_comm_size(comm);
529   // FIXME: check for errors
530   smpi_datatype_extent(recvtype, &lb, &recvext);
531   // Local copy from self
532   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
533                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
534                      recvtype);
535   // Send/Recv buffers to/from others;
536   requests = xbt_new(MPI_Request, 2 * (size - 1));
537   index = 0;
538   for(other = 0; other < size; other++) {
539     if(other != rank) {
540       requests[index] =
541           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
542                           comm);
543       index++;
544       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
545                                         recvcount, recvtype, other, 
546                                         system_tag, comm);
547       index++;
548     }
549   }
550   // Wait for completion of all comms.
551   smpi_mpi_startall(2 * (size - 1), requests);
552   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
553   xbt_free(requests);
554 }
555
556 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
557                          MPI_Datatype sendtype, void *recvbuf,
558                          int *recvcounts, int *displs,
559                          MPI_Datatype recvtype, MPI_Comm comm)
560 {
561   int system_tag = 666;
562   int rank, size, other, index;
563   MPI_Aint lb = 0, recvext = 0;
564   MPI_Request *requests;
565
566   rank = smpi_comm_rank(comm);
567   size = smpi_comm_size(comm);
568   // FIXME: check for errors
569   smpi_datatype_extent(recvtype, &lb, &recvext);
570   // Local copy from self
571   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
572                      (char *)recvbuf + displs[rank] * recvext, 
573                      recvcounts[rank], recvtype);
574   // Send buffers to others;
575   requests = xbt_new(MPI_Request, 2 * (size - 1));
576   index = 0;
577   for(other = 0; other < size; other++) {
578     if(other != rank) {
579       requests[index] =
580           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
581                           comm);
582       index++;
583       requests[index] =
584           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
585                           recvtype, other, system_tag, comm);
586       index++;
587     }
588   }
589   // Wait for completion of all comms.
590   smpi_mpi_startall(2 * (size - 1), requests);
591   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
592   xbt_free(requests);
593 }
594
595 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
596                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
597                       int root, MPI_Comm comm)
598 {
599   int system_tag = 666;
600   int rank, size, dst, index;
601   MPI_Aint lb = 0, sendext = 0;
602   MPI_Request *requests;
603
604   rank = smpi_comm_rank(comm);
605   size = smpi_comm_size(comm);
606   if(rank != root) {
607     // Recv buffer from root
608     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
609                   MPI_STATUS_IGNORE);
610   } else {
611     // FIXME: check for errors
612     smpi_datatype_extent(sendtype, &lb, &sendext);
613     // Local copy from root
614     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
615       sendcount, sendtype, recvbuf, recvcount, recvtype);
616     // Send buffers to receivers
617     requests = xbt_new(MPI_Request, size - 1);
618     index = 0;
619     for(dst = 0; dst < size; dst++) {
620       if(dst != root) {
621         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
622                                           sendcount, sendtype, dst,
623                                           system_tag, comm);
624         index++;
625       }
626     }
627     // Wait for completion of isend's.
628     smpi_mpi_startall(size - 1, requests);
629     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
630     xbt_free(requests);
631   }
632 }
633
634 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
635                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
636                        MPI_Datatype recvtype, int root, MPI_Comm comm)
637 {
638   int system_tag = 666;
639   int rank, size, dst, index;
640   MPI_Aint lb = 0, sendext = 0;
641   MPI_Request *requests;
642
643   rank = smpi_comm_rank(comm);
644   size = smpi_comm_size(comm);
645   if(rank != root) {
646     // Recv buffer from root
647     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
648                   MPI_STATUS_IGNORE);
649   } else {
650     // FIXME: check for errors
651     smpi_datatype_extent(sendtype, &lb, &sendext);
652     // Local copy from root
653     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
654                        sendtype, recvbuf, recvcount, recvtype);
655     // Send buffers to receivers
656     requests = xbt_new(MPI_Request, size - 1);
657     index = 0;
658     for(dst = 0; dst < size; dst++) {
659       if(dst != root) {
660         requests[index] =
661             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
662                             sendtype, dst, system_tag, comm);
663         index++;
664       }
665     }
666     // Wait for completion of isend's.
667     smpi_mpi_startall(size - 1, requests);
668     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
669     xbt_free(requests);
670   }
671 }
672
673 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
674                      MPI_Datatype datatype, MPI_Op op, int root,
675                      MPI_Comm comm)
676 {
677   int system_tag = 666;
678   int rank, size, src, index;
679   MPI_Aint lb = 0, dataext = 0;
680   MPI_Request *requests;
681   void **tmpbufs;
682
683   rank = smpi_comm_rank(comm);
684   size = smpi_comm_size(comm);
685   if(rank != root) {
686     // Send buffer to root
687     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
688   } else {
689     // FIXME: check for errors
690     smpi_datatype_extent(datatype, &lb, &dataext);
691     // Local copy from root
692     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
693     // Receive buffers from senders
694     //TODO: make a MPI_barrier here ?
695     requests = xbt_new(MPI_Request, size - 1);
696     tmpbufs = xbt_new(void *, size - 1);
697     index = 0;
698     for(src = 0; src < size; src++) {
699       if(src != root) {
700         // FIXME: possibly overkill we we have contiguous/noncontiguous data
701         //  mapping...
702         tmpbufs[index] = xbt_malloc(count * dataext);
703         requests[index] =
704             smpi_irecv_init(tmpbufs[index], count, datatype, src,
705                             system_tag, comm);
706         index++;
707       }
708     }
709     // Wait for completion of irecv's.
710     smpi_mpi_startall(size - 1, requests);
711     for(src = 0; src < size - 1; src++) {
712       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
713       XBT_VERB("finished waiting any request with index %d", index);
714       if(index == MPI_UNDEFINED) {
715         break;
716       }
717       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
718     }
719     for(index = 0; index < size - 1; index++) {
720       xbt_free(tmpbufs[index]);
721     }
722     xbt_free(tmpbufs);
723     xbt_free(requests);
724   }
725 }
726
727 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
728                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
729 {
730   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
731   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
732 }
733
734 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
735                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
736 {
737   int system_tag = 666;
738   int rank, size, other, index;
739   MPI_Aint lb = 0, dataext = 0;
740   MPI_Request *requests;
741   void **tmpbufs;
742
743   rank = smpi_comm_rank(comm);
744   size = smpi_comm_size(comm);
745
746   // FIXME: check for errors
747   smpi_datatype_extent(datatype, &lb, &dataext);
748
749   // Local copy from self
750   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
751
752   // Send/Recv buffers to/from others;
753   requests = xbt_new(MPI_Request, size - 1);
754   tmpbufs = xbt_new(void *, rank);
755   index = 0;
756   for(other = 0; other < rank; other++) {
757     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
758     // mapping...
759     tmpbufs[index] = xbt_malloc(count * dataext);
760     requests[index] =
761         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
762                         comm);
763     index++;
764   }
765   for(other = rank + 1; other < size; other++) {
766     requests[index] =
767         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
768     index++;
769   }
770   // Wait for completion of all comms.
771   smpi_mpi_startall(size - 1, requests);
772   for(other = 0; other < size - 1; other++) {
773     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
774     if(index == MPI_UNDEFINED) {
775       break;
776     }
777     if(index < rank) {
778       // #Request is below rank: it's a irecv
779       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
780     }
781   }
782   for(index = 0; index < rank; index++) {
783     xbt_free(tmpbufs[index]);
784   }
785   xbt_free(tmpbufs);
786   xbt_free(requests);
787 }