Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MPI_Comm_split is back to life.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 void smpi_process_init(int *argc, char ***argv)
23 {
24   int index;
25   smpi_process_data_t data;
26   smx_process_t proc;
27
28   proc = SIMIX_process_self();
29   index = atoi((*argv)[1]);
30   data = smpi_process_remote_data(index);
31   SIMIX_process_set_data(proc, data);
32   if (*argc > 2) {
33     free((*argv)[1]);
34     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
35     (*argv)[(*argc) - 1] = NULL;
36   }
37   (*argc)--;
38   DEBUG2("<%d> New process in the game: %p", index, proc);
39 }
40
41 void smpi_process_destroy(void)
42 {
43   int index = smpi_process_index();
44
45   DEBUG1("<%d> Process left the game", index);
46 }
47
48 static MPI_Request build_request(void *buf, int count,
49                                  MPI_Datatype datatype, int src, int dst,
50                                  int tag, MPI_Comm comm, unsigned flags)
51 {
52   MPI_Request request;
53
54   request = xbt_new(s_smpi_mpi_request_t, 1);
55   request->buf = buf;
56   request->size = smpi_datatype_size(datatype) * count;
57   request->src = src;
58   request->dst = dst;
59   request->tag = tag;
60   request->comm = comm;
61   request->rdv = NULL;
62   request->pair = NULL;
63   request->complete = 0;
64   request->match = MPI_REQUEST_NULL;
65   request->flags = flags;
66 #ifdef HAVE_TRACING
67   request->send = 0;
68   request->recv = 0;
69 #endif
70   return request;
71 }
72
73 /* MPI Low level calls */
74 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
75                                int dst, int tag, MPI_Comm comm)
76 {
77   MPI_Request request =
78       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
79                     comm, PERSISTENT | SEND);
80
81   return request;
82 }
83
84 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
85                                int src, int tag, MPI_Comm comm)
86 {
87   MPI_Request request =
88       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
89                     comm, PERSISTENT | RECV);
90
91   return request;
92 }
93
94 void smpi_mpi_start(MPI_Request request)
95 {
96   xbt_assert0(request->complete == 0,
97               "Cannot start a non-finished communication");
98   if ((request->flags & RECV) == RECV) {
99     smpi_process_post_recv(request);
100     print_request("New recv", request);
101     request->pair =
102         SIMIX_network_irecv(request->rdv, request->buf, &request->size);
103   } else {
104     smpi_process_post_send(request->comm, request);     // FIXME
105     print_request("New send", request);
106     request->pair =
107         SIMIX_network_isend(request->rdv, request->size, -1.0,
108                             request->buf, request->size, NULL);
109   }
110 }
111
112 void smpi_mpi_startall(int count, MPI_Request * requests)
113 {
114   int i;
115
116   for (i = 0; i < count; i++) {
117     smpi_mpi_start(requests[i]);
118   }
119 }
120
121 void smpi_mpi_request_free(MPI_Request * request)
122 {
123   xbt_free(*request);
124   *request = MPI_REQUEST_NULL;
125 }
126
127 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
128                             int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                     comm, NON_PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
138                            int dst, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141       smpi_isend_init(buf, count, datatype, dst, tag, comm);
142
143   smpi_mpi_start(request);
144   return request;
145 }
146
147 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
148                             int src, int tag, MPI_Comm comm)
149 {
150   MPI_Request request =
151       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
152                     comm, NON_PERSISTENT | RECV);
153
154   return request;
155 }
156
157 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
158                            int src, int tag, MPI_Comm comm)
159 {
160   MPI_Request request =
161       smpi_irecv_init(buf, count, datatype, src, tag, comm);
162
163   smpi_mpi_start(request);
164   return request;
165 }
166
167 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
168                    int tag, MPI_Comm comm, MPI_Status * status)
169 {
170   MPI_Request request;
171
172   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
173   smpi_mpi_wait(&request, status);
174 }
175
176 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
177                    int tag, MPI_Comm comm)
178 {
179   MPI_Request request;
180
181   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
182   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
183 }
184
185 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
186                        int dst, int sendtag, void *recvbuf, int recvcount,
187                        MPI_Datatype recvtype, int src, int recvtag,
188                        MPI_Comm comm, MPI_Status * status)
189 {
190   MPI_Request requests[2];
191   MPI_Status stats[2];
192
193   requests[0] =
194       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
195   requests[1] =
196       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
197   smpi_mpi_startall(2, requests);
198   smpi_mpi_waitall(2, requests, stats);
199   if (status != MPI_STATUS_IGNORE) {
200     // Copy receive status
201     memcpy(status, &stats[1], sizeof(MPI_Status));
202   }
203 }
204
205 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
206 {
207   return status->count / smpi_datatype_size(datatype);
208 }
209
210 static void finish_wait(MPI_Request * request, MPI_Status * status)
211 {
212   if (status != MPI_STATUS_IGNORE) {
213     status->MPI_SOURCE = (*request)->src;
214     status->MPI_TAG = (*request)->tag;
215     status->MPI_ERROR = MPI_SUCCESS;
216     status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
217   }
218   SIMIX_communication_destroy((*request)->pair);
219   print_request("finishing wait", *request);
220   if ((*request)->complete == 1) {
221     SIMIX_rdv_destroy((*request)->rdv);
222   } else {
223     (*request)->match->complete = 1;
224     (*request)->match->match = MPI_REQUEST_NULL;
225   }
226   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
227     smpi_mpi_request_free(request);
228   } else {
229     (*request)->rdv = NULL;
230     (*request)->pair = NULL;
231   }
232 }
233
234 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
235 {
236   int flag = (*request)->complete;
237
238   if (flag) {
239     smpi_mpi_wait(request, status);
240   }
241   return flag;
242 }
243
244 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
245                      MPI_Status * status)
246 {
247   int i, flag;
248
249   *index = MPI_UNDEFINED;
250   flag = 0;
251   for (i = 0; i < count; i++) {
252     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
253       smpi_mpi_wait(&requests[i], status);
254       *index = i;
255       flag = 1;
256       break;
257     }
258   }
259   return flag;
260 }
261
262 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
263 {
264   print_request("wait", *request);
265   SIMIX_network_wait((*request)->pair, -1.0);
266   finish_wait(request, status);
267 }
268
269 int smpi_mpi_waitany(int count, MPI_Request requests[],
270                      MPI_Status * status)
271 {
272   xbt_dynar_t comms;
273   int i, size, index;
274   int *map;
275
276   index = MPI_UNDEFINED;
277   if (count > 0) {
278     // First check for already completed requests
279     for (i = 0; i < count; i++) {
280       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
281         index = i;
282         smpi_mpi_wait(&requests[index], status);
283         break;
284       }
285     }
286     if (index == MPI_UNDEFINED) {
287       // Otherwise, wait for a request to complete
288       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
289       map = xbt_new(int, count);
290       size = 0;
291       DEBUG0("Wait for one of");
292       for (i = 0; i < count; i++) {
293         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
294           print_request("   ", requests[i]);
295           xbt_dynar_push(comms, &requests[i]->pair);
296           map[size] = i;
297           size++;
298         }
299       }
300       if (size > 0) {
301         index = SIMIX_network_waitany(comms);
302         index = map[index];
303         finish_wait(&requests[index], status);
304       }
305       xbt_free(map);
306       xbt_dynar_free(&comms);
307     }
308   }
309   return index;
310 }
311
312 void smpi_mpi_waitall(int count, MPI_Request requests[],
313                       MPI_Status status[])
314 {
315   int index;
316   MPI_Status stat;
317
318   while (count > 0) {
319     index = smpi_mpi_waitany(count, requests, &stat);
320     if (index == MPI_UNDEFINED) {
321       break;
322     }
323     if (status != MPI_STATUS_IGNORE) {
324       memcpy(&status[index], &stat, sizeof(stat));
325     }
326     // FIXME: check this -v
327     // Move the last request to the found position
328     requests[index] = requests[count - 1];
329     requests[count - 1] = MPI_REQUEST_NULL;
330     count--;
331   }
332 }
333
334 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
335                       MPI_Status status[])
336 {
337   int i, count;
338
339   count = 0;
340   for (i = 0; i < incount; i++) {
341     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
342       smpi_mpi_wait(&requests[i],
343                     status !=
344                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
345       indices[count] = i;
346       count++;
347     }
348   }
349   return count;
350 }
351
352 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
353                     MPI_Comm comm)
354 {
355   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
356   nary_tree_bcast(buf, count, datatype, root, comm, 4);
357 }
358
359 void smpi_mpi_barrier(MPI_Comm comm)
360 {
361   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
362   nary_tree_barrier(comm, 4);
363 }
364
365 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
366                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
367                      int root, MPI_Comm comm)
368 {
369   int system_tag = 666;
370   int rank, size, src, index, sendsize, recvsize;
371   MPI_Request *requests;
372
373   rank = smpi_comm_rank(comm);
374   size = smpi_comm_size(comm);
375   if (rank != root) {
376     // Send buffer to root
377     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
378   } else {
379     sendsize = smpi_datatype_size(sendtype);
380     recvsize = smpi_datatype_size(recvtype);
381     // Local copy from root
382     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
383            sendcount * sendsize * sizeof(char));
384     // Receive buffers from senders
385     requests = xbt_new(MPI_Request, size - 1);
386     index = 0;
387     for (src = 0; src < size; src++) {
388       if (src != root) {
389         requests[index] = smpi_irecv_init(&((char *) recvbuf)
390                                           [src * recvcount * recvsize],
391                                           recvcount, recvtype, src,
392                                           system_tag, comm);
393         index++;
394       }
395     }
396     // Wait for completion of irecv's.
397     smpi_mpi_startall(size - 1, requests);
398     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
399     xbt_free(requests);
400   }
401 }
402
403 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
404                       void *recvbuf, int *recvcounts, int *displs,
405                       MPI_Datatype recvtype, int root, MPI_Comm comm)
406 {
407   int system_tag = 666;
408   int rank, size, src, index, sendsize;
409   MPI_Request *requests;
410
411   rank = smpi_comm_rank(comm);
412   size = smpi_comm_size(comm);
413   if (rank != root) {
414     // Send buffer to root
415     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
416   } else {
417     sendsize = smpi_datatype_size(sendtype);
418     // Local copy from root
419     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
420            sendcount * sendsize * sizeof(char));
421     // Receive buffers from senders
422     requests = xbt_new(MPI_Request, size - 1);
423     index = 0;
424     for (src = 0; src < size; src++) {
425       if (src != root) {
426         requests[index] =
427             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
428                             recvcounts[src], recvtype, src, system_tag,
429                             comm);
430         index++;
431       }
432     }
433     // Wait for completion of irecv's.
434     smpi_mpi_startall(size - 1, requests);
435     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
436     xbt_free(requests);
437   }
438 }
439
440 void smpi_mpi_allgather(void *sendbuf, int sendcount,
441                         MPI_Datatype sendtype, void *recvbuf,
442                         int recvcount, MPI_Datatype recvtype,
443                         MPI_Comm comm)
444 {
445   int system_tag = 666;
446   int rank, size, other, index, sendsize, recvsize;
447   MPI_Request *requests;
448
449   rank = smpi_comm_rank(comm);
450   size = smpi_comm_size(comm);
451   sendsize = smpi_datatype_size(sendtype);
452   recvsize = smpi_datatype_size(recvtype);
453   // Local copy from self
454   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
455          sendcount * sendsize * sizeof(char));
456   // Send/Recv buffers to/from others;
457   requests = xbt_new(MPI_Request, 2 * (size - 1));
458   index = 0;
459   for (other = 0; other < size; other++) {
460     if (other != rank) {
461       requests[index] =
462           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
463                           comm);
464       index++;
465       requests[index] = smpi_irecv_init(&((char *) recvbuf)
466                                         [other * recvcount * recvsize],
467                                         recvcount, recvtype, other,
468                                         system_tag, comm);
469       index++;
470     }
471   }
472   // Wait for completion of all comms.
473   smpi_mpi_startall(2 * (size - 1), requests);
474   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
475   xbt_free(requests);
476 }
477
478 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
479                          MPI_Datatype sendtype, void *recvbuf,
480                          int *recvcounts, int *displs,
481                          MPI_Datatype recvtype, MPI_Comm comm)
482 {
483   int system_tag = 666;
484   int rank, size, other, index, sendsize, recvsize;
485   MPI_Request *requests;
486
487   rank = smpi_comm_rank(comm);
488   size = smpi_comm_size(comm);
489   sendsize = smpi_datatype_size(sendtype);
490   recvsize = smpi_datatype_size(recvtype);
491   // Local copy from self
492   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
493          sendcount * sendsize * sizeof(char));
494   // Send buffers to others;
495   requests = xbt_new(MPI_Request, 2 * (size - 1));
496   index = 0;
497   for (other = 0; other < size; other++) {
498     if (other != rank) {
499       requests[index] =
500           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
501                           comm);
502       index++;
503       requests[index] =
504           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
505                           recvcounts[other], recvtype, other, system_tag,
506                           comm);
507       index++;
508     }
509   }
510   // Wait for completion of all comms.
511   smpi_mpi_startall(2 * (size - 1), requests);
512   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
513   xbt_free(requests);
514 }
515
516 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
517                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
518                       int root, MPI_Comm comm)
519 {
520   int system_tag = 666;
521   int rank, size, dst, index, sendsize, recvsize;
522   MPI_Request *requests;
523
524   rank = smpi_comm_rank(comm);
525   size = smpi_comm_size(comm);
526   if (rank != root) {
527     // Recv buffer from root
528     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
529                   MPI_STATUS_IGNORE);
530   } else {
531     sendsize = smpi_datatype_size(sendtype);
532     recvsize = smpi_datatype_size(recvtype);
533     // Local copy from root
534     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
535            recvcount * recvsize * sizeof(char));
536     // Send buffers to receivers
537     requests = xbt_new(MPI_Request, size - 1);
538     index = 0;
539     for (dst = 0; dst < size; dst++) {
540       if (dst != root) {
541         requests[index] = smpi_isend_init(&((char *) sendbuf)
542                                           [dst * sendcount * sendsize],
543                                           sendcount, sendtype, dst,
544                                           system_tag, comm);
545         index++;
546       }
547     }
548     // Wait for completion of isend's.
549     smpi_mpi_startall(size - 1, requests);
550     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
551     xbt_free(requests);
552   }
553 }
554
555 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
556                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
557                        MPI_Datatype recvtype, int root, MPI_Comm comm)
558 {
559   int system_tag = 666;
560   int rank, size, dst, index, sendsize, recvsize;
561   MPI_Request *requests;
562
563   rank = smpi_comm_rank(comm);
564   size = smpi_comm_size(comm);
565   if (rank != root) {
566     // Recv buffer from root
567     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
568                   MPI_STATUS_IGNORE);
569   } else {
570     sendsize = smpi_datatype_size(sendtype);
571     recvsize = smpi_datatype_size(recvtype);
572     // Local copy from root
573     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
574            recvcount * recvsize * sizeof(char));
575     // Send buffers to receivers
576     requests = xbt_new(MPI_Request, size - 1);
577     index = 0;
578     for (dst = 0; dst < size; dst++) {
579       if (dst != root) {
580         requests[index] =
581             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
582                             sendcounts[dst], sendtype, dst, system_tag,
583                             comm);
584         index++;
585       }
586     }
587     // Wait for completion of isend's.
588     smpi_mpi_startall(size - 1, requests);
589     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
590     xbt_free(requests);
591   }
592 }
593
594 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
595                      MPI_Datatype datatype, MPI_Op op, int root,
596                      MPI_Comm comm)
597 {
598   int system_tag = 666;
599   int rank, size, src, index, datasize;
600   MPI_Request *requests;
601   void **tmpbufs;
602
603   rank = smpi_comm_rank(comm);
604   size = smpi_comm_size(comm);
605   if (rank != root) {
606     // Send buffer to root
607     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
608   } else {
609     datasize = smpi_datatype_size(datatype);
610     // Local copy from root
611     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
612     // Receive buffers from senders
613     //TODO: make a MPI_barrier here ?
614     requests = xbt_new(MPI_Request, size - 1);
615     tmpbufs = xbt_new(void *, size - 1);
616     index = 0;
617     for (src = 0; src < size; src++) {
618       if (src != root) {
619         tmpbufs[index] = xbt_malloc(count * datasize);
620         requests[index] =
621             smpi_irecv_init(tmpbufs[index], count, datatype, src,
622                             system_tag, comm);
623         index++;
624       }
625     }
626     // Wait for completion of irecv's.
627     smpi_mpi_startall(size - 1, requests);
628     for (src = 0; src < size - 1; src++) {
629       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
630       if (index == MPI_UNDEFINED) {
631         break;
632       }
633       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
634     }
635     for (index = 0; index < size - 1; index++) {
636       xbt_free(tmpbufs[index]);
637     }
638     xbt_free(tmpbufs);
639     xbt_free(requests);
640   }
641 }
642
643 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
644                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
645 {
646   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
647   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
648
649 /*
650 FIXME: buggy implementation
651
652   int system_tag = 666;
653   int rank, size, other, index, datasize;
654   MPI_Request* requests;
655   void** tmpbufs;
656
657   rank = smpi_comm_rank(comm);
658   size = smpi_comm_size(comm);
659   datasize = smpi_datatype_size(datatype);
660   // Local copy from self
661   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
662   // Send/Recv buffers to/from others;
663   //TODO: make a MPI_barrier here ?
664   requests = xbt_new(MPI_Request, 2 * (size - 1));
665   tmpbufs = xbt_new(void*, size - 1);
666   index = 0;
667   for(other = 0; other < size; other++) {
668     if(other != rank) {
669       tmpbufs[index / 2] = xbt_malloc(count * datasize);
670       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
671       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
672       index += 2;
673     }
674   }
675   // Wait for completion of all comms.
676   for(other = 0; other < 2 * (size - 1); other++) {
677     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
678     if(index == MPI_UNDEFINED) {
679       break;
680     }
681     if((index & 1) == 1) {
682       // Request is odd: it's a irecv
683       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
684     }
685   }
686   for(index = 0; index < size - 1; index++) {
687     xbt_free(tmpbufs[index]);
688   }
689   xbt_free(tmpbufs);
690   xbt_free(requests);
691 */
692 }
693
694 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
695                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
696 {
697   int system_tag = 666;
698   int rank, size, other, index, datasize;
699   int total;
700   MPI_Request *requests;
701   void **tmpbufs;
702
703   rank = smpi_comm_rank(comm);
704   size = smpi_comm_size(comm);
705   datasize = smpi_datatype_size(datatype);
706   // Local copy from self
707   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
708   // Send/Recv buffers to/from others;
709   total = rank + (size - (rank + 1));
710   requests = xbt_new(MPI_Request, total);
711   tmpbufs = xbt_new(void *, rank);
712   index = 0;
713   for (other = 0; other < rank; other++) {
714     tmpbufs[index] = xbt_malloc(count * datasize);
715     requests[index] =
716         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
717                         comm);
718     index++;
719   }
720   for (other = rank + 1; other < size; other++) {
721     requests[index] =
722         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
723     index++;
724   }
725   // Wait for completion of all comms.
726   smpi_mpi_startall(size - 1, requests);
727   for (other = 0; other < total; other++) {
728     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
729     if (index == MPI_UNDEFINED) {
730       break;
731     }
732     if (index < rank) {
733       // #Request is below rank: it's a irecv
734       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
735     }
736   }
737   for (index = 0; index < size - 1; index++) {
738     xbt_free(tmpbufs[index]);
739   }
740   xbt_free(tmpbufs);
741   xbt_free(requests);
742 }