Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove usage of xbt_assert[0-9].
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
22
23 static int match_recv(void* a, void* b) {
24    MPI_Request ref = (MPI_Request)a;
25    MPI_Request req = (MPI_Request)b;
26
27    xbt_assert(ref, "Cannot match recv against null reference");
28    xbt_assert(req, "Cannot match recv against null request");
29    return req->comm == ref->comm
30           && (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
31           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
32 }
33
34 static int match_send(void* a, void* b) {
35    MPI_Request ref = (MPI_Request)a;
36    MPI_Request req = (MPI_Request)b;
37
38    xbt_assert(ref, "Cannot match send against null reference");
39    xbt_assert(req, "Cannot match send against null request");
40    return req->comm == ref->comm
41           && (req->src == MPI_ANY_SOURCE || req->src == ref->src)
42           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
43 }
44
45 static MPI_Request build_request(void *buf, int count,
46                                  MPI_Datatype datatype, int src, int dst,
47                                  int tag, MPI_Comm comm, unsigned flags)
48 {
49   MPI_Request request;
50
51   request = xbt_new(s_smpi_mpi_request_t, 1);
52   request->buf = buf;
53   request->size = smpi_datatype_size(datatype) * count;
54   request->src = src;
55   request->dst = dst;
56   request->tag = tag;
57   request->comm = comm;
58   request->action = NULL;
59   request->flags = flags;
60 #ifdef HAVE_TRACING
61   request->send = 0;
62   request->recv = 0;
63 #endif
64   return request;
65 }
66
67 /* MPI Low level calls */
68 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
69                                int dst, int tag, MPI_Comm comm)
70 {
71   MPI_Request request =
72       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
73                     comm, PERSISTENT | SEND);
74
75   return request;
76 }
77
78 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
79                                int src, int tag, MPI_Comm comm)
80 {
81   MPI_Request request =
82       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
83                     comm, PERSISTENT | RECV);
84
85   return request;
86 }
87
88 void smpi_mpi_start(MPI_Request request)
89 {
90   smx_rdv_t mailbox;
91
92   xbt_assert(!request->action,
93               "Cannot (re)start a non-finished communication");
94   if(request->flags & RECV) {
95     print_request("New recv", request);
96     mailbox = smpi_process_mailbox();
97     request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
98   } else {
99     print_request("New send", request);
100     mailbox = smpi_process_remote_mailbox(request->dst);
101     request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
102                                            request->buf, request->size, &match_send, request, 0);
103 #ifdef HAVE_TRACING
104     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
105 #endif
106   }
107 }
108
109 void smpi_mpi_startall(int count, MPI_Request * requests)
110 {
111   int i;
112
113   for(i = 0; i < count; i++) {
114     smpi_mpi_start(requests[i]);
115   }
116 }
117
118 void smpi_mpi_request_free(MPI_Request * request)
119 {
120   xbt_free(*request);
121   *request = MPI_REQUEST_NULL;
122 }
123
124 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
125                             int dst, int tag, MPI_Comm comm)
126 {
127   MPI_Request request =
128       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
129                     comm, NON_PERSISTENT | SEND);
130
131   return request;
132 }
133
134 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
135                            int dst, int tag, MPI_Comm comm)
136 {
137   MPI_Request request =
138       smpi_isend_init(buf, count, datatype, dst, tag, comm);
139
140   smpi_mpi_start(request);
141   return request;
142 }
143
144 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
145                             int src, int tag, MPI_Comm comm)
146 {
147   MPI_Request request =
148       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
149                     comm, NON_PERSISTENT | RECV);
150
151   return request;
152 }
153
154 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
155                            int src, int tag, MPI_Comm comm)
156 {
157   MPI_Request request =
158       smpi_irecv_init(buf, count, datatype, src, tag, comm);
159
160   smpi_mpi_start(request);
161   return request;
162 }
163
164 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
165                    int tag, MPI_Comm comm, MPI_Status * status)
166 {
167   MPI_Request request;
168
169   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
170   smpi_mpi_wait(&request, status);
171 }
172
173 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
174                    int tag, MPI_Comm comm)
175 {
176   MPI_Request request;
177
178   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
179   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
180 }
181
182 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
183                        int dst, int sendtag, void *recvbuf, int recvcount,
184                        MPI_Datatype recvtype, int src, int recvtag,
185                        MPI_Comm comm, MPI_Status * status)
186 {
187   MPI_Request requests[2];
188   MPI_Status stats[2];
189
190   requests[0] =
191       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
192   requests[1] =
193       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
194   smpi_mpi_startall(2, requests);
195   smpi_mpi_waitall(2, requests, stats);
196   if(status != MPI_STATUS_IGNORE) {
197     // Copy receive status
198     memcpy(status, &stats[1], sizeof(MPI_Status));
199   }
200 }
201
202 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
203 {
204   return status->count / smpi_datatype_size(datatype);
205 }
206
207 static void finish_wait(MPI_Request * request, MPI_Status * status)
208 {
209   MPI_Request req = *request;
210
211   if(status != MPI_STATUS_IGNORE) {
212     status->MPI_SOURCE = req->src;
213     status->MPI_TAG = req->tag;
214     status->MPI_ERROR = MPI_SUCCESS;
215     status->count = req->size;
216   }
217   print_request("Finishing", req);
218   if(req->flags & NON_PERSISTENT) {
219     smpi_mpi_request_free(request);
220   } else {
221     req->action = NULL;
222   }
223 }
224
225 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
226 {
227    int flag = SIMIX_req_comm_test((*request)->action);
228
229    if(flag) {
230       smpi_mpi_wait(request, status);
231    }
232    return flag;
233 }
234
235 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
236                      MPI_Status * status)
237 {
238   xbt_dynar_t comms;
239   int i, flag, size;
240   int* map;
241
242   *index = MPI_UNDEFINED;
243   flag = 0;
244   if(count > 0) {
245     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
246     map = xbt_new(int, count);
247     size = 0;
248     for(i = 0; i < count; i++) {
249       if(requests[i]->action) {
250          xbt_dynar_push(comms, &requests[i]->action);
251          map[size] = i;
252          size++;
253       }
254     }
255     if(size > 0) {
256       *index = SIMIX_req_comm_testany(comms);
257       *index = map[*index];
258       if(*index != MPI_UNDEFINED) {
259         smpi_mpi_wait(&requests[*index], status);
260         flag = 1;
261       }
262     }
263     xbt_free(map);
264     xbt_dynar_free(&comms);
265   }
266   return flag;
267 }
268
269 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
270 {
271   print_request("Waiting", *request);
272   SIMIX_req_comm_wait((*request)->action, -1.0);
273   finish_wait(request, status);
274 }
275
276 int smpi_mpi_waitany(int count, MPI_Request requests[],
277                      MPI_Status * status)
278 {
279   xbt_dynar_t comms;
280   int i, size, index;
281   int *map;
282
283   index = MPI_UNDEFINED;
284   if(count > 0) {
285     // Wait for a request to complete
286     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
287     map = xbt_new(int, count);
288     size = 0;
289     XBT_DEBUG("Wait for one of");
290     for(i = 0; i < count; i++) {
291       if(requests[i] != MPI_REQUEST_NULL) {
292         print_request("   ", requests[i]);
293         xbt_dynar_push(comms, &requests[i]->action);
294         map[size] = i;
295         size++;
296       }
297     }
298     if(size > 0) {
299       index = SIMIX_req_comm_waitany(comms);
300       index = map[index];
301       finish_wait(&requests[index], status);
302     }
303     xbt_free(map);
304     xbt_dynar_free(&comms);
305   }
306   return index;
307 }
308
309 void smpi_mpi_waitall(int count, MPI_Request requests[],
310                       MPI_Status status[])
311 {
312   int index, c;
313   MPI_Status stat;
314   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
315
316   for(c = 0; c < count; c++) {
317     if(MC_IS_ENABLED) {
318       smpi_mpi_wait(&requests[c], pstat);
319       index = c;
320     } else {
321       index = smpi_mpi_waitany(count, requests, pstat);
322       if(index == MPI_UNDEFINED) {
323         break;
324       }
325     }
326     if(status != MPI_STATUS_IGNORE) {
327       memcpy(&status[index], pstat, sizeof(*pstat));
328     }
329   }
330 }
331
332 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
333                       MPI_Status status[])
334 {
335   int i, count, index;
336
337   count = 0;
338   for(i = 0; i < incount; i++) {
339      if(smpi_mpi_testany(incount, requests, &index, status)) {
340        indices[count] = index;
341        count++;
342      }
343   }
344   return count;
345 }
346
347 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
348                     MPI_Comm comm)
349 {
350   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
351   nary_tree_bcast(buf, count, datatype, root, comm, 4);
352 }
353
354 void smpi_mpi_barrier(MPI_Comm comm)
355 {
356   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
357   nary_tree_barrier(comm, 4);
358 }
359
360 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
361                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
362                      int root, MPI_Comm comm)
363 {
364   int system_tag = 666;
365   int rank, size, src, index, sendsize, recvsize;
366   MPI_Request *requests;
367
368   rank = smpi_comm_rank(comm);
369   size = smpi_comm_size(comm);
370   if(rank != root) {
371     // Send buffer to root
372     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
373   } else {
374     sendsize = smpi_datatype_size(sendtype);
375     recvsize = smpi_datatype_size(recvtype);
376     // Local copy from root
377     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
378            sendcount * sendsize * sizeof(char));
379     // Receive buffers from senders
380     requests = xbt_new(MPI_Request, size - 1);
381     index = 0;
382     for(src = 0; src < size; src++) {
383       if(src != root) {
384         requests[index] = smpi_irecv_init(&((char *) recvbuf)
385                                           [src * recvcount * recvsize],
386                                           recvcount, recvtype, src,
387                                           system_tag, comm);
388         index++;
389       }
390     }
391     // Wait for completion of irecv's.
392     smpi_mpi_startall(size - 1, requests);
393     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
394     xbt_free(requests);
395   }
396 }
397
398 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
399                       void *recvbuf, int *recvcounts, int *displs,
400                       MPI_Datatype recvtype, int root, MPI_Comm comm)
401 {
402   int system_tag = 666;
403   int rank, size, src, index, sendsize;
404   MPI_Request *requests;
405
406   rank = smpi_comm_rank(comm);
407   size = smpi_comm_size(comm);
408   if(rank != root) {
409     // Send buffer to root
410     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
411   } else {
412     sendsize = smpi_datatype_size(sendtype);
413     // Local copy from root
414     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
415            sendcount * sendsize * sizeof(char));
416     // Receive buffers from senders
417     requests = xbt_new(MPI_Request, size - 1);
418     index = 0;
419     for(src = 0; src < size; src++) {
420       if(src != root) {
421         requests[index] =
422             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
423                             recvcounts[src], recvtype, src, system_tag,
424                             comm);
425         index++;
426       }
427     }
428     // Wait for completion of irecv's.
429     smpi_mpi_startall(size - 1, requests);
430     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
431     xbt_free(requests);
432   }
433 }
434
435 void smpi_mpi_allgather(void *sendbuf, int sendcount,
436                         MPI_Datatype sendtype, void *recvbuf,
437                         int recvcount, MPI_Datatype recvtype,
438                         MPI_Comm comm)
439 {
440   int system_tag = 666;
441   int rank, size, other, index, sendsize, recvsize;
442   MPI_Request *requests;
443
444   rank = smpi_comm_rank(comm);
445   size = smpi_comm_size(comm);
446   sendsize = smpi_datatype_size(sendtype);
447   recvsize = smpi_datatype_size(recvtype);
448   // Local copy from self
449   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
450          sendcount * sendsize * sizeof(char));
451   // Send/Recv buffers to/from others;
452   requests = xbt_new(MPI_Request, 2 * (size - 1));
453   index = 0;
454   for(other = 0; other < size; other++) {
455     if(other != rank) {
456       requests[index] =
457           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
458                           comm);
459       index++;
460       requests[index] = smpi_irecv_init(&((char *) recvbuf)
461                                         [other * recvcount * recvsize],
462                                         recvcount, recvtype, other,
463                                         system_tag, comm);
464       index++;
465     }
466   }
467   // Wait for completion of all comms.
468   smpi_mpi_startall(2 * (size - 1), requests);
469   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
470   xbt_free(requests);
471 }
472
473 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
474                          MPI_Datatype sendtype, void *recvbuf,
475                          int *recvcounts, int *displs,
476                          MPI_Datatype recvtype, MPI_Comm comm)
477 {
478   int system_tag = 666;
479   int rank, size, other, index, sendsize, recvsize;
480   MPI_Request *requests;
481
482   rank = smpi_comm_rank(comm);
483   size = smpi_comm_size(comm);
484   sendsize = smpi_datatype_size(sendtype);
485   recvsize = smpi_datatype_size(recvtype);
486   // Local copy from self
487   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
488          sendcount * sendsize * sizeof(char));
489   // Send buffers to others;
490   requests = xbt_new(MPI_Request, 2 * (size - 1));
491   index = 0;
492   for(other = 0; other < size; other++) {
493     if(other != rank) {
494       requests[index] =
495           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
496                           comm);
497       index++;
498       requests[index] =
499           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
500                           recvcounts[other], recvtype, other, system_tag,
501                           comm);
502       index++;
503     }
504   }
505   // Wait for completion of all comms.
506   smpi_mpi_startall(2 * (size - 1), requests);
507   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
508   xbt_free(requests);
509 }
510
511 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
512                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
513                       int root, MPI_Comm comm)
514 {
515   int system_tag = 666;
516   int rank, size, dst, index, sendsize, recvsize;
517   MPI_Request *requests;
518
519   rank = smpi_comm_rank(comm);
520   size = smpi_comm_size(comm);
521   if(rank != root) {
522     // Recv buffer from root
523     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
524                   MPI_STATUS_IGNORE);
525   } else {
526     sendsize = smpi_datatype_size(sendtype);
527     recvsize = smpi_datatype_size(recvtype);
528     // Local copy from root
529     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
530            recvcount * recvsize * sizeof(char));
531     // Send buffers to receivers
532     requests = xbt_new(MPI_Request, size - 1);
533     index = 0;
534     for(dst = 0; dst < size; dst++) {
535       if(dst != root) {
536         requests[index] = smpi_isend_init(&((char *) sendbuf)
537                                           [dst * sendcount * sendsize],
538                                           sendcount, sendtype, dst,
539                                           system_tag, comm);
540         index++;
541       }
542     }
543     // Wait for completion of isend's.
544     smpi_mpi_startall(size - 1, requests);
545     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
546     xbt_free(requests);
547   }
548 }
549
550 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
551                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
552                        MPI_Datatype recvtype, int root, MPI_Comm comm)
553 {
554   int system_tag = 666;
555   int rank, size, dst, index, sendsize, recvsize;
556   MPI_Request *requests;
557
558   rank = smpi_comm_rank(comm);
559   size = smpi_comm_size(comm);
560   if(rank != root) {
561     // Recv buffer from root
562     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
563                   MPI_STATUS_IGNORE);
564   } else {
565     sendsize = smpi_datatype_size(sendtype);
566     recvsize = smpi_datatype_size(recvtype);
567     // Local copy from root
568     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
569            recvcount * recvsize * sizeof(char));
570     // Send buffers to receivers
571     requests = xbt_new(MPI_Request, size - 1);
572     index = 0;
573     for(dst = 0; dst < size; dst++) {
574       if(dst != root) {
575         requests[index] =
576             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
577                             sendcounts[dst], sendtype, dst, system_tag,
578                             comm);
579         index++;
580       }
581     }
582     // Wait for completion of isend's.
583     smpi_mpi_startall(size - 1, requests);
584     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
585     xbt_free(requests);
586   }
587 }
588
589 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
590                      MPI_Datatype datatype, MPI_Op op, int root,
591                      MPI_Comm comm)
592 {
593   int system_tag = 666;
594   int rank, size, src, index, datasize;
595   MPI_Request *requests;
596   void **tmpbufs;
597
598   rank = smpi_comm_rank(comm);
599   size = smpi_comm_size(comm);
600   if(rank != root) {
601     // Send buffer to root
602     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
603   } else {
604     datasize = smpi_datatype_size(datatype);
605     // Local copy from root
606     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
607     // Receive buffers from senders
608     //TODO: make a MPI_barrier here ?
609     requests = xbt_new(MPI_Request, size - 1);
610     tmpbufs = xbt_new(void *, size - 1);
611     index = 0;
612     for(src = 0; src < size; src++) {
613       if(src != root) {
614         tmpbufs[index] = xbt_malloc(count * datasize);
615         requests[index] =
616             smpi_irecv_init(tmpbufs[index], count, datatype, src,
617                             system_tag, comm);
618         index++;
619       }
620     }
621     // Wait for completion of irecv's.
622     smpi_mpi_startall(size - 1, requests);
623     for(src = 0; src < size - 1; src++) {
624       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
625       if(index == MPI_UNDEFINED) {
626         break;
627       }
628       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
629     }
630     for(index = 0; index < size - 1; index++) {
631       xbt_free(tmpbufs[index]);
632     }
633     xbt_free(tmpbufs);
634     xbt_free(requests);
635   }
636 }
637
638 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
639                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
640 {
641   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
642   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
643
644 /*
645 FIXME: buggy implementation
646
647   int system_tag = 666;
648   int rank, size, other, index, datasize;
649   MPI_Request* requests;
650   void** tmpbufs;
651
652   rank = smpi_comm_rank(comm);
653   size = smpi_comm_size(comm);
654   datasize = smpi_datatype_size(datatype);
655   // Local copy from self
656   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
657   // Send/Recv buffers to/from others;
658   //TODO: make a MPI_barrier here ?
659   requests = xbt_new(MPI_Request, 2 * (size - 1));
660   tmpbufs = xbt_new(void*, size - 1);
661   index = 0;
662   for(other = 0; other < size; other++) {
663     if(other != rank) {
664       tmpbufs[index / 2] = xbt_malloc(count * datasize);
665       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
666       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
667       index += 2;
668     }
669   }
670   // Wait for completion of all comms.
671   for(other = 0; other < 2 * (size - 1); other++) {
672     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
673     if(index == MPI_UNDEFINED) {
674       break;
675     }
676     if((index & 1) == 1) {
677       // Request is odd: it's a irecv
678       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
679     }
680   }
681   for(index = 0; index < size - 1; index++) {
682     xbt_free(tmpbufs[index]);
683   }
684   xbt_free(tmpbufs);
685   xbt_free(requests);
686 */
687 }
688
689 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
690                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
691 {
692   int system_tag = 666;
693   int rank, size, other, index, datasize;
694   int total;
695   MPI_Request *requests;
696   void **tmpbufs;
697
698   rank = smpi_comm_rank(comm);
699   size = smpi_comm_size(comm);
700   datasize = smpi_datatype_size(datatype);
701   // Local copy from self
702   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
703   // Send/Recv buffers to/from others;
704   total = rank + (size - (rank + 1));
705   requests = xbt_new(MPI_Request, total);
706   tmpbufs = xbt_new(void *, rank);
707   index = 0;
708   for(other = 0; other < rank; other++) {
709     tmpbufs[index] = xbt_malloc(count * datasize);
710     requests[index] =
711         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
712                         comm);
713     index++;
714   }
715   for(other = rank + 1; other < size; other++) {
716     requests[index] =
717         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
718     index++;
719   }
720   // Wait for completion of all comms.
721   smpi_mpi_startall(size - 1, requests);
722   for(other = 0; other < total; other++) {
723     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
724     if(index == MPI_UNDEFINED) {
725       break;
726     }
727     if(index < rank) {
728       // #Request is below rank: it's a irecv
729       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
730     }
731   }
732   for(index = 0; index < size - 1; index++) {
733     xbt_free(tmpbufs[index]);
734   }
735   xbt_free(tmpbufs);
736   xbt_free(requests);
737 }