Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
9b3641a07f7b0019b44b31c07cea9ac4f4352e89
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10 #include "xbt/replay.h"
11 #include <errno.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
14                                 "Logging specific to SMPI (base)");
15
16 static int match_recv(void* a, void* b, smx_action_t ignored) {
17    MPI_Request ref = (MPI_Request)a;
18    MPI_Request req = (MPI_Request)b;
19
20    xbt_assert(ref, "Cannot match recv against null reference");
21    xbt_assert(req, "Cannot match recv against null request");
22    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
23           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
24 }
25
26 static int match_send(void* a, void* b,smx_action_t ignored) {
27    MPI_Request ref = (MPI_Request)a;
28    MPI_Request req = (MPI_Request)b;
29
30    xbt_assert(ref, "Cannot match send against null reference");
31    xbt_assert(req, "Cannot match send against null request");
32    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
33           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
34 }
35
36 static MPI_Request build_request(void *buf, int count,
37                                  MPI_Datatype datatype, int src, int dst,
38                                  int tag, MPI_Comm comm, unsigned flags)
39 {
40   MPI_Request request;
41
42   request = xbt_new(s_smpi_mpi_request_t, 1);
43   request->buf = buf;
44   // FIXME: this will have to be changed to support non-contiguous datatypes
45   request->size = smpi_datatype_size(datatype) * count;
46   request->src = src;
47   request->dst = dst;
48   request->tag = tag;
49   request->comm = comm;
50   request->action = NULL;
51   request->flags = flags;
52 #ifdef HAVE_TRACING
53   request->send = 0;
54   request->recv = 0;
55 #endif
56   return request;
57 }
58
59 void smpi_action_trace_run(char *path)
60 {
61   char *name;
62   xbt_dynar_t todo;
63   xbt_dict_cursor_t cursor;
64
65   action_fp=NULL;
66   if (path) {
67     action_fp = fopen(path, "r");
68     xbt_assert(action_fp != NULL, "Cannot open %s: %s", path,
69                 strerror(errno));
70   }
71
72   if (!xbt_dict_is_empty(action_queues)) {
73     XBT_WARN
74         ("Not all actions got consumed. If the simulation ended successfully (without deadlock), you may want to add new processes to your deployment file.");
75
76
77     xbt_dict_foreach(action_queues, cursor, name, todo) {
78       XBT_WARN("Still %lu actions for %s", xbt_dynar_length(todo), name);
79     }
80   }
81
82   if (path)
83     fclose(action_fp);
84   xbt_dict_free(&action_queues);
85   action_queues = xbt_dict_new_homogeneous(NULL);
86 }
87
88 static void smpi_mpi_request_free_voidp(void* request)
89 {
90   MPI_Request req = request;
91   smpi_mpi_request_free(&req);
92 }
93
94 /* MPI Low level calls */
95 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
96                                int dst, int tag, MPI_Comm comm)
97 {
98   MPI_Request request =
99       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
100                     comm, PERSISTENT | SEND);
101
102   return request;
103 }
104
105 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
106                                int src, int tag, MPI_Comm comm)
107 {
108   MPI_Request request =
109       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
110                     comm, PERSISTENT | RECV);
111
112   return request;
113 }
114
115 void smpi_mpi_start(MPI_Request request)
116 {
117   smx_rdv_t mailbox;
118   int detached = 0;
119
120   xbt_assert(!request->action,
121               "Cannot (re)start a non-finished communication");
122   if(request->flags & RECV) {
123     print_request("New recv", request);
124     mailbox = smpi_process_mailbox();
125     // FIXME: SIMIX does not yet support non-contiguous datatypes
126     request->action = simcall_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
127   } else {
128     print_request("New send", request);
129     mailbox = smpi_process_remote_mailbox(
130         smpi_group_index(smpi_comm_group(request->comm), request->dst));
131     // FIXME: SIMIX does not yet support non-contiguous datatypes
132
133     if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
134       void *oldbuf = request->buf;
135       detached = 1;
136       request->buf = malloc(request->size);
137       memcpy(request->buf,oldbuf,request->size);
138       XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
139     } else {
140       XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
141     }
142     request->action = 
143     simcall_comm_isend(mailbox, request->size, -1.0,
144             request->buf, request->size,
145             &match_send,
146             &smpi_mpi_request_free_voidp, // how to free the userdata if a detached send fails
147             request,
148             // detach if msg size < eager/rdv switch limit
149             detached);
150
151 #ifdef HAVE_TRACING
152     /* FIXME: detached sends are not traceable (request->action == NULL) */
153     if (request->action)
154       simcall_set_category(request->action, TRACE_internal_smpi_get_category());
155 #endif
156   }
157 }
158
159 void smpi_mpi_startall(int count, MPI_Request * requests)
160 {
161     int i;
162
163   for(i = 0; i < count; i++) {
164     smpi_mpi_start(requests[i]);
165   }
166 }
167
168 void smpi_mpi_request_free(MPI_Request * request)
169 {
170   xbt_free(*request);
171   *request = MPI_REQUEST_NULL;
172 }
173
174 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
175                             int dst, int tag, MPI_Comm comm)
176 {
177   MPI_Request request =
178       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
179                     comm, NON_PERSISTENT | SEND);
180
181   return request;
182 }
183
184 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
185                            int dst, int tag, MPI_Comm comm)
186 {
187   MPI_Request request =
188       smpi_isend_init(buf, count, datatype, dst, tag, comm);
189
190   smpi_mpi_start(request);
191   return request;
192 }
193
194 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
195                             int src, int tag, MPI_Comm comm)
196 {
197   MPI_Request request =
198       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
199                     comm, NON_PERSISTENT | RECV);
200
201   return request;
202 }
203
204 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
205                            int src, int tag, MPI_Comm comm)
206 {
207   MPI_Request request =
208       smpi_irecv_init(buf, count, datatype, src, tag, comm);
209
210   smpi_mpi_start(request);
211   return request;
212 }
213
214 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
215                    int tag, MPI_Comm comm, MPI_Status * status)
216 {
217   MPI_Request request;
218
219   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
220   smpi_mpi_wait(&request, status);
221 }
222
223 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
224                    int tag, MPI_Comm comm)
225 {
226   MPI_Request request;
227
228   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
229   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
230 }
231
232 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
233                        int dst, int sendtag, void *recvbuf, int recvcount,
234                        MPI_Datatype recvtype, int src, int recvtag,
235                        MPI_Comm comm, MPI_Status * status)
236 {
237   MPI_Request requests[2];
238   MPI_Status stats[2];
239
240   requests[0] =
241       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
242   requests[1] =
243       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
244   smpi_mpi_startall(2, requests);
245   smpi_mpi_waitall(2, requests, stats);
246   if(status != MPI_STATUS_IGNORE) {
247     // Copy receive status
248     memcpy(status, &stats[1], sizeof(MPI_Status));
249   }
250 }
251
252 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
253 {
254   return status->count / smpi_datatype_size(datatype);
255 }
256
257 static void finish_wait(MPI_Request * request, MPI_Status * status)
258 {
259   MPI_Request req = *request;
260
261   if(status != MPI_STATUS_IGNORE) {
262     status->MPI_SOURCE = req->src;
263     status->MPI_TAG = req->tag;
264     status->MPI_ERROR = MPI_SUCCESS;
265     // FIXME: really this should just contain the count of receive-type blocks,
266     // right?
267     status->count = req->size;
268   }
269   print_request("Finishing", req);
270   if(req->flags & NON_PERSISTENT) {
271     smpi_mpi_request_free(request);
272   } else {
273     req->action = NULL;
274   }
275 }
276
277 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
278 int flag;
279
280    if ((*request)->action == NULL)
281   flag = 1;
282    else 
283     flag = simcall_comm_test((*request)->action);
284    if(flag) {
285         smpi_mpi_wait(request, status);
286     }
287     return flag;
288 }
289
290 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
291                      MPI_Status * status)
292 {
293   xbt_dynar_t comms;
294   int i, flag, size;
295   int* map;
296
297   *index = MPI_UNDEFINED;
298   flag = 0;
299   if(count > 0) {
300     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
301     map = xbt_new(int, count);
302     size = 0;
303     for(i = 0; i < count; i++) {
304       if(requests[i]->action) {
305          xbt_dynar_push(comms, &requests[i]->action);
306          map[size] = i;
307          size++;
308       }
309     }
310     if(size > 0) {
311       i = simcall_comm_testany(comms);
312       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
313       if(i != MPI_UNDEFINED) {
314         *index = map[i];
315         smpi_mpi_wait(&requests[*index], status);
316         flag = 1;
317       }
318     }
319     xbt_free(map);
320     xbt_dynar_free(&comms);
321   }
322   return flag;
323 }
324
325 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
326 {
327   print_request("Waiting", *request);
328   if ((*request)->action != NULL) { // this is not a detached send
329     simcall_comm_wait((*request)->action, -1.0);
330     finish_wait(request, status);
331   }
332   // FIXME for a detached send, finish_wait is not called:
333 }
334
335 int smpi_mpi_waitany(int count, MPI_Request requests[],
336                      MPI_Status * status)
337 {
338   xbt_dynar_t comms;
339   int i, size, index;
340   int *map;
341
342   index = MPI_UNDEFINED;
343   if(count > 0) {
344     // Wait for a request to complete
345     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
346     map = xbt_new(int, count);
347     size = 0;
348     XBT_DEBUG("Wait for one of");
349     for(i = 0; i < count; i++) {
350       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
351         print_request("   ", requests[i]);
352         xbt_dynar_push(comms, &requests[i]->action);
353         map[size] = i;
354         size++;
355       }
356     }
357     if(size > 0) {
358       i = simcall_comm_waitany(comms);
359       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
360       if (i != MPI_UNDEFINED) {
361         index = map[i];
362         finish_wait(&requests[index], status);
363       }
364     }
365     xbt_free(map);
366     xbt_dynar_free(&comms);
367   }
368   return index;
369 }
370
371 void smpi_mpi_waitall(int count, MPI_Request requests[],
372                       MPI_Status status[])
373 {
374   int index, c;
375   MPI_Status stat;
376   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
377
378   for(c = 0; c < count; c++) {
379     if(MC_IS_ENABLED) {
380       smpi_mpi_wait(&requests[c], pstat);
381       index = c;
382     } else {
383       index = smpi_mpi_waitany(count, requests, pstat);
384       if(index == MPI_UNDEFINED) {
385         break;
386       }
387     }
388     if(status != MPI_STATUS_IGNORE) {
389       memcpy(&status[index], pstat, sizeof(*pstat));
390     }
391   }
392 }
393
394 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
395                       MPI_Status status[])
396 {
397   int i, count, index;
398
399   count = 0;
400   for(i = 0; i < incount; i++) {
401      if(smpi_mpi_testany(incount, requests, &index, status)) {
402        indices[count] = index;
403        count++;
404      }
405   }
406   return count;
407 }
408
409 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
410                     MPI_Comm comm)
411 {
412   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
413   nary_tree_bcast(buf, count, datatype, root, comm, 4);
414 }
415
416 void smpi_mpi_barrier(MPI_Comm comm)
417 {
418   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
419   nary_tree_barrier(comm, 4);
420 }
421
422 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
423                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
424                      int root, MPI_Comm comm)
425 {
426   int system_tag = 666;
427   int rank, size, src, index;
428   MPI_Aint lb = 0, recvext = 0;
429   MPI_Request *requests;
430
431   rank = smpi_comm_rank(comm);
432   size = smpi_comm_size(comm);
433   if(rank != root) {
434     // Send buffer to root
435     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
436   } else {
437     // FIXME: check for errors
438     smpi_datatype_extent(recvtype, &lb, &recvext);
439     // Local copy from root
440     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
441         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
442     // Receive buffers from senders
443     requests = xbt_new(MPI_Request, size - 1);
444     index = 0;
445     for(src = 0; src < size; src++) {
446       if(src != root) {
447         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
448                                           recvcount, recvtype, 
449                                           src, system_tag, comm);
450         index++;
451       }
452     }
453     // Wait for completion of irecv's.
454     smpi_mpi_startall(size - 1, requests);
455     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
456     xbt_free(requests);
457   }
458 }
459
460 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
461                       void *recvbuf, int *recvcounts, int *displs,
462                       MPI_Datatype recvtype, int root, MPI_Comm comm)
463 {
464   int system_tag = 666;
465   int rank, size, src, index;
466   MPI_Aint lb = 0, recvext = 0;
467   MPI_Request *requests;
468
469   rank = smpi_comm_rank(comm);
470   size = smpi_comm_size(comm);
471   if(rank != root) {
472     // Send buffer to root
473     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
474   } else {
475     // FIXME: check for errors
476     smpi_datatype_extent(recvtype, &lb, &recvext);
477     // Local copy from root
478     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
479                        (char *)recvbuf + displs[root] * recvext, 
480                        recvcounts[root], recvtype);
481     // Receive buffers from senders
482     requests = xbt_new(MPI_Request, size - 1);
483     index = 0;
484     for(src = 0; src < size; src++) {
485       if(src != root) {
486         requests[index] =
487             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
488                             recvcounts[src], recvtype, src, system_tag, comm);
489         index++;
490       }
491     }
492     // Wait for completion of irecv's.
493     smpi_mpi_startall(size - 1, requests);
494     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
495     xbt_free(requests);
496   }
497 }
498
499 void smpi_mpi_allgather(void *sendbuf, int sendcount,
500                         MPI_Datatype sendtype, void *recvbuf,
501                         int recvcount, MPI_Datatype recvtype,
502                         MPI_Comm comm)
503 {
504   int system_tag = 666;
505   int rank, size, other, index;
506   MPI_Aint lb = 0, recvext = 0;
507   MPI_Request *requests;
508
509   rank = smpi_comm_rank(comm);
510   size = smpi_comm_size(comm);
511   // FIXME: check for errors
512   smpi_datatype_extent(recvtype, &lb, &recvext);
513   // Local copy from self
514   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
515                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
516                      recvtype);
517   // Send/Recv buffers to/from others;
518   requests = xbt_new(MPI_Request, 2 * (size - 1));
519   index = 0;
520   for(other = 0; other < size; other++) {
521     if(other != rank) {
522       requests[index] =
523           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
524                           comm);
525       index++;
526       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
527                                         recvcount, recvtype, other, 
528                                         system_tag, comm);
529       index++;
530     }
531   }
532   // Wait for completion of all comms.
533   smpi_mpi_startall(2 * (size - 1), requests);
534   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
535   xbt_free(requests);
536 }
537
538 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
539                          MPI_Datatype sendtype, void *recvbuf,
540                          int *recvcounts, int *displs,
541                          MPI_Datatype recvtype, MPI_Comm comm)
542 {
543   int system_tag = 666;
544   int rank, size, other, index;
545   MPI_Aint lb = 0, recvext = 0;
546   MPI_Request *requests;
547
548   rank = smpi_comm_rank(comm);
549   size = smpi_comm_size(comm);
550   // FIXME: check for errors
551   smpi_datatype_extent(recvtype, &lb, &recvext);
552   // Local copy from self
553   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
554                      (char *)recvbuf + displs[rank] * recvext, 
555                      recvcounts[rank], recvtype);
556   // Send buffers to others;
557   requests = xbt_new(MPI_Request, 2 * (size - 1));
558   index = 0;
559   for(other = 0; other < size; other++) {
560     if(other != rank) {
561       requests[index] =
562           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
563                           comm);
564       index++;
565       requests[index] =
566           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
567                           recvtype, other, system_tag, comm);
568       index++;
569     }
570   }
571   // Wait for completion of all comms.
572   smpi_mpi_startall(2 * (size - 1), requests);
573   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
574   xbt_free(requests);
575 }
576
577 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
578                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
579                       int root, MPI_Comm comm)
580 {
581   int system_tag = 666;
582   int rank, size, dst, index;
583   MPI_Aint lb = 0, sendext = 0;
584   MPI_Request *requests;
585
586   rank = smpi_comm_rank(comm);
587   size = smpi_comm_size(comm);
588   if(rank != root) {
589     // Recv buffer from root
590     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
591                   MPI_STATUS_IGNORE);
592   } else {
593     // FIXME: check for errors
594     smpi_datatype_extent(sendtype, &lb, &sendext);
595     // Local copy from root
596     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
597       sendcount, sendtype, recvbuf, recvcount, recvtype);
598     // Send buffers to receivers
599     requests = xbt_new(MPI_Request, size - 1);
600     index = 0;
601     for(dst = 0; dst < size; dst++) {
602       if(dst != root) {
603         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
604                                           sendcount, sendtype, dst,
605                                           system_tag, comm);
606         index++;
607       }
608     }
609     // Wait for completion of isend's.
610     smpi_mpi_startall(size - 1, requests);
611     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
612     xbt_free(requests);
613   }
614 }
615
616 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
617                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
618                        MPI_Datatype recvtype, int root, MPI_Comm comm)
619 {
620   int system_tag = 666;
621   int rank, size, dst, index;
622   MPI_Aint lb = 0, sendext = 0;
623   MPI_Request *requests;
624
625   rank = smpi_comm_rank(comm);
626   size = smpi_comm_size(comm);
627   if(rank != root) {
628     // Recv buffer from root
629     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
630                   MPI_STATUS_IGNORE);
631   } else {
632     // FIXME: check for errors
633     smpi_datatype_extent(sendtype, &lb, &sendext);
634     // Local copy from root
635     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
636                        sendtype, recvbuf, recvcount, recvtype);
637     // Send buffers to receivers
638     requests = xbt_new(MPI_Request, size - 1);
639     index = 0;
640     for(dst = 0; dst < size; dst++) {
641       if(dst != root) {
642         requests[index] =
643             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
644                             sendtype, dst, system_tag, comm);
645         index++;
646       }
647     }
648     // Wait for completion of isend's.
649     smpi_mpi_startall(size - 1, requests);
650     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
651     xbt_free(requests);
652   }
653 }
654
655 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
656                      MPI_Datatype datatype, MPI_Op op, int root,
657                      MPI_Comm comm)
658 {
659   int system_tag = 666;
660   int rank, size, src, index;
661   MPI_Aint lb = 0, dataext = 0;
662   MPI_Request *requests;
663   void **tmpbufs;
664
665   rank = smpi_comm_rank(comm);
666   size = smpi_comm_size(comm);
667   if(rank != root) {
668     // Send buffer to root
669     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
670   } else {
671     // FIXME: check for errors
672     smpi_datatype_extent(datatype, &lb, &dataext);
673     // Local copy from root
674     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
675     // Receive buffers from senders
676     //TODO: make a MPI_barrier here ?
677     requests = xbt_new(MPI_Request, size - 1);
678     tmpbufs = xbt_new(void *, size - 1);
679     index = 0;
680     for(src = 0; src < size; src++) {
681       if(src != root) {
682         // FIXME: possibly overkill we we have contiguous/noncontiguous data
683         //  mapping...
684         tmpbufs[index] = xbt_malloc(count * dataext);
685         requests[index] =
686             smpi_irecv_init(tmpbufs[index], count, datatype, src,
687                             system_tag, comm);
688         index++;
689       }
690     }
691     // Wait for completion of irecv's.
692     smpi_mpi_startall(size - 1, requests);
693     for(src = 0; src < size - 1; src++) {
694       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
695       if(index == MPI_UNDEFINED) {
696         break;
697       }
698       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
699     }
700     for(index = 0; index < size - 1; index++) {
701       xbt_free(tmpbufs[index]);
702     }
703     xbt_free(tmpbufs);
704     xbt_free(requests);
705   }
706 }
707
708 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
709                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
710 {
711   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
712   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
713 }
714
715 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
716                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
717 {
718   int system_tag = 666;
719   int rank, size, other, index;
720   MPI_Aint lb = 0, dataext = 0;
721   MPI_Request *requests;
722   void **tmpbufs;
723
724   rank = smpi_comm_rank(comm);
725   size = smpi_comm_size(comm);
726
727   // FIXME: check for errors
728   smpi_datatype_extent(datatype, &lb, &dataext);
729
730   // Local copy from self
731   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
732
733   // Send/Recv buffers to/from others;
734   requests = xbt_new(MPI_Request, size - 1);
735   tmpbufs = xbt_new(void *, rank);
736   index = 0;
737   for(other = 0; other < rank; other++) {
738     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
739     // mapping...
740     tmpbufs[index] = xbt_malloc(count * dataext);
741     requests[index] =
742         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
743                         comm);
744     index++;
745   }
746   for(other = rank + 1; other < size; other++) {
747     requests[index] =
748         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
749     index++;
750   }
751   // Wait for completion of all comms.
752   smpi_mpi_startall(size - 1, requests);
753   for(other = 0; other < size - 1; other++) {
754     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
755     if(index == MPI_UNDEFINED) {
756       break;
757     }
758     if(index < rank) {
759       // #Request is below rank: it's a irecv
760       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
761     }
762   }
763   for(index = 0; index < rank; index++) {
764     xbt_free(tmpbufs[index]);
765   }
766   xbt_free(tmpbufs);
767   xbt_free(requests);
768 }