Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
08d9b3049f660e5292de95a1e42b916932762d5f
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 static MPI_Request build_request(void *buf, int count,
23                                  MPI_Datatype datatype, int src, int dst,
24                                  int tag, MPI_Comm comm, unsigned flags)
25 {
26   MPI_Request request;
27
28   request = xbt_new(s_smpi_mpi_request_t, 1);
29   request->buf = buf;
30   request->size = smpi_datatype_size(datatype) * count;
31   request->src = src;
32   request->dst = dst;
33   request->tag = tag;
34   request->comm = comm;
35   request->rdv = NULL;
36   request->pair = NULL;
37   request->complete = 0;
38   request->match = MPI_REQUEST_NULL;
39   request->flags = flags;
40 #ifdef HAVE_TRACING
41   request->send = 0;
42   request->recv = 0;
43 #endif
44   return request;
45 }
46
47 /* MPI Low level calls */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49                                int dst, int tag, MPI_Comm comm)
50 {
51   MPI_Request request =
52       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53                     comm, PERSISTENT | SEND);
54
55   return request;
56 }
57
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59                                int src, int tag, MPI_Comm comm)
60 {
61   MPI_Request request =
62       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63                     comm, PERSISTENT | RECV);
64
65   return request;
66 }
67
68 void smpi_mpi_start(MPI_Request request)
69 {
70   xbt_assert0(request->complete == 0,
71               "Cannot start a non-finished communication");
72   if ((request->flags & RECV) == RECV) {
73     smpi_process_post_recv(request);
74     print_request("New recv", request);
75     request->pair =
76         SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size);
77   } else {
78     smpi_process_post_send(request->comm, request);     // FIXME
79     print_request("New send", request);
80     request->pair =
81         SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
82                             request->buf, request->size, NULL);
83   }
84 }
85
86 void smpi_mpi_startall(int count, MPI_Request * requests)
87 {
88   int i;
89
90   for (i = 0; i < count; i++) {
91     smpi_mpi_start(requests[i]);
92   }
93 }
94
95 void smpi_mpi_request_free(MPI_Request * request)
96 {
97   xbt_free(*request);
98   *request = MPI_REQUEST_NULL;
99 }
100
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102                             int dst, int tag, MPI_Comm comm)
103 {
104   MPI_Request request =
105       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106                     comm, NON_PERSISTENT | SEND);
107
108   return request;
109 }
110
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112                            int dst, int tag, MPI_Comm comm)
113 {
114   MPI_Request request =
115       smpi_isend_init(buf, count, datatype, dst, tag, comm);
116
117   smpi_mpi_start(request);
118   return request;
119 }
120
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122                             int src, int tag, MPI_Comm comm)
123 {
124   MPI_Request request =
125       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126                     comm, NON_PERSISTENT | RECV);
127
128   return request;
129 }
130
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132                            int src, int tag, MPI_Comm comm)
133 {
134   MPI_Request request =
135       smpi_irecv_init(buf, count, datatype, src, tag, comm);
136
137   smpi_mpi_start(request);
138   return request;
139 }
140
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142                    int tag, MPI_Comm comm, MPI_Status * status)
143 {
144   MPI_Request request;
145
146   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147   smpi_mpi_wait(&request, status);
148 }
149
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151                    int tag, MPI_Comm comm)
152 {
153   MPI_Request request;
154
155   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
157 }
158
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160                        int dst, int sendtag, void *recvbuf, int recvcount,
161                        MPI_Datatype recvtype, int src, int recvtag,
162                        MPI_Comm comm, MPI_Status * status)
163 {
164   MPI_Request requests[2];
165   MPI_Status stats[2];
166
167   requests[0] =
168       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
169   requests[1] =
170       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171   smpi_mpi_startall(2, requests);
172   smpi_mpi_waitall(2, requests, stats);
173   if (status != MPI_STATUS_IGNORE) {
174     // Copy receive status
175     memcpy(status, &stats[1], sizeof(MPI_Status));
176   }
177 }
178
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
180 {
181   return status->count / smpi_datatype_size(datatype);
182 }
183
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
185 {
186   if (status != MPI_STATUS_IGNORE) {
187     status->MPI_SOURCE = (*request)->src;
188     status->MPI_TAG = (*request)->tag;
189     status->MPI_ERROR = MPI_SUCCESS;
190     status->count = SIMIX_req_comm_get_dst_buff_size((*request)->pair);
191   }
192   SIMIX_req_comm_destroy((*request)->pair);
193   print_request("finishing wait", *request);
194   if ((*request)->complete == 1) {
195     SIMIX_req_rdv_destroy((*request)->rdv);
196   } else {
197     (*request)->match->complete = 1;
198     (*request)->match->match = MPI_REQUEST_NULL;
199   }
200   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201     smpi_mpi_request_free(request);
202   } else {
203     (*request)->rdv = NULL;
204     (*request)->pair = NULL;
205   }
206 }
207
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
209 {
210   int flag = (*request)->complete;
211
212   if (flag) {
213     smpi_mpi_wait(request, status);
214   }
215   return flag;
216 }
217
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
219                      MPI_Status * status)
220 {
221   int i, flag;
222
223   *index = MPI_UNDEFINED;
224   flag = 0;
225   for (i = 0; i < count; i++) {
226     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227       smpi_mpi_wait(&requests[i], status);
228       *index = i;
229       flag = 1;
230       break;
231     }
232   }
233   return flag;
234 }
235
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
237 {
238   print_request("wait", *request);
239   SIMIX_req_comm_wait((*request)->pair, -1.0);
240   finish_wait(request, status);
241 }
242
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
244                      MPI_Status * status)
245 {
246   xbt_dynar_t comms;
247   int i, size, index;
248   int *map;
249
250   index = MPI_UNDEFINED;
251   if (count > 0) {
252     // First check for already completed requests
253     for (i = 0; i < count; i++) {
254       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
255         index = i;
256         smpi_mpi_wait(&requests[index], status);
257         break;
258       }
259     }
260     if (index == MPI_UNDEFINED) {
261       // Otherwise, wait for a request to complete
262       comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
263       map = xbt_new(int, count);
264       size = 0;
265       DEBUG0("Wait for one of");
266       for (i = 0; i < count; i++) {
267         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268           print_request("   ", requests[i]);
269           xbt_dynar_push(comms, &requests[i]->pair);
270           map[size] = i;
271           size++;
272         }
273       }
274       if (size > 0) {
275         index = SIMIX_req_comm_waitany(comms);
276         index = map[index];
277         finish_wait(&requests[index], status);
278       }
279       xbt_free(map);
280       xbt_dynar_free(&comms);
281     }
282   }
283   return index;
284 }
285
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
287                       MPI_Status status[])
288 {
289   int index, c;
290   MPI_Status stat;
291   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
292
293   c = count;
294   while (c > 0) {
295     index = smpi_mpi_waitany(count, requests, pstat);
296     if (index == MPI_UNDEFINED) {
297       break;
298     }
299     if (status != MPI_STATUS_IGNORE) {
300       memcpy(&status[index], pstat, sizeof *pstat);
301     }
302     c--;
303   }
304 }
305
306 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
307                       MPI_Status status[])
308 {
309   int i, count;
310
311   count = 0;
312   for (i = 0; i < incount; i++) {
313     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
314       smpi_mpi_wait(&requests[i],
315                     status !=
316                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
317       indices[count] = i;
318       count++;
319     }
320   }
321   return count;
322 }
323
324 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
325                     MPI_Comm comm)
326 {
327   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
328   nary_tree_bcast(buf, count, datatype, root, comm, 4);
329 }
330
331 void smpi_mpi_barrier(MPI_Comm comm)
332 {
333   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
334   nary_tree_barrier(comm, 4);
335 }
336
337 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
338                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
339                      int root, MPI_Comm comm)
340 {
341   int system_tag = 666;
342   int rank, size, src, index, sendsize, recvsize;
343   MPI_Request *requests;
344
345   rank = smpi_comm_rank(comm);
346   size = smpi_comm_size(comm);
347   if (rank != root) {
348     // Send buffer to root
349     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
350   } else {
351     sendsize = smpi_datatype_size(sendtype);
352     recvsize = smpi_datatype_size(recvtype);
353     // Local copy from root
354     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
355            sendcount * sendsize * sizeof(char));
356     // Receive buffers from senders
357     requests = xbt_new(MPI_Request, size - 1);
358     index = 0;
359     for (src = 0; src < size; src++) {
360       if (src != root) {
361         requests[index] = smpi_irecv_init(&((char *) recvbuf)
362                                           [src * recvcount * recvsize],
363                                           recvcount, recvtype, src,
364                                           system_tag, comm);
365         index++;
366       }
367     }
368     // Wait for completion of irecv's.
369     smpi_mpi_startall(size - 1, requests);
370     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
371     xbt_free(requests);
372   }
373 }
374
375 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
376                       void *recvbuf, int *recvcounts, int *displs,
377                       MPI_Datatype recvtype, int root, MPI_Comm comm)
378 {
379   int system_tag = 666;
380   int rank, size, src, index, sendsize;
381   MPI_Request *requests;
382
383   rank = smpi_comm_rank(comm);
384   size = smpi_comm_size(comm);
385   if (rank != root) {
386     // Send buffer to root
387     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
388   } else {
389     sendsize = smpi_datatype_size(sendtype);
390     // Local copy from root
391     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
392            sendcount * sendsize * sizeof(char));
393     // Receive buffers from senders
394     requests = xbt_new(MPI_Request, size - 1);
395     index = 0;
396     for (src = 0; src < size; src++) {
397       if (src != root) {
398         requests[index] =
399             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
400                             recvcounts[src], recvtype, src, system_tag,
401                             comm);
402         index++;
403       }
404     }
405     // Wait for completion of irecv's.
406     smpi_mpi_startall(size - 1, requests);
407     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
408     xbt_free(requests);
409   }
410 }
411
412 void smpi_mpi_allgather(void *sendbuf, int sendcount,
413                         MPI_Datatype sendtype, void *recvbuf,
414                         int recvcount, MPI_Datatype recvtype,
415                         MPI_Comm comm)
416 {
417   int system_tag = 666;
418   int rank, size, other, index, sendsize, recvsize;
419   MPI_Request *requests;
420
421   rank = smpi_comm_rank(comm);
422   size = smpi_comm_size(comm);
423   sendsize = smpi_datatype_size(sendtype);
424   recvsize = smpi_datatype_size(recvtype);
425   // Local copy from self
426   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
427          sendcount * sendsize * sizeof(char));
428   // Send/Recv buffers to/from others;
429   requests = xbt_new(MPI_Request, 2 * (size - 1));
430   index = 0;
431   for (other = 0; other < size; other++) {
432     if (other != rank) {
433       requests[index] =
434           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
435                           comm);
436       index++;
437       requests[index] = smpi_irecv_init(&((char *) recvbuf)
438                                         [other * recvcount * recvsize],
439                                         recvcount, recvtype, other,
440                                         system_tag, comm);
441       index++;
442     }
443   }
444   // Wait for completion of all comms.
445   smpi_mpi_startall(2 * (size - 1), requests);
446   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
447   xbt_free(requests);
448 }
449
450 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
451                          MPI_Datatype sendtype, void *recvbuf,
452                          int *recvcounts, int *displs,
453                          MPI_Datatype recvtype, MPI_Comm comm)
454 {
455   int system_tag = 666;
456   int rank, size, other, index, sendsize, recvsize;
457   MPI_Request *requests;
458
459   rank = smpi_comm_rank(comm);
460   size = smpi_comm_size(comm);
461   sendsize = smpi_datatype_size(sendtype);
462   recvsize = smpi_datatype_size(recvtype);
463   // Local copy from self
464   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
465          sendcount * sendsize * sizeof(char));
466   // Send buffers to others;
467   requests = xbt_new(MPI_Request, 2 * (size - 1));
468   index = 0;
469   for (other = 0; other < size; other++) {
470     if (other != rank) {
471       requests[index] =
472           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
473                           comm);
474       index++;
475       requests[index] =
476           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
477                           recvcounts[other], recvtype, other, system_tag,
478                           comm);
479       index++;
480     }
481   }
482   // Wait for completion of all comms.
483   smpi_mpi_startall(2 * (size - 1), requests);
484   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
485   xbt_free(requests);
486 }
487
488 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
489                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
490                       int root, MPI_Comm comm)
491 {
492   int system_tag = 666;
493   int rank, size, dst, index, sendsize, recvsize;
494   MPI_Request *requests;
495
496   rank = smpi_comm_rank(comm);
497   size = smpi_comm_size(comm);
498   if (rank != root) {
499     // Recv buffer from root
500     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
501                   MPI_STATUS_IGNORE);
502   } else {
503     sendsize = smpi_datatype_size(sendtype);
504     recvsize = smpi_datatype_size(recvtype);
505     // Local copy from root
506     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
507            recvcount * recvsize * sizeof(char));
508     // Send buffers to receivers
509     requests = xbt_new(MPI_Request, size - 1);
510     index = 0;
511     for (dst = 0; dst < size; dst++) {
512       if (dst != root) {
513         requests[index] = smpi_isend_init(&((char *) sendbuf)
514                                           [dst * sendcount * sendsize],
515                                           sendcount, sendtype, dst,
516                                           system_tag, comm);
517         index++;
518       }
519     }
520     // Wait for completion of isend's.
521     smpi_mpi_startall(size - 1, requests);
522     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
523     xbt_free(requests);
524   }
525 }
526
527 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
528                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
529                        MPI_Datatype recvtype, int root, MPI_Comm comm)
530 {
531   int system_tag = 666;
532   int rank, size, dst, index, sendsize, recvsize;
533   MPI_Request *requests;
534
535   rank = smpi_comm_rank(comm);
536   size = smpi_comm_size(comm);
537   if (rank != root) {
538     // Recv buffer from root
539     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
540                   MPI_STATUS_IGNORE);
541   } else {
542     sendsize = smpi_datatype_size(sendtype);
543     recvsize = smpi_datatype_size(recvtype);
544     // Local copy from root
545     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
546            recvcount * recvsize * sizeof(char));
547     // Send buffers to receivers
548     requests = xbt_new(MPI_Request, size - 1);
549     index = 0;
550     for (dst = 0; dst < size; dst++) {
551       if (dst != root) {
552         requests[index] =
553             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
554                             sendcounts[dst], sendtype, dst, system_tag,
555                             comm);
556         index++;
557       }
558     }
559     // Wait for completion of isend's.
560     smpi_mpi_startall(size - 1, requests);
561     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
562     xbt_free(requests);
563   }
564 }
565
566 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
567                      MPI_Datatype datatype, MPI_Op op, int root,
568                      MPI_Comm comm)
569 {
570   int system_tag = 666;
571   int rank, size, src, index, datasize;
572   MPI_Request *requests;
573   void **tmpbufs;
574
575   rank = smpi_comm_rank(comm);
576   size = smpi_comm_size(comm);
577   if (rank != root) {
578     // Send buffer to root
579     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
580   } else {
581     datasize = smpi_datatype_size(datatype);
582     // Local copy from root
583     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
584     // Receive buffers from senders
585     //TODO: make a MPI_barrier here ?
586     requests = xbt_new(MPI_Request, size - 1);
587     tmpbufs = xbt_new(void *, size - 1);
588     index = 0;
589     for (src = 0; src < size; src++) {
590       if (src != root) {
591         tmpbufs[index] = xbt_malloc(count * datasize);
592         requests[index] =
593             smpi_irecv_init(tmpbufs[index], count, datatype, src,
594                             system_tag, comm);
595         index++;
596       }
597     }
598     // Wait for completion of irecv's.
599     smpi_mpi_startall(size - 1, requests);
600     for (src = 0; src < size - 1; src++) {
601       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
602       if (index == MPI_UNDEFINED) {
603         break;
604       }
605       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
606     }
607     for (index = 0; index < size - 1; index++) {
608       xbt_free(tmpbufs[index]);
609     }
610     xbt_free(tmpbufs);
611     xbt_free(requests);
612   }
613 }
614
615 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
616                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
617 {
618   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
619   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
620
621 /*
622 FIXME: buggy implementation
623
624   int system_tag = 666;
625   int rank, size, other, index, datasize;
626   MPI_Request* requests;
627   void** tmpbufs;
628
629   rank = smpi_comm_rank(comm);
630   size = smpi_comm_size(comm);
631   datasize = smpi_datatype_size(datatype);
632   // Local copy from self
633   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
634   // Send/Recv buffers to/from others;
635   //TODO: make a MPI_barrier here ?
636   requests = xbt_new(MPI_Request, 2 * (size - 1));
637   tmpbufs = xbt_new(void*, size - 1);
638   index = 0;
639   for(other = 0; other < size; other++) {
640     if(other != rank) {
641       tmpbufs[index / 2] = xbt_malloc(count * datasize);
642       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
643       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
644       index += 2;
645     }
646   }
647   // Wait for completion of all comms.
648   for(other = 0; other < 2 * (size - 1); other++) {
649     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
650     if(index == MPI_UNDEFINED) {
651       break;
652     }
653     if((index & 1) == 1) {
654       // Request is odd: it's a irecv
655       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
656     }
657   }
658   for(index = 0; index < size - 1; index++) {
659     xbt_free(tmpbufs[index]);
660   }
661   xbt_free(tmpbufs);
662   xbt_free(requests);
663 */
664 }
665
666 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
667                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
668 {
669   int system_tag = 666;
670   int rank, size, other, index, datasize;
671   int total;
672   MPI_Request *requests;
673   void **tmpbufs;
674
675   rank = smpi_comm_rank(comm);
676   size = smpi_comm_size(comm);
677   datasize = smpi_datatype_size(datatype);
678   // Local copy from self
679   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
680   // Send/Recv buffers to/from others;
681   total = rank + (size - (rank + 1));
682   requests = xbt_new(MPI_Request, total);
683   tmpbufs = xbt_new(void *, rank);
684   index = 0;
685   for (other = 0; other < rank; other++) {
686     tmpbufs[index] = xbt_malloc(count * datasize);
687     requests[index] =
688         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
689                         comm);
690     index++;
691   }
692   for (other = rank + 1; other < size; other++) {
693     requests[index] =
694         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
695     index++;
696   }
697   // Wait for completion of all comms.
698   smpi_mpi_startall(size - 1, requests);
699   for (other = 0; other < total; other++) {
700     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
701     if (index == MPI_UNDEFINED) {
702       break;
703     }
704     if (index < rank) {
705       // #Request is below rank: it's a irecv
706       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
707     }
708   }
709   for (index = 0; index < size - 1; index++) {
710     xbt_free(tmpbufs[index]);
711   }
712   xbt_free(tmpbufs);
713   xbt_free(requests);
714 }