Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add missing includes
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12 #include "surf/surf.h"
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
15                                 "Logging specific to SMPI (base)");
16
17 static int match_recv(void* a, void* b, smx_action_t ignored) {
18    MPI_Request ref = (MPI_Request)a;
19    MPI_Request req = (MPI_Request)b;
20
21    xbt_assert(ref, "Cannot match recv against null reference");
22    xbt_assert(req, "Cannot match recv against null request");
23    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
24           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
25 }
26
27 static int match_send(void* a, void* b,smx_action_t ignored) {
28    MPI_Request ref = (MPI_Request)a;
29    MPI_Request req = (MPI_Request)b;
30
31    xbt_assert(ref, "Cannot match send against null reference");
32    xbt_assert(req, "Cannot match send against null request");
33    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
34           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
35 }
36
37 static MPI_Request build_request(void *buf, int count,
38                                  MPI_Datatype datatype, int src, int dst,
39                                  int tag, MPI_Comm comm, unsigned flags)
40 {
41   MPI_Request request;
42
43   request = xbt_new(s_smpi_mpi_request_t, 1);
44   request->buf = buf;
45   // FIXME: this will have to be changed to support non-contiguous datatypes
46   request->size = smpi_datatype_size(datatype) * count;
47   request->src = src;
48   request->dst = dst;
49   request->tag = tag;
50   request->comm = comm;
51   request->action = NULL;
52   request->flags = flags;
53 #ifdef HAVE_TRACING
54   request->send = 0;
55   request->recv = 0;
56 #endif
57   return request;
58 }
59
60 void smpi_action_trace_run(char *path)
61 {
62   char *name;
63   xbt_dynar_t todo;
64   xbt_dict_cursor_t cursor;
65
66   action_fp=NULL;
67   if (path) {
68     action_fp = fopen(path, "r");
69     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
70                 strerror(errno));
71   }
72
73   if (!xbt_dict_is_empty(action_queues)) {
74     XBT_WARN
75         ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
76
77
78     xbt_dict_foreach(action_queues, cursor, name, todo) {
79       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
80     }
81   }
82
83   if (path)
84     fclose(action_fp);
85   xbt_dict_free(&action_queues);
86   action_queues = xbt_dict_new_homogeneous(NULL);
87 }
88
89 static void smpi_mpi_request_free_voidp(void* request)
90 {
91   MPI_Request req = request;
92   smpi_mpi_request_free(&req);
93 }
94
95 /* MPI Low level calls */
96 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
97                                int dst, int tag, MPI_Comm comm)
98 {
99   MPI_Request request =
100       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
101                     comm, PERSISTENT | SEND);
102
103   return request;
104 }
105
106 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
107                                int src, int tag, MPI_Comm comm)
108 {
109   MPI_Request request =
110       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
111                     comm, PERSISTENT | RECV);
112
113   return request;
114 }
115
116 void smpi_mpi_start(MPI_Request request)
117 {
118   smx_rdv_t mailbox;
119   int detached = 0;
120
121   xbt_assert(!request->action,
122               "Cannot (re)start a non-finished communication");
123   if(request->flags & RECV) {
124     print_request("New recv", request);
125     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres"))
126     mailbox = smpi_process_mailbox_small();
127     else
128     mailbox = smpi_process_mailbox();
129
130     // FIXME: SIMIX does not yet support non-contiguous datatypes
131     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
132   } else {
133     print_request("New send", request);
134
135     if (request->size < xbt_cfg_get_int(_surf_cfg_set, "smpi/async_small_thres")) { // eager mode => detached send (FIXME: this limit should be configurable)
136       mailbox = smpi_process_remote_mailbox_small(
137             smpi_group_index(smpi_comm_group(request->comm), request->dst));
138     }else{
139       XBT_DEBUG("Send request %p is not in the permanent receive mailbox (buf: %p)",request,request->buf);
140       mailbox = smpi_process_remote_mailbox(
141                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
142     }
143     if (request->size < 64*1024 ) { //(FIXME: this limit should be configurable)
144       void *oldbuf = request->buf;
145       detached = 1;
146       request->buf = malloc(request->size);
147       memcpy(request->buf,oldbuf,request->size);
148       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
149     }else{
150       XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
151       mailbox = smpi_process_remote_mailbox(
152                   smpi_group_index(smpi_comm_group(request->comm), request->dst));
153     }
154
155       request->action =
156       simcall_comm_isend(mailbox, request->size, -1.0,
157               request->buf, request->size,
158               &match_send,
159               &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
160               request,
161               // detach if msg size < eager/rdv switch limit
162               detached);
163
164   #ifdef HAVE_TRACING
165       /* FIXME: detached sends are not traceable (request->action == NULL) */
166       if (request->action)
167         simcall_set_category(request->action, TRACE_internal_smpi_get_category());
168   #endif
169
170     }
171
172 }
173
174 void smpi_mpi_startall(int count, MPI_Request * requests)
175 {
176     int i;
177
178   for(i = 0; i < count; i++) {
179     smpi_mpi_start(requests[i]);
180   }
181 }
182
183 void smpi_mpi_request_free(MPI_Request * request)
184 {
185   xbt_free(*request);
186   *request = MPI_REQUEST_NULL;
187 }
188
189 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
190                             int dst, int tag, MPI_Comm comm)
191 {
192   MPI_Request request =
193       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
194                     comm, NON_PERSISTENT | SEND);
195
196   return request;
197 }
198
199 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
200                            int dst, int tag, MPI_Comm comm)
201 {
202   MPI_Request request =
203       smpi_isend_init(buf, count, datatype, dst, tag, comm);
204
205   smpi_mpi_start(request);
206   return request;
207 }
208
209 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
210                             int src, int tag, MPI_Comm comm)
211 {
212   MPI_Request request =
213       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
214                     comm, NON_PERSISTENT | RECV);
215
216   return request;
217 }
218
219 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
220                            int src, int tag, MPI_Comm comm)
221 {
222   MPI_Request request =
223       smpi_irecv_init(buf, count, datatype, src, tag, comm);
224
225   smpi_mpi_start(request);
226   return request;
227 }
228
229 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
230                    int tag, MPI_Comm comm, MPI_Status * status)
231 {
232   MPI_Request request;
233
234   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
235   smpi_mpi_wait(&request, status);
236 }
237
238
239
240 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
241                    int tag, MPI_Comm comm)
242 {
243           MPI_Request request;
244
245           request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
246           smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
247 }
248
249 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
250                        int dst, int sendtag, void *recvbuf, int recvcount,
251                        MPI_Datatype recvtype, int src, int recvtag,
252                        MPI_Comm comm, MPI_Status * status)
253 {
254   MPI_Request requests[2];
255   MPI_Status stats[2];
256
257   requests[0] =
258       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
259   requests[1] =
260       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
261   smpi_mpi_startall(2, requests);
262   smpi_mpi_waitall(2, requests, stats);
263   if(status != MPI_STATUS_IGNORE) {
264     // Copy receive status
265     memcpy(status, &stats[1], sizeof(MPI_Status));
266   }
267 }
268
269 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
270 {
271   return status->count / smpi_datatype_size(datatype);
272 }
273
274 static void finish_wait(MPI_Request * request, MPI_Status * status)
275 {
276   MPI_Request req = *request;
277
278   if(status != MPI_STATUS_IGNORE) {
279     status->MPI_SOURCE = req->src;
280     status->MPI_TAG = req->tag;
281     status->MPI_ERROR = MPI_SUCCESS;
282     // FIXME: really this should just contain the count of receive-type blocks,
283     // right?
284     status->count = req->size;
285   }
286   print_request("Finishing", req);
287   if(req->flags & NON_PERSISTENT) {
288     smpi_mpi_request_free(request);
289   } else {
290     req->action = NULL;
291   }
292 }
293
294 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
295 int flag;
296
297    if ((*request)->action == NULL)
298   flag = 1;
299    else 
300     flag = simcall_comm_test((*request)->action);
301    if(flag) {
302         smpi_mpi_wait(request, status);
303     }
304     return flag;
305 }
306
307 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
308                      MPI_Status * status)
309 {
310   xbt_dynar_t comms;
311   int i, flag, size;
312   int* map;
313
314   *index = MPI_UNDEFINED;
315   flag = 0;
316   if(count > 0) {
317     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
318     map = xbt_new(int, count);
319     size = 0;
320     for(i = 0; i < count; i++) {
321       if(requests[i]->action) {
322          xbt_dynar_push(comms, &requests[i]->action);
323          map[size] = i;
324          size++;
325       }
326     }
327     if(size > 0) {
328       i = simcall_comm_testany(comms);
329       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
330       if(i != MPI_UNDEFINED) {
331         *index = map[i];
332         smpi_mpi_wait(&requests[*index], status);
333         flag = 1;
334       }
335     }
336     xbt_free(map);
337     xbt_dynar_free(&comms);
338   }
339   return flag;
340 }
341
342 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
343 {
344   print_request("Waiting", *request);
345   if ((*request)->action != NULL) { // this is not a detached send
346     simcall_comm_wait((*request)->action, -1.0);
347     finish_wait(request, status);
348   }
349   // FIXME for a detached send, finish_wait is not called:
350 }
351
352 int smpi_mpi_waitany(int count, MPI_Request requests[],
353                      MPI_Status * status)
354 {
355   xbt_dynar_t comms;
356   int i, size, index;
357   int *map;
358
359   index = MPI_UNDEFINED;
360   if(count > 0) {
361     // Wait for a request to complete
362     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
363     map = xbt_new(int, count);
364     size = 0;
365     XBT_DEBUG("Wait for one of");
366     for(i = 0; i < count; i++) {
367       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
368         print_request("Waiting any ", requests[i]);
369         xbt_dynar_push(comms, &requests[i]->action);
370         map[size] = i;
371         size++;
372       }
373     }
374     if(size > 0) {
375       i = simcall_comm_waitany(comms);
376
377       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
378       if (i != MPI_UNDEFINED) {
379         index = map[i];
380         finish_wait(&requests[index], status);
381
382       }
383     }
384     xbt_free(map);
385     xbt_dynar_free(&comms);
386   }
387   return index;
388 }
389
390 void smpi_mpi_waitall(int count, MPI_Request requests[],
391                       MPI_Status status[])
392 {
393   int index, c;
394   MPI_Status stat;
395   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
396
397   for(c = 0; c < count; c++) {
398     if(MC_IS_ENABLED) {
399       smpi_mpi_wait(&requests[c], pstat);
400       index = c;
401     } else {
402       index = smpi_mpi_waitany(count, requests, pstat);
403       if(index == MPI_UNDEFINED) {
404         break;
405       }
406     }
407     if(status != MPI_STATUS_IGNORE) {
408       memcpy(&status[index], pstat, sizeof(*pstat));
409     }
410   }
411 }
412
413 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
414                       MPI_Status status[])
415 {
416   int i, count, index;
417
418   count = 0;
419   for(i = 0; i < incount; i++) {
420      if(smpi_mpi_testany(incount, requests, &index, status)) {
421        indices[count] = index;
422        count++;
423      }
424   }
425   return count;
426 }
427
428 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
429                     MPI_Comm comm)
430 {
431   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
432   nary_tree_bcast(buf, count, datatype, root, comm, 4);
433 }
434
435 void smpi_mpi_barrier(MPI_Comm comm)
436 {
437   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
438   nary_tree_barrier(comm, 4);
439 }
440
441 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
442                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
443                      int root, MPI_Comm comm)
444 {
445   int system_tag = 666;
446   int rank, size, src, index;
447   MPI_Aint lb = 0, recvext = 0;
448   MPI_Request *requests;
449
450   rank = smpi_comm_rank(comm);
451   size = smpi_comm_size(comm);
452   if(rank != root) {
453     // Send buffer to root
454     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
455   } else {
456     // FIXME: check for errors
457     smpi_datatype_extent(recvtype, &lb, &recvext);
458     // Local copy from root
459     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
460         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
461     // Receive buffers from senders
462     requests = xbt_new(MPI_Request, size - 1);
463     index = 0;
464     for(src = 0; src < size; src++) {
465       if(src != root) {
466         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
467                                           recvcount, recvtype, 
468                                           src, system_tag, comm);
469         index++;
470       }
471     }
472     // Wait for completion of irecv's.
473     smpi_mpi_startall(size - 1, requests);
474     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
475     xbt_free(requests);
476   }
477 }
478
479 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
480                       void *recvbuf, int *recvcounts, int *displs,
481                       MPI_Datatype recvtype, int root, MPI_Comm comm)
482 {
483   int system_tag = 666;
484   int rank, size, src, index;
485   MPI_Aint lb = 0, recvext = 0;
486   MPI_Request *requests;
487
488   rank = smpi_comm_rank(comm);
489   size = smpi_comm_size(comm);
490   if(rank != root) {
491     // Send buffer to root
492     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
493   } else {
494     // FIXME: check for errors
495     smpi_datatype_extent(recvtype, &lb, &recvext);
496     // Local copy from root
497     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
498                        (char *)recvbuf + displs[root] * recvext, 
499                        recvcounts[root], recvtype);
500     // Receive buffers from senders
501     requests = xbt_new(MPI_Request, size - 1);
502     index = 0;
503     for(src = 0; src < size; src++) {
504       if(src != root) {
505         requests[index] =
506             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
507                             recvcounts[src], recvtype, src, system_tag, comm);
508         index++;
509       }
510     }
511     // Wait for completion of irecv's.
512     smpi_mpi_startall(size - 1, requests);
513     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
514     xbt_free(requests);
515   }
516 }
517
518 void smpi_mpi_allgather(void *sendbuf, int sendcount,
519                         MPI_Datatype sendtype, void *recvbuf,
520                         int recvcount, MPI_Datatype recvtype,
521                         MPI_Comm comm)
522 {
523   int system_tag = 666;
524   int rank, size, other, index;
525   MPI_Aint lb = 0, recvext = 0;
526   MPI_Request *requests;
527
528   rank = smpi_comm_rank(comm);
529   size = smpi_comm_size(comm);
530   // FIXME: check for errors
531   smpi_datatype_extent(recvtype, &lb, &recvext);
532   // Local copy from self
533   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
534                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
535                      recvtype);
536   // Send/Recv buffers to/from others;
537   requests = xbt_new(MPI_Request, 2 * (size - 1));
538   index = 0;
539   for(other = 0; other < size; other++) {
540     if(other != rank) {
541       requests[index] =
542           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
543                           comm);
544       index++;
545       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
546                                         recvcount, recvtype, other, 
547                                         system_tag, comm);
548       index++;
549     }
550   }
551   // Wait for completion of all comms.
552   smpi_mpi_startall(2 * (size - 1), requests);
553   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
554   xbt_free(requests);
555 }
556
557 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
558                          MPI_Datatype sendtype, void *recvbuf,
559                          int *recvcounts, int *displs,
560                          MPI_Datatype recvtype, MPI_Comm comm)
561 {
562   int system_tag = 666;
563   int rank, size, other, index;
564   MPI_Aint lb = 0, recvext = 0;
565   MPI_Request *requests;
566
567   rank = smpi_comm_rank(comm);
568   size = smpi_comm_size(comm);
569   // FIXME: check for errors
570   smpi_datatype_extent(recvtype, &lb, &recvext);
571   // Local copy from self
572   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
573                      (char *)recvbuf + displs[rank] * recvext, 
574                      recvcounts[rank], recvtype);
575   // Send buffers to others;
576   requests = xbt_new(MPI_Request, 2 * (size - 1));
577   index = 0;
578   for(other = 0; other < size; other++) {
579     if(other != rank) {
580       requests[index] =
581           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
582                           comm);
583       index++;
584       requests[index] =
585           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
586                           recvtype, other, system_tag, comm);
587       index++;
588     }
589   }
590   // Wait for completion of all comms.
591   smpi_mpi_startall(2 * (size - 1), requests);
592   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
593   xbt_free(requests);
594 }
595
596 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
597                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
598                       int root, MPI_Comm comm)
599 {
600   int system_tag = 666;
601   int rank, size, dst, index;
602   MPI_Aint lb = 0, sendext = 0;
603   MPI_Request *requests;
604
605   rank = smpi_comm_rank(comm);
606   size = smpi_comm_size(comm);
607   if(rank != root) {
608     // Recv buffer from root
609     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
610                   MPI_STATUS_IGNORE);
611   } else {
612     // FIXME: check for errors
613     smpi_datatype_extent(sendtype, &lb, &sendext);
614     // Local copy from root
615     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
616       sendcount, sendtype, recvbuf, recvcount, recvtype);
617     // Send buffers to receivers
618     requests = xbt_new(MPI_Request, size - 1);
619     index = 0;
620     for(dst = 0; dst < size; dst++) {
621       if(dst != root) {
622         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
623                                           sendcount, sendtype, dst,
624                                           system_tag, comm);
625         index++;
626       }
627     }
628     // Wait for completion of isend's.
629     smpi_mpi_startall(size - 1, requests);
630     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
631     xbt_free(requests);
632   }
633 }
634
635 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
636                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
637                        MPI_Datatype recvtype, int root, MPI_Comm comm)
638 {
639   int system_tag = 666;
640   int rank, size, dst, index;
641   MPI_Aint lb = 0, sendext = 0;
642   MPI_Request *requests;
643
644   rank = smpi_comm_rank(comm);
645   size = smpi_comm_size(comm);
646   if(rank != root) {
647     // Recv buffer from root
648     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
649                   MPI_STATUS_IGNORE);
650   } else {
651     // FIXME: check for errors
652     smpi_datatype_extent(sendtype, &lb, &sendext);
653     // Local copy from root
654     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
655                        sendtype, recvbuf, recvcount, recvtype);
656     // Send buffers to receivers
657     requests = xbt_new(MPI_Request, size - 1);
658     index = 0;
659     for(dst = 0; dst < size; dst++) {
660       if(dst != root) {
661         requests[index] =
662             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
663                             sendtype, dst, system_tag, comm);
664         index++;
665       }
666     }
667     // Wait for completion of isend's.
668     smpi_mpi_startall(size - 1, requests);
669     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
670     xbt_free(requests);
671   }
672 }
673
674 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
675                      MPI_Datatype datatype, MPI_Op op, int root,
676                      MPI_Comm comm)
677 {
678   int system_tag = 666;
679   int rank, size, src, index;
680   MPI_Aint lb = 0, dataext = 0;
681   MPI_Request *requests;
682   void **tmpbufs;
683
684   rank = smpi_comm_rank(comm);
685   size = smpi_comm_size(comm);
686   if(rank != root) {
687     // Send buffer to root
688     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
689   } else {
690     // FIXME: check for errors
691     smpi_datatype_extent(datatype, &lb, &dataext);
692     // Local copy from root
693     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
694     // Receive buffers from senders
695     //TODO: make a MPI_barrier here ?
696     requests = xbt_new(MPI_Request, size - 1);
697     tmpbufs = xbt_new(void *, size - 1);
698     index = 0;
699     for(src = 0; src < size; src++) {
700       if(src != root) {
701         // FIXME: possibly overkill we we have contiguous/noncontiguous data
702         //  mapping...
703         tmpbufs[index] = xbt_malloc(count * dataext);
704         requests[index] =
705             smpi_irecv_init(tmpbufs[index], count, datatype, src,
706                             system_tag, comm);
707         index++;
708       }
709     }
710     // Wait for completion of irecv's.
711     smpi_mpi_startall(size - 1, requests);
712     for(src = 0; src < size - 1; src++) {
713       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
714       XBT_VERB("finished waiting any request with index %d", index);
715       if(index == MPI_UNDEFINED) {
716         break;
717       }
718       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
719     }
720     for(index = 0; index < size - 1; index++) {
721       xbt_free(tmpbufs[index]);
722     }
723     xbt_free(tmpbufs);
724     xbt_free(requests);
725   }
726 }
727
728 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
729                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
730 {
731   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
732   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
733 }
734
735 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
736                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
737 {
738   int system_tag = 666;
739   int rank, size, other, index;
740   MPI_Aint lb = 0, dataext = 0;
741   MPI_Request *requests;
742   void **tmpbufs;
743
744   rank = smpi_comm_rank(comm);
745   size = smpi_comm_size(comm);
746
747   // FIXME: check for errors
748   smpi_datatype_extent(datatype, &lb, &dataext);
749
750   // Local copy from self
751   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
752
753   // Send/Recv buffers to/from others;
754   requests = xbt_new(MPI_Request, size - 1);
755   tmpbufs = xbt_new(void *, rank);
756   index = 0;
757   for(other = 0; other < rank; other++) {
758     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
759     // mapping...
760     tmpbufs[index] = xbt_malloc(count * dataext);
761     requests[index] =
762         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
763                         comm);
764     index++;
765   }
766   for(other = rank + 1; other < size; other++) {
767     requests[index] =
768         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
769     index++;
770   }
771   // Wait for completion of all comms.
772   smpi_mpi_startall(size - 1, requests);
773   for(other = 0; other < size - 1; other++) {
774     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
775     if(index == MPI_UNDEFINED) {
776       break;
777     }
778     if(index < rank) {
779       // #Request is below rank: it's a irecv
780       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
781     }
782   }
783   for(index = 0; index < rank; index++) {
784     xbt_free(tmpbufs[index]);
785   }
786   xbt_free(tmpbufs);
787   xbt_free(requests);
788 }