Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use a linear sweep waitall when MC is enabled.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
22
23 static int match_recv(void* a, void* b) {
24    MPI_Request ref = (MPI_Request)a;
25    MPI_Request req = (MPI_Request)b;
26
27    xbt_assert0(ref, "Cannot match recv against null reference");
28    xbt_assert0(req, "Cannot match recv against null request");
29    return req->comm == ref->comm
30           && (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
31           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
32 }
33
34 static int match_send(void* a, void* b) {
35    MPI_Request ref = (MPI_Request)a;
36    MPI_Request req = (MPI_Request)b;
37
38    xbt_assert0(ref, "Cannot match send against null reference");
39    xbt_assert0(req, "Cannot match send against null request");
40    return req->comm == ref->comm
41           && (req->src == MPI_ANY_SOURCE || req->src == ref->src)
42           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
43 }
44
45 static MPI_Request build_request(void *buf, int count,
46                                  MPI_Datatype datatype, int src, int dst,
47                                  int tag, MPI_Comm comm, unsigned flags)
48 {
49   MPI_Request request;
50
51   request = xbt_new(s_smpi_mpi_request_t, 1);
52   request->buf = buf;
53   request->size = smpi_datatype_size(datatype) * count;
54   request->src = src;
55   request->dst = dst;
56   request->tag = tag;
57   request->comm = comm;
58   request->action = NULL;
59   request->flags = flags;
60 #ifdef HAVE_TRACING
61   request->send = 0;
62   request->recv = 0;
63 #endif
64   return request;
65 }
66
67 /* MPI Low level calls */
68 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
69                                int dst, int tag, MPI_Comm comm)
70 {
71   MPI_Request request =
72       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
73                     comm, PERSISTENT | SEND);
74
75   return request;
76 }
77
78 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
79                                int src, int tag, MPI_Comm comm)
80 {
81   MPI_Request request =
82       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
83                     comm, PERSISTENT | RECV);
84
85   return request;
86 }
87
88 void smpi_mpi_start(MPI_Request request)
89 {
90   smx_rdv_t mailbox;
91
92   xbt_assert0(!request->action,
93               "Cannot (re)start a non-finished communication");
94   if(request->flags & RECV) {
95     print_request("New recv", request);
96     mailbox = smpi_process_mailbox();
97     request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
98   } else {
99     print_request("New send", request);
100     mailbox = smpi_process_remote_mailbox(request->dst);
101     request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
102                                            request->buf, request->size, &match_send, request);
103 #ifdef HAVE_TRACING
104     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
105 #endif
106   }
107 }
108
109 void smpi_mpi_startall(int count, MPI_Request * requests)
110 {
111   int i;
112
113   for(i = 0; i < count; i++) {
114     smpi_mpi_start(requests[i]);
115   }
116 }
117
118 void smpi_mpi_request_free(MPI_Request * request)
119 {
120   xbt_free(*request);
121   *request = MPI_REQUEST_NULL;
122 }
123
124 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
125                             int dst, int tag, MPI_Comm comm)
126 {
127   MPI_Request request =
128       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
129                     comm, NON_PERSISTENT | SEND);
130
131   return request;
132 }
133
134 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
135                            int dst, int tag, MPI_Comm comm)
136 {
137   MPI_Request request =
138       smpi_isend_init(buf, count, datatype, dst, tag, comm);
139
140   smpi_mpi_start(request);
141   return request;
142 }
143
144 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
145                             int src, int tag, MPI_Comm comm)
146 {
147   MPI_Request request =
148       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
149                     comm, NON_PERSISTENT | RECV);
150
151   return request;
152 }
153
154 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
155                            int src, int tag, MPI_Comm comm)
156 {
157   MPI_Request request =
158       smpi_irecv_init(buf, count, datatype, src, tag, comm);
159
160   smpi_mpi_start(request);
161   return request;
162 }
163
164 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
165                    int tag, MPI_Comm comm, MPI_Status * status)
166 {
167   MPI_Request request;
168
169   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
170   smpi_mpi_wait(&request, status);
171 }
172
173 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
174                    int tag, MPI_Comm comm)
175 {
176   MPI_Request request;
177
178   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
179   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
180 }
181
182 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
183                        int dst, int sendtag, void *recvbuf, int recvcount,
184                        MPI_Datatype recvtype, int src, int recvtag,
185                        MPI_Comm comm, MPI_Status * status)
186 {
187   MPI_Request requests[2];
188   MPI_Status stats[2];
189
190   requests[0] =
191       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
192   requests[1] =
193       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
194   smpi_mpi_startall(2, requests);
195   smpi_mpi_waitall(2, requests, stats);
196   if(status != MPI_STATUS_IGNORE) {
197     // Copy receive status
198     memcpy(status, &stats[1], sizeof(MPI_Status));
199   }
200 }
201
202 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
203 {
204   return status->count / smpi_datatype_size(datatype);
205 }
206
207 static void finish_wait(MPI_Request * request, MPI_Status * status)
208 {
209   MPI_Request req = *request;
210
211   if(status != MPI_STATUS_IGNORE) {
212     status->MPI_SOURCE = req->src;
213     status->MPI_TAG = req->tag;
214     status->MPI_ERROR = MPI_SUCCESS;
215     status->count = SIMIX_req_comm_get_dst_buff_size(req->action);
216   }
217   SIMIX_req_comm_destroy(req->action);
218   print_request("Finishing", req);
219   if(req->flags & NON_PERSISTENT) {
220     smpi_mpi_request_free(request);
221   } else {
222     req->action = NULL;
223   }
224 }
225
226 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
227 {
228    int flag = SIMIX_req_comm_test((*request)->action);
229
230    if(flag) {
231       smpi_mpi_wait(request, status);
232    }
233    return flag;
234 }
235
236 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
237                      MPI_Status * status)
238 {
239   xbt_dynar_t comms;
240   int i, flag, size;
241   int* map;
242
243   *index = MPI_UNDEFINED;
244   flag = 0;
245   if(count > 0) {
246     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
247     map = xbt_new(int, count);
248     size = 0;
249     for(i = 0; i < count; i++) {
250       if(requests[i]->action) {
251          xbt_dynar_push(comms, &requests[i]->action);
252          map[size] = i;
253          size++;
254       }
255     }
256     if(size > 0) {
257       *index = SIMIX_req_comm_testany(comms);
258       *index = map[*index];
259       if(*index != MPI_UNDEFINED) {
260         smpi_mpi_wait(&requests[*index], status);
261         flag = 1;
262       }
263     }
264     xbt_free(map);
265     xbt_dynar_free(&comms);
266   }
267   return flag;
268 }
269
270 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
271 {
272   print_request("Waiting", *request);
273   SIMIX_req_comm_wait((*request)->action, -1.0);
274   finish_wait(request, status);
275 }
276
277 int smpi_mpi_waitany(int count, MPI_Request requests[],
278                      MPI_Status * status)
279 {
280   xbt_dynar_t comms;
281   int i, size, index;
282   int *map;
283
284   index = MPI_UNDEFINED;
285   if(count > 0) {
286     // Wait for a request to complete
287     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
288     map = xbt_new(int, count);
289     size = 0;
290     DEBUG0("Wait for one of");
291     for(i = 0; i < count; i++) {
292       if(requests[i] != MPI_REQUEST_NULL) {
293         print_request("   ", requests[i]);
294         xbt_dynar_push(comms, &requests[i]->action);
295         map[size] = i;
296         size++;
297       }
298     }
299     if(size > 0) {
300       index = SIMIX_req_comm_waitany(comms);
301       index = map[index];
302       finish_wait(&requests[index], status);
303     }
304     xbt_free(map);
305     xbt_dynar_free(&comms);
306   }
307   return index;
308 }
309
310 void smpi_mpi_waitall(int count, MPI_Request requests[],
311                       MPI_Status status[])
312 {
313   int index, c;
314   MPI_Status stat;
315   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
316
317   for(c = 0; c < count; c++) {
318     if(MC_IS_ENABLED) {
319       smpi_mpi_wait(&requests[c], pstat);
320       index = c;
321     } else {
322       index = smpi_mpi_waitany(count, requests, pstat);
323       if(index == MPI_UNDEFINED) {
324         break;
325       }
326     }
327     if(status != MPI_STATUS_IGNORE) {
328       memcpy(&status[index], pstat, sizeof(*pstat));
329     }
330   }
331 }
332
333 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
334                       MPI_Status status[])
335 {
336   int i, count, index;
337
338   count = 0;
339   for(i = 0; i < incount; i++) {
340      if(smpi_mpi_testany(incount, requests, &index, status)) {
341        indices[count] = index;
342        count++;
343      }
344   }
345   return count;
346 }
347
348 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
349                     MPI_Comm comm)
350 {
351   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
352   nary_tree_bcast(buf, count, datatype, root, comm, 4);
353 }
354
355 void smpi_mpi_barrier(MPI_Comm comm)
356 {
357   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
358   nary_tree_barrier(comm, 4);
359 }
360
361 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
362                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
363                      int root, MPI_Comm comm)
364 {
365   int system_tag = 666;
366   int rank, size, src, index, sendsize, recvsize;
367   MPI_Request *requests;
368
369   rank = smpi_comm_rank(comm);
370   size = smpi_comm_size(comm);
371   if(rank != root) {
372     // Send buffer to root
373     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
374   } else {
375     sendsize = smpi_datatype_size(sendtype);
376     recvsize = smpi_datatype_size(recvtype);
377     // Local copy from root
378     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
379            sendcount * sendsize * sizeof(char));
380     // Receive buffers from senders
381     requests = xbt_new(MPI_Request, size - 1);
382     index = 0;
383     for(src = 0; src < size; src++) {
384       if(src != root) {
385         requests[index] = smpi_irecv_init(&((char *) recvbuf)
386                                           [src * recvcount * recvsize],
387                                           recvcount, recvtype, src,
388                                           system_tag, comm);
389         index++;
390       }
391     }
392     // Wait for completion of irecv's.
393     smpi_mpi_startall(size - 1, requests);
394     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
395     xbt_free(requests);
396   }
397 }
398
399 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
400                       void *recvbuf, int *recvcounts, int *displs,
401                       MPI_Datatype recvtype, int root, MPI_Comm comm)
402 {
403   int system_tag = 666;
404   int rank, size, src, index, sendsize;
405   MPI_Request *requests;
406
407   rank = smpi_comm_rank(comm);
408   size = smpi_comm_size(comm);
409   if(rank != root) {
410     // Send buffer to root
411     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
412   } else {
413     sendsize = smpi_datatype_size(sendtype);
414     // Local copy from root
415     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
416            sendcount * sendsize * sizeof(char));
417     // Receive buffers from senders
418     requests = xbt_new(MPI_Request, size - 1);
419     index = 0;
420     for(src = 0; src < size; src++) {
421       if(src != root) {
422         requests[index] =
423             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
424                             recvcounts[src], recvtype, src, system_tag,
425                             comm);
426         index++;
427       }
428     }
429     // Wait for completion of irecv's.
430     smpi_mpi_startall(size - 1, requests);
431     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
432     xbt_free(requests);
433   }
434 }
435
436 void smpi_mpi_allgather(void *sendbuf, int sendcount,
437                         MPI_Datatype sendtype, void *recvbuf,
438                         int recvcount, MPI_Datatype recvtype,
439                         MPI_Comm comm)
440 {
441   int system_tag = 666;
442   int rank, size, other, index, sendsize, recvsize;
443   MPI_Request *requests;
444
445   rank = smpi_comm_rank(comm);
446   size = smpi_comm_size(comm);
447   sendsize = smpi_datatype_size(sendtype);
448   recvsize = smpi_datatype_size(recvtype);
449   // Local copy from self
450   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
451          sendcount * sendsize * sizeof(char));
452   // Send/Recv buffers to/from others;
453   requests = xbt_new(MPI_Request, 2 * (size - 1));
454   index = 0;
455   for(other = 0; other < size; other++) {
456     if(other != rank) {
457       requests[index] =
458           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
459                           comm);
460       index++;
461       requests[index] = smpi_irecv_init(&((char *) recvbuf)
462                                         [other * recvcount * recvsize],
463                                         recvcount, recvtype, other,
464                                         system_tag, comm);
465       index++;
466     }
467   }
468   // Wait for completion of all comms.
469   smpi_mpi_startall(2 * (size - 1), requests);
470   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
471   xbt_free(requests);
472 }
473
474 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
475                          MPI_Datatype sendtype, void *recvbuf,
476                          int *recvcounts, int *displs,
477                          MPI_Datatype recvtype, MPI_Comm comm)
478 {
479   int system_tag = 666;
480   int rank, size, other, index, sendsize, recvsize;
481   MPI_Request *requests;
482
483   rank = smpi_comm_rank(comm);
484   size = smpi_comm_size(comm);
485   sendsize = smpi_datatype_size(sendtype);
486   recvsize = smpi_datatype_size(recvtype);
487   // Local copy from self
488   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
489          sendcount * sendsize * sizeof(char));
490   // Send buffers to others;
491   requests = xbt_new(MPI_Request, 2 * (size - 1));
492   index = 0;
493   for(other = 0; other < size; other++) {
494     if(other != rank) {
495       requests[index] =
496           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
497                           comm);
498       index++;
499       requests[index] =
500           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
501                           recvcounts[other], recvtype, other, system_tag,
502                           comm);
503       index++;
504     }
505   }
506   // Wait for completion of all comms.
507   smpi_mpi_startall(2 * (size - 1), requests);
508   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
509   xbt_free(requests);
510 }
511
512 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
513                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
514                       int root, MPI_Comm comm)
515 {
516   int system_tag = 666;
517   int rank, size, dst, index, sendsize, recvsize;
518   MPI_Request *requests;
519
520   rank = smpi_comm_rank(comm);
521   size = smpi_comm_size(comm);
522   if(rank != root) {
523     // Recv buffer from root
524     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
525                   MPI_STATUS_IGNORE);
526   } else {
527     sendsize = smpi_datatype_size(sendtype);
528     recvsize = smpi_datatype_size(recvtype);
529     // Local copy from root
530     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
531            recvcount * recvsize * sizeof(char));
532     // Send buffers to receivers
533     requests = xbt_new(MPI_Request, size - 1);
534     index = 0;
535     for(dst = 0; dst < size; dst++) {
536       if(dst != root) {
537         requests[index] = smpi_isend_init(&((char *) sendbuf)
538                                           [dst * sendcount * sendsize],
539                                           sendcount, sendtype, dst,
540                                           system_tag, comm);
541         index++;
542       }
543     }
544     // Wait for completion of isend's.
545     smpi_mpi_startall(size - 1, requests);
546     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
547     xbt_free(requests);
548   }
549 }
550
551 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
552                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
553                        MPI_Datatype recvtype, int root, MPI_Comm comm)
554 {
555   int system_tag = 666;
556   int rank, size, dst, index, sendsize, recvsize;
557   MPI_Request *requests;
558
559   rank = smpi_comm_rank(comm);
560   size = smpi_comm_size(comm);
561   if(rank != root) {
562     // Recv buffer from root
563     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
564                   MPI_STATUS_IGNORE);
565   } else {
566     sendsize = smpi_datatype_size(sendtype);
567     recvsize = smpi_datatype_size(recvtype);
568     // Local copy from root
569     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
570            recvcount * recvsize * sizeof(char));
571     // Send buffers to receivers
572     requests = xbt_new(MPI_Request, size - 1);
573     index = 0;
574     for(dst = 0; dst < size; dst++) {
575       if(dst != root) {
576         requests[index] =
577             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
578                             sendcounts[dst], sendtype, dst, system_tag,
579                             comm);
580         index++;
581       }
582     }
583     // Wait for completion of isend's.
584     smpi_mpi_startall(size - 1, requests);
585     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
586     xbt_free(requests);
587   }
588 }
589
590 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
591                      MPI_Datatype datatype, MPI_Op op, int root,
592                      MPI_Comm comm)
593 {
594   int system_tag = 666;
595   int rank, size, src, index, datasize;
596   MPI_Request *requests;
597   void **tmpbufs;
598
599   rank = smpi_comm_rank(comm);
600   size = smpi_comm_size(comm);
601   if(rank != root) {
602     // Send buffer to root
603     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
604   } else {
605     datasize = smpi_datatype_size(datatype);
606     // Local copy from root
607     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
608     // Receive buffers from senders
609     //TODO: make a MPI_barrier here ?
610     requests = xbt_new(MPI_Request, size - 1);
611     tmpbufs = xbt_new(void *, size - 1);
612     index = 0;
613     for(src = 0; src < size; src++) {
614       if(src != root) {
615         tmpbufs[index] = xbt_malloc(count * datasize);
616         requests[index] =
617             smpi_irecv_init(tmpbufs[index], count, datatype, src,
618                             system_tag, comm);
619         index++;
620       }
621     }
622     // Wait for completion of irecv's.
623     smpi_mpi_startall(size - 1, requests);
624     for(src = 0; src < size - 1; src++) {
625       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
626       if(index == MPI_UNDEFINED) {
627         break;
628       }
629       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
630     }
631     for(index = 0; index < size - 1; index++) {
632       xbt_free(tmpbufs[index]);
633     }
634     xbt_free(tmpbufs);
635     xbt_free(requests);
636   }
637 }
638
639 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
640                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
641 {
642   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
643   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
644
645 /*
646 FIXME: buggy implementation
647
648   int system_tag = 666;
649   int rank, size, other, index, datasize;
650   MPI_Request* requests;
651   void** tmpbufs;
652
653   rank = smpi_comm_rank(comm);
654   size = smpi_comm_size(comm);
655   datasize = smpi_datatype_size(datatype);
656   // Local copy from self
657   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
658   // Send/Recv buffers to/from others;
659   //TODO: make a MPI_barrier here ?
660   requests = xbt_new(MPI_Request, 2 * (size - 1));
661   tmpbufs = xbt_new(void*, size - 1);
662   index = 0;
663   for(other = 0; other < size; other++) {
664     if(other != rank) {
665       tmpbufs[index / 2] = xbt_malloc(count * datasize);
666       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
667       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
668       index += 2;
669     }
670   }
671   // Wait for completion of all comms.
672   for(other = 0; other < 2 * (size - 1); other++) {
673     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
674     if(index == MPI_UNDEFINED) {
675       break;
676     }
677     if((index & 1) == 1) {
678       // Request is odd: it's a irecv
679       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
680     }
681   }
682   for(index = 0; index < size - 1; index++) {
683     xbt_free(tmpbufs[index]);
684   }
685   xbt_free(tmpbufs);
686   xbt_free(requests);
687 */
688 }
689
690 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
691                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
692 {
693   int system_tag = 666;
694   int rank, size, other, index, datasize;
695   int total;
696   MPI_Request *requests;
697   void **tmpbufs;
698
699   rank = smpi_comm_rank(comm);
700   size = smpi_comm_size(comm);
701   datasize = smpi_datatype_size(datatype);
702   // Local copy from self
703   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
704   // Send/Recv buffers to/from others;
705   total = rank + (size - (rank + 1));
706   requests = xbt_new(MPI_Request, total);
707   tmpbufs = xbt_new(void *, rank);
708   index = 0;
709   for(other = 0; other < rank; other++) {
710     tmpbufs[index] = xbt_malloc(count * datasize);
711     requests[index] =
712         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
713                         comm);
714     index++;
715   }
716   for(other = rank + 1; other < size; other++) {
717     requests[index] =
718         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
719     index++;
720   }
721   // Wait for completion of all comms.
722   smpi_mpi_startall(size - 1, requests);
723   for(other = 0; other < total; other++) {
724     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
725     if(index == MPI_UNDEFINED) {
726       break;
727     }
728     if(index < rank) {
729       // #Request is below rank: it's a irecv
730       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
731     }
732   }
733   for(index = 0; index < size - 1; index++) {
734     xbt_free(tmpbufs[index]);
735   }
736   xbt_free(tmpbufs);
737   xbt_free(requests);
738 }