Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
0fc8226a36e1c2dc055b2b62b3196c8daee2e567
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
22
23 static int match_recv(void* a, void* b) {
24    MPI_Request ref = (MPI_Request)a;
25    MPI_Request req = (MPI_Request)b;
26
27    xbt_assert(ref, "Cannot match recv against null reference");
28    xbt_assert(req, "Cannot match recv against null request");
29    return (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
30           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
31 }
32
33 static int match_send(void* a, void* b) {
34    MPI_Request ref = (MPI_Request)a;
35    MPI_Request req = (MPI_Request)b;
36
37    xbt_assert(ref, "Cannot match send against null reference");
38    xbt_assert(req, "Cannot match send against null request");
39    return (req->src == MPI_ANY_SOURCE || req->src == ref->src)
40           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
41 }
42
43 static MPI_Request build_request(void *buf, int count,
44                                  MPI_Datatype datatype, int src, int dst,
45                                  int tag, MPI_Comm comm, unsigned flags)
46 {
47   MPI_Request request;
48
49   request = xbt_new(s_smpi_mpi_request_t, 1);
50   request->buf = buf;
51   // FIXME: this will have to be changed to support non-contiguous datatypes
52   request->size = smpi_datatype_size(datatype) * count;
53   request->src = src;
54   request->dst = dst;
55   request->tag = tag;
56   request->comm = comm;
57   request->action = NULL;
58   request->flags = flags;
59 #ifdef HAVE_TRACING
60   request->send = 0;
61   request->recv = 0;
62 #endif
63   return request;
64 }
65
66 /* MPI Low level calls */
67 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
68                                int dst, int tag, MPI_Comm comm)
69 {
70   MPI_Request request =
71       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
72                     comm, PERSISTENT | SEND);
73
74   return request;
75 }
76
77 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
78                                int src, int tag, MPI_Comm comm)
79 {
80   MPI_Request request =
81       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
82                     comm, PERSISTENT | RECV);
83
84   return request;
85 }
86
87 void smpi_mpi_start(MPI_Request request)
88 {
89   smx_rdv_t mailbox;
90   int detached = 0;
91
92   xbt_assert(!request->action,
93               "Cannot (re)start a non-finished communication");
94   if(request->flags & RECV) {
95     print_request("New recv", request);
96     mailbox = smpi_process_mailbox();
97     // FIXME: SIMIX does not yet support non-contiguous datatypes
98     request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
99   } else {
100     print_request("New send", request);
101     mailbox = smpi_process_remote_mailbox(
102                           smpi_group_index(smpi_comm_group(request->comm), request->dst));
103     // FIXME: SIMIX does not yet support non-contiguous datatypes
104
105     if (request->size < 64*1024 ) { // eager mode => detached send (FIXME: this limit should be configurable)
106         void *oldbuf = request->buf;
107         detached = 1;
108         request->buf = malloc(request->size);
109         memcpy(request->buf,oldbuf,request->size);
110         XBT_DEBUG("Send request %p is detached; buf %p copied into %p",request,oldbuf,request->buf);
111     } else {
112         XBT_DEBUG("Send request %p is not detached (buf: %p)",request,request->buf);
113     }
114     request->action = 
115                 SIMIX_req_comm_isend(mailbox, request->size, -1.0,
116                                     request->buf, request->size, &match_send, request,  
117                                     // detach if msg size < eager/rdv switch limit
118                                     detached);
119
120 #ifdef HAVE_TRACING
121     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
122 #endif
123   }
124 }
125
126 void smpi_mpi_startall(int count, MPI_Request * requests)
127 {
128           int i;
129
130   for(i = 0; i < count; i++) {
131     smpi_mpi_start(requests[i]);
132   }
133 }
134
135 void smpi_mpi_request_free(MPI_Request * request)
136 {
137   xbt_free(*request);
138   *request = MPI_REQUEST_NULL;
139 }
140
141 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
142                             int dst, int tag, MPI_Comm comm)
143 {
144   MPI_Request request =
145       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
146                     comm, NON_PERSISTENT | SEND);
147
148   return request;
149 }
150
151 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
152                            int dst, int tag, MPI_Comm comm)
153 {
154   MPI_Request request =
155       smpi_isend_init(buf, count, datatype, dst, tag, comm);
156
157   smpi_mpi_start(request);
158   return request;
159 }
160
161 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
162                             int src, int tag, MPI_Comm comm)
163 {
164   MPI_Request request =
165       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
166                     comm, NON_PERSISTENT | RECV);
167
168   return request;
169 }
170
171 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
172                            int src, int tag, MPI_Comm comm)
173 {
174   MPI_Request request =
175       smpi_irecv_init(buf, count, datatype, src, tag, comm);
176
177   smpi_mpi_start(request);
178   return request;
179 }
180
181 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
182                    int tag, MPI_Comm comm, MPI_Status * status)
183 {
184   MPI_Request request;
185
186   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
187   smpi_mpi_wait(&request, status);
188 }
189
190 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
191                    int tag, MPI_Comm comm)
192 {
193   MPI_Request request;
194
195   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
196   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
197 }
198
199 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
200                        int dst, int sendtag, void *recvbuf, int recvcount,
201                        MPI_Datatype recvtype, int src, int recvtag,
202                        MPI_Comm comm, MPI_Status * status)
203 {
204   MPI_Request requests[2];
205   MPI_Status stats[2];
206
207   requests[0] =
208       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
209   requests[1] =
210       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
211   smpi_mpi_startall(2, requests);
212   smpi_mpi_waitall(2, requests, stats);
213   if(status != MPI_STATUS_IGNORE) {
214     // Copy receive status
215     memcpy(status, &stats[1], sizeof(MPI_Status));
216   }
217 }
218
219 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
220 {
221   return status->count / smpi_datatype_size(datatype);
222 }
223
224 static void finish_wait(MPI_Request * request, MPI_Status * status)
225 {
226   MPI_Request req = *request;
227
228   if(status != MPI_STATUS_IGNORE) {
229     status->MPI_SOURCE = req->src;
230     status->MPI_TAG = req->tag;
231     status->MPI_ERROR = MPI_SUCCESS;
232     // FIXME: really this should just contain the count of receive-type blocks,
233     // right?
234     status->count = req->size;
235   }
236   print_request("Finishing", req);
237   if(req->flags & NON_PERSISTENT) {
238     smpi_mpi_request_free(request);
239   } else {
240     req->action = NULL;
241   }
242 }
243
244 int smpi_mpi_test(MPI_Request * request, MPI_Status * status) {
245 int flag;
246
247    if ((*request)->action == NULL)
248         flag = 1;
249    else 
250     flag = SIMIX_req_comm_test((*request)->action);
251    if(flag) {
252                     smpi_mpi_wait(request, status);
253           }
254           return flag;
255 }
256
257 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
258                      MPI_Status * status)
259 {
260   xbt_dynar_t comms;
261   int i, flag, size;
262   int* map;
263
264   *index = MPI_UNDEFINED;
265   flag = 0;
266   if(count > 0) {
267     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
268     map = xbt_new(int, count);
269     size = 0;
270     for(i = 0; i < count; i++) {
271       if(requests[i]->action) {
272          xbt_dynar_push(comms, &requests[i]->action);
273          map[size] = i;
274          size++;
275       }
276     }
277     if(size > 0) {
278       i = SIMIX_req_comm_testany(comms);
279       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
280       if(i != MPI_UNDEFINED) {
281         *index = map[i];
282         smpi_mpi_wait(&requests[*index], status);
283         flag = 1;
284       }
285     }
286     xbt_free(map);
287     xbt_dynar_free(&comms);
288   }
289   return flag;
290 }
291
292 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
293 {
294   print_request("Waiting", *request);
295   if ((*request)->action != NULL ) 
296          SIMIX_req_comm_wait((*request)->action, -1.0);
297   finish_wait(request, status);
298 }
299
300 int smpi_mpi_waitany(int count, MPI_Request requests[],
301                      MPI_Status * status)
302 {
303   xbt_dynar_t comms;
304   int i, size, index;
305   int *map;
306
307   index = MPI_UNDEFINED;
308   if(count > 0) {
309     // Wait for a request to complete
310     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
311     map = xbt_new(int, count);
312     size = 0;
313     XBT_DEBUG("Wait for one of");
314     for(i = 0; i < count; i++) {
315       if((requests[i] != MPI_REQUEST_NULL) && (requests[i]->action != NULL)) {
316         print_request("   ", requests[i]);
317         xbt_dynar_push(comms, &requests[i]->action);
318         map[size] = i;
319         size++;
320       }
321     }
322     if(size > 0) {
323       i = SIMIX_req_comm_waitany(comms);
324       // FIXME: MPI_UNDEFINED or does SIMIX have a return code?
325       if (i != MPI_UNDEFINED) {
326         index = map[i];
327         finish_wait(&requests[index], status);
328       }
329     }
330     xbt_free(map);
331     xbt_dynar_free(&comms);
332   }
333   return index;
334 }
335
336 void smpi_mpi_waitall(int count, MPI_Request requests[],
337                       MPI_Status status[])
338 {
339   int index, c;
340   MPI_Status stat;
341   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
342
343   for(c = 0; c < count; c++) {
344     if(MC_IS_ENABLED) {
345       smpi_mpi_wait(&requests[c], pstat);
346       index = c;
347     } else {
348       index = smpi_mpi_waitany(count, requests, pstat);
349       if(index == MPI_UNDEFINED) {
350         break;
351       }
352     }
353     if(status != MPI_STATUS_IGNORE) {
354       memcpy(&status[index], pstat, sizeof(*pstat));
355     }
356   }
357 }
358
359 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
360                       MPI_Status status[])
361 {
362   int i, count, index;
363
364   count = 0;
365   for(i = 0; i < incount; i++) {
366      if(smpi_mpi_testany(incount, requests, &index, status)) {
367        indices[count] = index;
368        count++;
369      }
370   }
371   return count;
372 }
373
374 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
375                     MPI_Comm comm)
376 {
377   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
378   nary_tree_bcast(buf, count, datatype, root, comm, 4);
379 }
380
381 void smpi_mpi_barrier(MPI_Comm comm)
382 {
383   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
384   nary_tree_barrier(comm, 4);
385 }
386
387 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
388                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
389                      int root, MPI_Comm comm)
390 {
391   int system_tag = 666;
392   int rank, size, src, index;
393   MPI_Aint lb = 0, recvext = 0;
394   MPI_Request *requests;
395
396   rank = smpi_comm_rank(comm);
397   size = smpi_comm_size(comm);
398   if(rank != root) {
399     // Send buffer to root
400     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
401   } else {
402     // FIXME: check for errors
403     smpi_datatype_extent(recvtype, &lb, &recvext);
404     // Local copy from root
405     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
406         (char *)recvbuf + root * recvcount * recvext, recvcount, recvtype);
407     // Receive buffers from senders
408     requests = xbt_new(MPI_Request, size - 1);
409     index = 0;
410     for(src = 0; src < size; src++) {
411       if(src != root) {
412         requests[index] = smpi_irecv_init((char *)recvbuf + src * recvcount * recvext, 
413                                           recvcount, recvtype, 
414                                           src, system_tag, comm);
415         index++;
416       }
417     }
418     // Wait for completion of irecv's.
419     smpi_mpi_startall(size - 1, requests);
420     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
421     xbt_free(requests);
422   }
423 }
424
425 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
426                       void *recvbuf, int *recvcounts, int *displs,
427                       MPI_Datatype recvtype, int root, MPI_Comm comm)
428 {
429   int system_tag = 666;
430   int rank, size, src, index;
431   MPI_Aint lb = 0, recvext = 0;
432   MPI_Request *requests;
433
434   rank = smpi_comm_rank(comm);
435   size = smpi_comm_size(comm);
436   if(rank != root) {
437     // Send buffer to root
438     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
439   } else {
440     // FIXME: check for errors
441     smpi_datatype_extent(recvtype, &lb, &recvext);
442     // Local copy from root
443     smpi_datatype_copy(sendbuf, sendcount, sendtype, 
444                        (char *)recvbuf + displs[root] * recvext, 
445                        recvcounts[root], recvtype);
446     // Receive buffers from senders
447     requests = xbt_new(MPI_Request, size - 1);
448     index = 0;
449     for(src = 0; src < size; src++) {
450       if(src != root) {
451         requests[index] =
452             smpi_irecv_init((char *)recvbuf + displs[src] * recvext, 
453                             recvcounts[src], recvtype, src, system_tag, comm);
454         index++;
455       }
456     }
457     // Wait for completion of irecv's.
458     smpi_mpi_startall(size - 1, requests);
459     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
460     xbt_free(requests);
461   }
462 }
463
464 void smpi_mpi_allgather(void *sendbuf, int sendcount,
465                         MPI_Datatype sendtype, void *recvbuf,
466                         int recvcount, MPI_Datatype recvtype,
467                         MPI_Comm comm)
468 {
469   int system_tag = 666;
470   int rank, size, other, index;
471   MPI_Aint lb = 0, recvext = 0;
472   MPI_Request *requests;
473
474   rank = smpi_comm_rank(comm);
475   size = smpi_comm_size(comm);
476   // FIXME: check for errors
477   smpi_datatype_extent(recvtype, &lb, &recvext);
478   // Local copy from self
479   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
480                      (char *)recvbuf + rank * recvcount * recvext, recvcount, 
481                      recvtype);
482   // Send/Recv buffers to/from others;
483   requests = xbt_new(MPI_Request, 2 * (size - 1));
484   index = 0;
485   for(other = 0; other < size; other++) {
486     if(other != rank) {
487       requests[index] =
488           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
489                           comm);
490       index++;
491       requests[index] = smpi_irecv_init((char *)recvbuf + other * recvcount * recvext, 
492                                         recvcount, recvtype, other, 
493                                         system_tag, comm);
494       index++;
495     }
496   }
497   // Wait for completion of all comms.
498   smpi_mpi_startall(2 * (size - 1), requests);
499   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
500   xbt_free(requests);
501 }
502
503 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
504                          MPI_Datatype sendtype, void *recvbuf,
505                          int *recvcounts, int *displs,
506                          MPI_Datatype recvtype, MPI_Comm comm)
507 {
508   int system_tag = 666;
509   int rank, size, other, index;
510   MPI_Aint lb = 0, recvext = 0;
511   MPI_Request *requests;
512
513   rank = smpi_comm_rank(comm);
514   size = smpi_comm_size(comm);
515   // FIXME: check for errors
516   smpi_datatype_extent(recvtype, &lb, &recvext);
517   // Local copy from self
518   smpi_datatype_copy(sendbuf, sendcount, sendtype, 
519                      (char *)recvbuf + displs[rank] * recvext, 
520                      recvcounts[rank], recvtype);
521   // Send buffers to others;
522   requests = xbt_new(MPI_Request, 2 * (size - 1));
523   index = 0;
524   for(other = 0; other < size; other++) {
525     if(other != rank) {
526       requests[index] =
527           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
528                           comm);
529       index++;
530       requests[index] =
531           smpi_irecv_init((char *)recvbuf + displs[other] * recvext, recvcounts[other],
532                           recvtype, other, system_tag, comm);
533       index++;
534     }
535   }
536   // Wait for completion of all comms.
537   smpi_mpi_startall(2 * (size - 1), requests);
538   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
539   xbt_free(requests);
540 }
541
542 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
543                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
544                       int root, MPI_Comm comm)
545 {
546   int system_tag = 666;
547   int rank, size, dst, index;
548   MPI_Aint lb = 0, sendext = 0;
549   MPI_Request *requests;
550
551   rank = smpi_comm_rank(comm);
552   size = smpi_comm_size(comm);
553   if(rank != root) {
554     // Recv buffer from root
555     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
556                   MPI_STATUS_IGNORE);
557   } else {
558     // FIXME: check for errors
559     smpi_datatype_extent(sendtype, &lb, &sendext);
560     // Local copy from root
561     smpi_datatype_copy((char *)sendbuf + root * sendcount * sendext,
562       sendcount, sendtype, recvbuf, recvcount, recvtype);
563     // Send buffers to receivers
564     requests = xbt_new(MPI_Request, size - 1);
565     index = 0;
566     for(dst = 0; dst < size; dst++) {
567       if(dst != root) {
568         requests[index] = smpi_isend_init((char *)sendbuf + dst * sendcount * sendext, 
569                                           sendcount, sendtype, dst,
570                                           system_tag, comm);
571         index++;
572       }
573     }
574     // Wait for completion of isend's.
575     smpi_mpi_startall(size - 1, requests);
576     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
577     xbt_free(requests);
578   }
579 }
580
581 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
582                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
583                        MPI_Datatype recvtype, int root, MPI_Comm comm)
584 {
585   int system_tag = 666;
586   int rank, size, dst, index;
587   MPI_Aint lb = 0, sendext = 0;
588   MPI_Request *requests;
589
590   rank = smpi_comm_rank(comm);
591   size = smpi_comm_size(comm);
592   if(rank != root) {
593     // Recv buffer from root
594     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
595                   MPI_STATUS_IGNORE);
596   } else {
597     // FIXME: check for errors
598     smpi_datatype_extent(sendtype, &lb, &sendext);
599     // Local copy from root
600     smpi_datatype_copy((char *)sendbuf + displs[root] * sendext, sendcounts[root], 
601                        sendtype, recvbuf, recvcount, recvtype);
602     // Send buffers to receivers
603     requests = xbt_new(MPI_Request, size - 1);
604     index = 0;
605     for(dst = 0; dst < size; dst++) {
606       if(dst != root) {
607         requests[index] =
608             smpi_isend_init((char *)sendbuf + displs[dst] * sendext, sendcounts[dst], 
609                             sendtype, dst, system_tag, comm);
610         index++;
611       }
612     }
613     // Wait for completion of isend's.
614     smpi_mpi_startall(size - 1, requests);
615     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
616     xbt_free(requests);
617   }
618 }
619
620 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
621                      MPI_Datatype datatype, MPI_Op op, int root,
622                      MPI_Comm comm)
623 {
624   int system_tag = 666;
625   int rank, size, src, index;
626   MPI_Aint lb = 0, dataext = 0;
627   MPI_Request *requests;
628   void **tmpbufs;
629
630   rank = smpi_comm_rank(comm);
631   size = smpi_comm_size(comm);
632   if(rank != root) {
633     // Send buffer to root
634     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
635   } else {
636     // FIXME: check for errors
637     smpi_datatype_extent(datatype, &lb, &dataext);
638     // Local copy from root
639     smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
640     // Receive buffers from senders
641     //TODO: make a MPI_barrier here ?
642     requests = xbt_new(MPI_Request, size - 1);
643     tmpbufs = xbt_new(void *, size - 1);
644     index = 0;
645     for(src = 0; src < size; src++) {
646       if(src != root) {
647         // FIXME: possibly overkill we we have contiguous/noncontiguous data
648         //  mapping...
649         tmpbufs[index] = xbt_malloc(count * dataext);
650         requests[index] =
651             smpi_irecv_init(tmpbufs[index], count, datatype, src,
652                             system_tag, comm);
653         index++;
654       }
655     }
656     // Wait for completion of irecv's.
657     smpi_mpi_startall(size - 1, requests);
658     for(src = 0; src < size - 1; src++) {
659       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
660       if(index == MPI_UNDEFINED) {
661         break;
662       }
663       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
664     }
665     for(index = 0; index < size - 1; index++) {
666       xbt_free(tmpbufs[index]);
667     }
668     xbt_free(tmpbufs);
669     xbt_free(requests);
670   }
671 }
672
673 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
674                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
675 {
676   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
677   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
678 }
679
680 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
681                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
682 {
683   int system_tag = 666;
684   int rank, size, other, index;
685   MPI_Aint lb = 0, dataext = 0;
686   MPI_Request *requests;
687   void **tmpbufs;
688
689   rank = smpi_comm_rank(comm);
690   size = smpi_comm_size(comm);
691
692   // FIXME: check for errors
693   smpi_datatype_extent(datatype, &lb, &dataext);
694
695   // Local copy from self
696   smpi_datatype_copy(sendbuf, count, datatype, recvbuf, count, datatype);
697
698   // Send/Recv buffers to/from others;
699   requests = xbt_new(MPI_Request, size - 1);
700   tmpbufs = xbt_new(void *, rank);
701   index = 0;
702   for(other = 0; other < rank; other++) {
703     // FIXME: possibly overkill we we have contiguous/noncontiguous data 
704     // mapping...
705     tmpbufs[index] = xbt_malloc(count * dataext);
706     requests[index] =
707         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
708                         comm);
709     index++;
710   }
711   for(other = rank + 1; other < size; other++) {
712     requests[index] =
713         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
714     index++;
715   }
716   // Wait for completion of all comms.
717   smpi_mpi_startall(size - 1, requests);
718   for(other = 0; other < size - 1; other++) {
719     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
720     if(index == MPI_UNDEFINED) {
721       break;
722     }
723     if(index < rank) {
724       // #Request is below rank: it's a irecv
725       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
726     }
727   }
728   for(index = 0; index < rank; index++) {
729     xbt_free(tmpbufs[index]);
730   }
731   xbt_free(tmpbufs);
732   xbt_free(requests);
733 }