Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Support for Fortran code in SMPI based on f2c, some perl and some dirty hacks.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 static MPI_Request build_request(void *buf, int count,
23                                  MPI_Datatype datatype, int src, int dst,
24                                  int tag, MPI_Comm comm, unsigned flags)
25 {
26   MPI_Request request;
27
28   request = xbt_new(s_smpi_mpi_request_t, 1);
29   request->buf = buf;
30   request->size = smpi_datatype_size(datatype) * count;
31   request->src = src;
32   request->dst = dst;
33   request->tag = tag;
34   request->comm = comm;
35   request->rdv = NULL;
36   request->pair = NULL;
37   request->complete = 0;
38   request->match = MPI_REQUEST_NULL;
39   request->flags = flags;
40 #ifdef HAVE_TRACING
41   request->send = 0;
42   request->recv = 0;
43 #endif
44   return request;
45 }
46
47 /* MPI Low level calls */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49                                int dst, int tag, MPI_Comm comm)
50 {
51   MPI_Request request =
52       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53                     comm, PERSISTENT | SEND);
54
55   return request;
56 }
57
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59                                int src, int tag, MPI_Comm comm)
60 {
61   MPI_Request request =
62       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63                     comm, PERSISTENT | RECV);
64
65   return request;
66 }
67
68 void smpi_mpi_start(MPI_Request request)
69 {
70   xbt_assert0(request->complete == 0,
71               "Cannot start a non-finished communication");
72   if ((request->flags & RECV) == RECV) {
73     smpi_process_post_recv(request);
74     print_request("New recv", request);
75     request->pair =
76         SIMIX_network_irecv(request->rdv, request->buf, &request->size);
77   } else {
78     smpi_process_post_send(request->comm, request);     // FIXME
79     print_request("New send", request);
80     request->pair =
81         SIMIX_network_isend(request->rdv, request->size, -1.0,
82                             request->buf, request->size, NULL);
83   }
84 }
85
86 void smpi_mpi_startall(int count, MPI_Request * requests)
87 {
88   int i;
89
90   for (i = 0; i < count; i++) {
91     smpi_mpi_start(requests[i]);
92   }
93 }
94
95 void smpi_mpi_request_free(MPI_Request * request)
96 {
97   xbt_free(*request);
98   *request = MPI_REQUEST_NULL;
99 }
100
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102                             int dst, int tag, MPI_Comm comm)
103 {
104   MPI_Request request =
105       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106                     comm, NON_PERSISTENT | SEND);
107
108   return request;
109 }
110
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112                            int dst, int tag, MPI_Comm comm)
113 {
114   MPI_Request request =
115       smpi_isend_init(buf, count, datatype, dst, tag, comm);
116
117   smpi_mpi_start(request);
118   return request;
119 }
120
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122                             int src, int tag, MPI_Comm comm)
123 {
124   MPI_Request request =
125       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126                     comm, NON_PERSISTENT | RECV);
127
128   return request;
129 }
130
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132                            int src, int tag, MPI_Comm comm)
133 {
134   MPI_Request request =
135       smpi_irecv_init(buf, count, datatype, src, tag, comm);
136
137   smpi_mpi_start(request);
138   return request;
139 }
140
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142                    int tag, MPI_Comm comm, MPI_Status * status)
143 {
144   MPI_Request request;
145
146   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147   smpi_mpi_wait(&request, status);
148 }
149
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151                    int tag, MPI_Comm comm)
152 {
153   MPI_Request request;
154
155   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
157 }
158
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160                        int dst, int sendtag, void *recvbuf, int recvcount,
161                        MPI_Datatype recvtype, int src, int recvtag,
162                        MPI_Comm comm, MPI_Status * status)
163 {
164   MPI_Request requests[2];
165   MPI_Status stats[2];
166
167   requests[0] =
168       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
169   requests[1] =
170       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171   smpi_mpi_startall(2, requests);
172   smpi_mpi_waitall(2, requests, stats);
173   if (status != MPI_STATUS_IGNORE) {
174     // Copy receive status
175     memcpy(status, &stats[1], sizeof(MPI_Status));
176   }
177 }
178
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
180 {
181   return status->count / smpi_datatype_size(datatype);
182 }
183
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
185 {
186   if (status != MPI_STATUS_IGNORE) {
187     status->MPI_SOURCE = (*request)->src;
188     status->MPI_TAG = (*request)->tag;
189     status->MPI_ERROR = MPI_SUCCESS;
190     status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
191   }
192   SIMIX_communication_destroy((*request)->pair);
193   print_request("finishing wait", *request);
194   if ((*request)->complete == 1) {
195     SIMIX_rdv_destroy((*request)->rdv);
196   } else {
197     (*request)->match->complete = 1;
198     (*request)->match->match = MPI_REQUEST_NULL;
199   }
200   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201     smpi_mpi_request_free(request);
202   } else {
203     (*request)->rdv = NULL;
204     (*request)->pair = NULL;
205   }
206 }
207
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
209 {
210   int flag = (*request)->complete;
211
212   if (flag) {
213     smpi_mpi_wait(request, status);
214   }
215   return flag;
216 }
217
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
219                      MPI_Status * status)
220 {
221   int i, flag;
222
223   *index = MPI_UNDEFINED;
224   flag = 0;
225   for (i = 0; i < count; i++) {
226     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227       smpi_mpi_wait(&requests[i], status);
228       *index = i;
229       flag = 1;
230       break;
231     }
232   }
233   return flag;
234 }
235
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
237 {
238   print_request("wait", *request);
239   SIMIX_network_wait((*request)->pair, -1.0);
240   finish_wait(request, status);
241 }
242
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
244                      MPI_Status * status)
245 {
246   xbt_dynar_t comms;
247   int i, size, index;
248   int *map;
249
250   index = MPI_UNDEFINED;
251   if (count > 0) {
252     // First check for already completed requests
253     for (i = 0; i < count; i++) {
254       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
255         index = i;
256         smpi_mpi_wait(&requests[index], status);
257         break;
258       }
259     }
260     if (index == MPI_UNDEFINED) {
261       // Otherwise, wait for a request to complete
262       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
263       map = xbt_new(int, count);
264       size = 0;
265       DEBUG0("Wait for one of");
266       for (i = 0; i < count; i++) {
267         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268           print_request("   ", requests[i]);
269           xbt_dynar_push(comms, &requests[i]->pair);
270           map[size] = i;
271           size++;
272         }
273       }
274       if (size > 0) {
275         index = SIMIX_network_waitany(comms);
276         index = map[index];
277         finish_wait(&requests[index], status);
278       }
279       xbt_free(map);
280       xbt_dynar_free(&comms);
281     }
282   }
283   return index;
284 }
285
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
287                       MPI_Status status[])
288 {
289   int index;
290   MPI_Status stat;
291
292   while (count > 0) {
293     index = smpi_mpi_waitany(count, requests, &stat);
294     if (index == MPI_UNDEFINED) {
295       break;
296     }
297     if (status != MPI_STATUS_IGNORE) {
298       memcpy(&status[index], &stat, sizeof(stat));
299     }
300     // FIXME: check this -v
301     // Move the last request to the found position
302     requests[index] = requests[count - 1];
303     requests[count - 1] = MPI_REQUEST_NULL;
304     count--;
305   }
306 }
307
308 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
309                       MPI_Status status[])
310 {
311   int i, count;
312
313   count = 0;
314   for (i = 0; i < incount; i++) {
315     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
316       smpi_mpi_wait(&requests[i],
317                     status !=
318                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
319       indices[count] = i;
320       count++;
321     }
322   }
323   return count;
324 }
325
326 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
327                     MPI_Comm comm)
328 {
329   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
330   nary_tree_bcast(buf, count, datatype, root, comm, 4);
331 }
332
333 void smpi_mpi_barrier(MPI_Comm comm)
334 {
335   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
336   nary_tree_barrier(comm, 4);
337 }
338
339 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
340                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
341                      int root, MPI_Comm comm)
342 {
343   int system_tag = 666;
344   int rank, size, src, index, sendsize, recvsize;
345   MPI_Request *requests;
346
347   rank = smpi_comm_rank(comm);
348   size = smpi_comm_size(comm);
349   if (rank != root) {
350     // Send buffer to root
351     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
352   } else {
353     sendsize = smpi_datatype_size(sendtype);
354     recvsize = smpi_datatype_size(recvtype);
355     // Local copy from root
356     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
357            sendcount * sendsize * sizeof(char));
358     // Receive buffers from senders
359     requests = xbt_new(MPI_Request, size - 1);
360     index = 0;
361     for (src = 0; src < size; src++) {
362       if (src != root) {
363         requests[index] = smpi_irecv_init(&((char *) recvbuf)
364                                           [src * recvcount * recvsize],
365                                           recvcount, recvtype, src,
366                                           system_tag, comm);
367         index++;
368       }
369     }
370     // Wait for completion of irecv's.
371     smpi_mpi_startall(size - 1, requests);
372     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
373     xbt_free(requests);
374   }
375 }
376
377 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
378                       void *recvbuf, int *recvcounts, int *displs,
379                       MPI_Datatype recvtype, int root, MPI_Comm comm)
380 {
381   int system_tag = 666;
382   int rank, size, src, index, sendsize;
383   MPI_Request *requests;
384
385   rank = smpi_comm_rank(comm);
386   size = smpi_comm_size(comm);
387   if (rank != root) {
388     // Send buffer to root
389     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
390   } else {
391     sendsize = smpi_datatype_size(sendtype);
392     // Local copy from root
393     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
394            sendcount * sendsize * sizeof(char));
395     // Receive buffers from senders
396     requests = xbt_new(MPI_Request, size - 1);
397     index = 0;
398     for (src = 0; src < size; src++) {
399       if (src != root) {
400         requests[index] =
401             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
402                             recvcounts[src], recvtype, src, system_tag,
403                             comm);
404         index++;
405       }
406     }
407     // Wait for completion of irecv's.
408     smpi_mpi_startall(size - 1, requests);
409     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
410     xbt_free(requests);
411   }
412 }
413
414 void smpi_mpi_allgather(void *sendbuf, int sendcount,
415                         MPI_Datatype sendtype, void *recvbuf,
416                         int recvcount, MPI_Datatype recvtype,
417                         MPI_Comm comm)
418 {
419   int system_tag = 666;
420   int rank, size, other, index, sendsize, recvsize;
421   MPI_Request *requests;
422
423   rank = smpi_comm_rank(comm);
424   size = smpi_comm_size(comm);
425   sendsize = smpi_datatype_size(sendtype);
426   recvsize = smpi_datatype_size(recvtype);
427   // Local copy from self
428   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
429          sendcount * sendsize * sizeof(char));
430   // Send/Recv buffers to/from others;
431   requests = xbt_new(MPI_Request, 2 * (size - 1));
432   index = 0;
433   for (other = 0; other < size; other++) {
434     if (other != rank) {
435       requests[index] =
436           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
437                           comm);
438       index++;
439       requests[index] = smpi_irecv_init(&((char *) recvbuf)
440                                         [other * recvcount * recvsize],
441                                         recvcount, recvtype, other,
442                                         system_tag, comm);
443       index++;
444     }
445   }
446   // Wait for completion of all comms.
447   smpi_mpi_startall(2 * (size - 1), requests);
448   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
449   xbt_free(requests);
450 }
451
452 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
453                          MPI_Datatype sendtype, void *recvbuf,
454                          int *recvcounts, int *displs,
455                          MPI_Datatype recvtype, MPI_Comm comm)
456 {
457   int system_tag = 666;
458   int rank, size, other, index, sendsize, recvsize;
459   MPI_Request *requests;
460
461   rank = smpi_comm_rank(comm);
462   size = smpi_comm_size(comm);
463   sendsize = smpi_datatype_size(sendtype);
464   recvsize = smpi_datatype_size(recvtype);
465   // Local copy from self
466   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
467          sendcount * sendsize * sizeof(char));
468   // Send buffers to others;
469   requests = xbt_new(MPI_Request, 2 * (size - 1));
470   index = 0;
471   for (other = 0; other < size; other++) {
472     if (other != rank) {
473       requests[index] =
474           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
475                           comm);
476       index++;
477       requests[index] =
478           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
479                           recvcounts[other], recvtype, other, system_tag,
480                           comm);
481       index++;
482     }
483   }
484   // Wait for completion of all comms.
485   smpi_mpi_startall(2 * (size - 1), requests);
486   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
487   xbt_free(requests);
488 }
489
490 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
491                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
492                       int root, MPI_Comm comm)
493 {
494   int system_tag = 666;
495   int rank, size, dst, index, sendsize, recvsize;
496   MPI_Request *requests;
497
498   rank = smpi_comm_rank(comm);
499   size = smpi_comm_size(comm);
500   if (rank != root) {
501     // Recv buffer from root
502     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
503                   MPI_STATUS_IGNORE);
504   } else {
505     sendsize = smpi_datatype_size(sendtype);
506     recvsize = smpi_datatype_size(recvtype);
507     // Local copy from root
508     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
509            recvcount * recvsize * sizeof(char));
510     // Send buffers to receivers
511     requests = xbt_new(MPI_Request, size - 1);
512     index = 0;
513     for (dst = 0; dst < size; dst++) {
514       if (dst != root) {
515         requests[index] = smpi_isend_init(&((char *) sendbuf)
516                                           [dst * sendcount * sendsize],
517                                           sendcount, sendtype, dst,
518                                           system_tag, comm);
519         index++;
520       }
521     }
522     // Wait for completion of isend's.
523     smpi_mpi_startall(size - 1, requests);
524     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
525     xbt_free(requests);
526   }
527 }
528
529 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
530                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
531                        MPI_Datatype recvtype, int root, MPI_Comm comm)
532 {
533   int system_tag = 666;
534   int rank, size, dst, index, sendsize, recvsize;
535   MPI_Request *requests;
536
537   rank = smpi_comm_rank(comm);
538   size = smpi_comm_size(comm);
539   if (rank != root) {
540     // Recv buffer from root
541     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
542                   MPI_STATUS_IGNORE);
543   } else {
544     sendsize = smpi_datatype_size(sendtype);
545     recvsize = smpi_datatype_size(recvtype);
546     // Local copy from root
547     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
548            recvcount * recvsize * sizeof(char));
549     // Send buffers to receivers
550     requests = xbt_new(MPI_Request, size - 1);
551     index = 0;
552     for (dst = 0; dst < size; dst++) {
553       if (dst != root) {
554         requests[index] =
555             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
556                             sendcounts[dst], sendtype, dst, system_tag,
557                             comm);
558         index++;
559       }
560     }
561     // Wait for completion of isend's.
562     smpi_mpi_startall(size - 1, requests);
563     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
564     xbt_free(requests);
565   }
566 }
567
568 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
569                      MPI_Datatype datatype, MPI_Op op, int root,
570                      MPI_Comm comm)
571 {
572   int system_tag = 666;
573   int rank, size, src, index, datasize;
574   MPI_Request *requests;
575   void **tmpbufs;
576
577   rank = smpi_comm_rank(comm);
578   size = smpi_comm_size(comm);
579   if (rank != root) {
580     // Send buffer to root
581     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
582   } else {
583     datasize = smpi_datatype_size(datatype);
584     // Local copy from root
585     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
586     // Receive buffers from senders
587     //TODO: make a MPI_barrier here ?
588     requests = xbt_new(MPI_Request, size - 1);
589     tmpbufs = xbt_new(void *, size - 1);
590     index = 0;
591     for (src = 0; src < size; src++) {
592       if (src != root) {
593         tmpbufs[index] = xbt_malloc(count * datasize);
594         requests[index] =
595             smpi_irecv_init(tmpbufs[index], count, datatype, src,
596                             system_tag, comm);
597         index++;
598       }
599     }
600     // Wait for completion of irecv's.
601     smpi_mpi_startall(size - 1, requests);
602     for (src = 0; src < size - 1; src++) {
603       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
604       if (index == MPI_UNDEFINED) {
605         break;
606       }
607       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
608     }
609     for (index = 0; index < size - 1; index++) {
610       xbt_free(tmpbufs[index]);
611     }
612     xbt_free(tmpbufs);
613     xbt_free(requests);
614   }
615 }
616
617 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
618                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
619 {
620   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
621   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
622
623 /*
624 FIXME: buggy implementation
625
626   int system_tag = 666;
627   int rank, size, other, index, datasize;
628   MPI_Request* requests;
629   void** tmpbufs;
630
631   rank = smpi_comm_rank(comm);
632   size = smpi_comm_size(comm);
633   datasize = smpi_datatype_size(datatype);
634   // Local copy from self
635   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
636   // Send/Recv buffers to/from others;
637   //TODO: make a MPI_barrier here ?
638   requests = xbt_new(MPI_Request, 2 * (size - 1));
639   tmpbufs = xbt_new(void*, size - 1);
640   index = 0;
641   for(other = 0; other < size; other++) {
642     if(other != rank) {
643       tmpbufs[index / 2] = xbt_malloc(count * datasize);
644       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
645       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
646       index += 2;
647     }
648   }
649   // Wait for completion of all comms.
650   for(other = 0; other < 2 * (size - 1); other++) {
651     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
652     if(index == MPI_UNDEFINED) {
653       break;
654     }
655     if((index & 1) == 1) {
656       // Request is odd: it's a irecv
657       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
658     }
659   }
660   for(index = 0; index < size - 1; index++) {
661     xbt_free(tmpbufs[index]);
662   }
663   xbt_free(tmpbufs);
664   xbt_free(requests);
665 */
666 }
667
668 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
669                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
670 {
671   int system_tag = 666;
672   int rank, size, other, index, datasize;
673   int total;
674   MPI_Request *requests;
675   void **tmpbufs;
676
677   rank = smpi_comm_rank(comm);
678   size = smpi_comm_size(comm);
679   datasize = smpi_datatype_size(datatype);
680   // Local copy from self
681   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
682   // Send/Recv buffers to/from others;
683   total = rank + (size - (rank + 1));
684   requests = xbt_new(MPI_Request, total);
685   tmpbufs = xbt_new(void *, rank);
686   index = 0;
687   for (other = 0; other < rank; other++) {
688     tmpbufs[index] = xbt_malloc(count * datasize);
689     requests[index] =
690         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
691                         comm);
692     index++;
693   }
694   for (other = rank + 1; other < size; other++) {
695     requests[index] =
696         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
697     index++;
698   }
699   // Wait for completion of all comms.
700   smpi_mpi_startall(size - 1, requests);
701   for (other = 0; other < total; other++) {
702     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
703     if (index == MPI_UNDEFINED) {
704       break;
705     }
706     if (index < rank) {
707       // #Request is below rank: it's a irecv
708       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
709     }
710   }
711   for (index = 0; index < size - 1; index++) {
712     xbt_free(tmpbufs[index]);
713   }
714   xbt_free(tmpbufs);
715   xbt_free(requests);
716 }