Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[trace] setting the tracing category for exec and comm done by smpi
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 static MPI_Request build_request(void *buf, int count,
23                                  MPI_Datatype datatype, int src, int dst,
24                                  int tag, MPI_Comm comm, unsigned flags)
25 {
26   MPI_Request request;
27
28   request = xbt_new(s_smpi_mpi_request_t, 1);
29   request->buf = buf;
30   request->size = smpi_datatype_size(datatype) * count;
31   request->src = src;
32   request->dst = dst;
33   request->tag = tag;
34   request->comm = comm;
35   request->rdv = NULL;
36   request->pair = NULL;
37   request->complete = 0;
38   request->match = MPI_REQUEST_NULL;
39   request->flags = flags;
40 #ifdef HAVE_TRACING
41   request->send = 0;
42   request->recv = 0;
43 #endif
44   return request;
45 }
46
47 /* MPI Low level calls */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49                                int dst, int tag, MPI_Comm comm)
50 {
51   MPI_Request request =
52       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53                     comm, PERSISTENT | SEND);
54
55   return request;
56 }
57
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59                                int src, int tag, MPI_Comm comm)
60 {
61   MPI_Request request =
62       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63                     comm, PERSISTENT | RECV);
64
65   return request;
66 }
67
68 void smpi_mpi_start(MPI_Request request)
69 {
70   xbt_assert0(request->complete == 0,
71               "Cannot start a non-finished communication");
72   if ((request->flags & RECV) == RECV) {
73     smpi_process_post_recv(request);
74     print_request("New recv", request);
75     request->pair =
76         SIMIX_req_comm_irecv(request->rdv, request->buf, &request->size);
77   } else {
78     smpi_process_post_send(request->comm, request);     // FIXME
79     print_request("New send", request);
80     request->pair =
81         SIMIX_req_comm_isend(request->rdv, request->size, -1.0,
82                             request->buf, request->size, NULL);
83 #ifdef HAVE_TRACING
84     SIMIX_req_set_category (request->pair, TRACE_internal_smpi_get_category());
85 #endif
86   }
87 }
88
89 void smpi_mpi_startall(int count, MPI_Request * requests)
90 {
91   int i;
92
93   for (i = 0; i < count; i++) {
94     smpi_mpi_start(requests[i]);
95   }
96 }
97
98 void smpi_mpi_request_free(MPI_Request * request)
99 {
100   xbt_free(*request);
101   *request = MPI_REQUEST_NULL;
102 }
103
104 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
105                             int dst, int tag, MPI_Comm comm)
106 {
107   MPI_Request request =
108       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
109                     comm, NON_PERSISTENT | SEND);
110
111   return request;
112 }
113
114 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
115                            int dst, int tag, MPI_Comm comm)
116 {
117   MPI_Request request =
118       smpi_isend_init(buf, count, datatype, dst, tag, comm);
119
120   smpi_mpi_start(request);
121   return request;
122 }
123
124 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
125                             int src, int tag, MPI_Comm comm)
126 {
127   MPI_Request request =
128       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
129                     comm, NON_PERSISTENT | RECV);
130
131   return request;
132 }
133
134 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
135                            int src, int tag, MPI_Comm comm)
136 {
137   MPI_Request request =
138       smpi_irecv_init(buf, count, datatype, src, tag, comm);
139
140   smpi_mpi_start(request);
141   return request;
142 }
143
144 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
145                    int tag, MPI_Comm comm, MPI_Status * status)
146 {
147   MPI_Request request;
148
149   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
150   smpi_mpi_wait(&request, status);
151 }
152
153 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
154                    int tag, MPI_Comm comm)
155 {
156   MPI_Request request;
157
158   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
159   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
160 }
161
162 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
163                        int dst, int sendtag, void *recvbuf, int recvcount,
164                        MPI_Datatype recvtype, int src, int recvtag,
165                        MPI_Comm comm, MPI_Status * status)
166 {
167   MPI_Request requests[2];
168   MPI_Status stats[2];
169
170   requests[0] =
171       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
172   requests[1] =
173       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
174   smpi_mpi_startall(2, requests);
175   smpi_mpi_waitall(2, requests, stats);
176   if (status != MPI_STATUS_IGNORE) {
177     // Copy receive status
178     memcpy(status, &stats[1], sizeof(MPI_Status));
179   }
180 }
181
182 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
183 {
184   return status->count / smpi_datatype_size(datatype);
185 }
186
187 static void finish_wait(MPI_Request * request, MPI_Status * status)
188 {
189   if (status != MPI_STATUS_IGNORE) {
190     status->MPI_SOURCE = (*request)->src;
191     status->MPI_TAG = (*request)->tag;
192     status->MPI_ERROR = MPI_SUCCESS;
193     status->count = SIMIX_req_comm_get_dst_buff_size((*request)->pair);
194   }
195   SIMIX_req_comm_destroy((*request)->pair);
196   print_request("finishing wait", *request);
197   if ((*request)->complete == 1) {
198     SIMIX_req_rdv_destroy((*request)->rdv);
199   } else {
200     (*request)->match->complete = 1;
201     (*request)->match->match = MPI_REQUEST_NULL;
202   }
203   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
204     smpi_mpi_request_free(request);
205   } else {
206     (*request)->rdv = NULL;
207     (*request)->pair = NULL;
208   }
209 }
210
211 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
212 {
213   int flag = (*request)->complete;
214
215   if (flag) {
216     smpi_mpi_wait(request, status);
217   }
218   return flag;
219 }
220
221 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
222                      MPI_Status * status)
223 {
224   int i, flag;
225
226   *index = MPI_UNDEFINED;
227   flag = 0;
228   for (i = 0; i < count; i++) {
229     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
230       smpi_mpi_wait(&requests[i], status);
231       *index = i;
232       flag = 1;
233       break;
234     }
235   }
236   return flag;
237 }
238
239 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
240 {
241   print_request("wait", *request);
242   SIMIX_req_comm_wait((*request)->pair, -1.0);
243   finish_wait(request, status);
244 }
245
246 int smpi_mpi_waitany(int count, MPI_Request requests[],
247                      MPI_Status * status)
248 {
249   xbt_dynar_t comms;
250   int i, size, index;
251   int *map;
252
253   index = MPI_UNDEFINED;
254   if (count > 0) {
255     // First check for already completed requests
256     for (i = 0; i < count; i++) {
257       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
258         index = i;
259         smpi_mpi_wait(&requests[index], status);
260         break;
261       }
262     }
263     if (index == MPI_UNDEFINED) {
264       // Otherwise, wait for a request to complete
265       comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
266       map = xbt_new(int, count);
267       size = 0;
268       DEBUG0("Wait for one of");
269       for (i = 0; i < count; i++) {
270         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
271           print_request("   ", requests[i]);
272           xbt_dynar_push(comms, &requests[i]->pair);
273           map[size] = i;
274           size++;
275         }
276       }
277       if (size > 0) {
278         index = SIMIX_req_comm_waitany(comms);
279         index = map[index];
280         finish_wait(&requests[index], status);
281       }
282       xbt_free(map);
283       xbt_dynar_free(&comms);
284     }
285   }
286   return index;
287 }
288
289 void smpi_mpi_waitall(int count, MPI_Request requests[],
290                       MPI_Status status[])
291 {
292   int index, c;
293   MPI_Status stat;
294   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
295
296   c = count;
297   while (c > 0) {
298     index = smpi_mpi_waitany(count, requests, pstat);
299     if (index == MPI_UNDEFINED) {
300       break;
301     }
302     if (status != MPI_STATUS_IGNORE) {
303       memcpy(&status[index], pstat, sizeof *pstat);
304     }
305     c--;
306   }
307 }
308
309 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
310                       MPI_Status status[])
311 {
312   int i, count;
313
314   count = 0;
315   for (i = 0; i < incount; i++) {
316     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
317       smpi_mpi_wait(&requests[i],
318                     status !=
319                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
320       indices[count] = i;
321       count++;
322     }
323   }
324   return count;
325 }
326
327 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
328                     MPI_Comm comm)
329 {
330   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
331   nary_tree_bcast(buf, count, datatype, root, comm, 4);
332 }
333
334 void smpi_mpi_barrier(MPI_Comm comm)
335 {
336   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
337   nary_tree_barrier(comm, 4);
338 }
339
340 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
341                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
342                      int root, MPI_Comm comm)
343 {
344   int system_tag = 666;
345   int rank, size, src, index, sendsize, recvsize;
346   MPI_Request *requests;
347
348   rank = smpi_comm_rank(comm);
349   size = smpi_comm_size(comm);
350   if (rank != root) {
351     // Send buffer to root
352     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
353   } else {
354     sendsize = smpi_datatype_size(sendtype);
355     recvsize = smpi_datatype_size(recvtype);
356     // Local copy from root
357     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
358            sendcount * sendsize * sizeof(char));
359     // Receive buffers from senders
360     requests = xbt_new(MPI_Request, size - 1);
361     index = 0;
362     for (src = 0; src < size; src++) {
363       if (src != root) {
364         requests[index] = smpi_irecv_init(&((char *) recvbuf)
365                                           [src * recvcount * recvsize],
366                                           recvcount, recvtype, src,
367                                           system_tag, comm);
368         index++;
369       }
370     }
371     // Wait for completion of irecv's.
372     smpi_mpi_startall(size - 1, requests);
373     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
374     xbt_free(requests);
375   }
376 }
377
378 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
379                       void *recvbuf, int *recvcounts, int *displs,
380                       MPI_Datatype recvtype, int root, MPI_Comm comm)
381 {
382   int system_tag = 666;
383   int rank, size, src, index, sendsize;
384   MPI_Request *requests;
385
386   rank = smpi_comm_rank(comm);
387   size = smpi_comm_size(comm);
388   if (rank != root) {
389     // Send buffer to root
390     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
391   } else {
392     sendsize = smpi_datatype_size(sendtype);
393     // Local copy from root
394     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
395            sendcount * sendsize * sizeof(char));
396     // Receive buffers from senders
397     requests = xbt_new(MPI_Request, size - 1);
398     index = 0;
399     for (src = 0; src < size; src++) {
400       if (src != root) {
401         requests[index] =
402             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
403                             recvcounts[src], recvtype, src, system_tag,
404                             comm);
405         index++;
406       }
407     }
408     // Wait for completion of irecv's.
409     smpi_mpi_startall(size - 1, requests);
410     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
411     xbt_free(requests);
412   }
413 }
414
415 void smpi_mpi_allgather(void *sendbuf, int sendcount,
416                         MPI_Datatype sendtype, void *recvbuf,
417                         int recvcount, MPI_Datatype recvtype,
418                         MPI_Comm comm)
419 {
420   int system_tag = 666;
421   int rank, size, other, index, sendsize, recvsize;
422   MPI_Request *requests;
423
424   rank = smpi_comm_rank(comm);
425   size = smpi_comm_size(comm);
426   sendsize = smpi_datatype_size(sendtype);
427   recvsize = smpi_datatype_size(recvtype);
428   // Local copy from self
429   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
430          sendcount * sendsize * sizeof(char));
431   // Send/Recv buffers to/from others;
432   requests = xbt_new(MPI_Request, 2 * (size - 1));
433   index = 0;
434   for (other = 0; other < size; other++) {
435     if (other != rank) {
436       requests[index] =
437           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
438                           comm);
439       index++;
440       requests[index] = smpi_irecv_init(&((char *) recvbuf)
441                                         [other * recvcount * recvsize],
442                                         recvcount, recvtype, other,
443                                         system_tag, comm);
444       index++;
445     }
446   }
447   // Wait for completion of all comms.
448   smpi_mpi_startall(2 * (size - 1), requests);
449   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
450   xbt_free(requests);
451 }
452
453 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
454                          MPI_Datatype sendtype, void *recvbuf,
455                          int *recvcounts, int *displs,
456                          MPI_Datatype recvtype, MPI_Comm comm)
457 {
458   int system_tag = 666;
459   int rank, size, other, index, sendsize, recvsize;
460   MPI_Request *requests;
461
462   rank = smpi_comm_rank(comm);
463   size = smpi_comm_size(comm);
464   sendsize = smpi_datatype_size(sendtype);
465   recvsize = smpi_datatype_size(recvtype);
466   // Local copy from self
467   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
468          sendcount * sendsize * sizeof(char));
469   // Send buffers to others;
470   requests = xbt_new(MPI_Request, 2 * (size - 1));
471   index = 0;
472   for (other = 0; other < size; other++) {
473     if (other != rank) {
474       requests[index] =
475           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
476                           comm);
477       index++;
478       requests[index] =
479           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
480                           recvcounts[other], recvtype, other, system_tag,
481                           comm);
482       index++;
483     }
484   }
485   // Wait for completion of all comms.
486   smpi_mpi_startall(2 * (size - 1), requests);
487   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
488   xbt_free(requests);
489 }
490
491 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
492                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
493                       int root, MPI_Comm comm)
494 {
495   int system_tag = 666;
496   int rank, size, dst, index, sendsize, recvsize;
497   MPI_Request *requests;
498
499   rank = smpi_comm_rank(comm);
500   size = smpi_comm_size(comm);
501   if (rank != root) {
502     // Recv buffer from root
503     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
504                   MPI_STATUS_IGNORE);
505   } else {
506     sendsize = smpi_datatype_size(sendtype);
507     recvsize = smpi_datatype_size(recvtype);
508     // Local copy from root
509     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
510            recvcount * recvsize * sizeof(char));
511     // Send buffers to receivers
512     requests = xbt_new(MPI_Request, size - 1);
513     index = 0;
514     for (dst = 0; dst < size; dst++) {
515       if (dst != root) {
516         requests[index] = smpi_isend_init(&((char *) sendbuf)
517                                           [dst * sendcount * sendsize],
518                                           sendcount, sendtype, dst,
519                                           system_tag, comm);
520         index++;
521       }
522     }
523     // Wait for completion of isend's.
524     smpi_mpi_startall(size - 1, requests);
525     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
526     xbt_free(requests);
527   }
528 }
529
530 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
531                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
532                        MPI_Datatype recvtype, int root, MPI_Comm comm)
533 {
534   int system_tag = 666;
535   int rank, size, dst, index, sendsize, recvsize;
536   MPI_Request *requests;
537
538   rank = smpi_comm_rank(comm);
539   size = smpi_comm_size(comm);
540   if (rank != root) {
541     // Recv buffer from root
542     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
543                   MPI_STATUS_IGNORE);
544   } else {
545     sendsize = smpi_datatype_size(sendtype);
546     recvsize = smpi_datatype_size(recvtype);
547     // Local copy from root
548     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
549            recvcount * recvsize * sizeof(char));
550     // Send buffers to receivers
551     requests = xbt_new(MPI_Request, size - 1);
552     index = 0;
553     for (dst = 0; dst < size; dst++) {
554       if (dst != root) {
555         requests[index] =
556             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
557                             sendcounts[dst], sendtype, dst, system_tag,
558                             comm);
559         index++;
560       }
561     }
562     // Wait for completion of isend's.
563     smpi_mpi_startall(size - 1, requests);
564     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
565     xbt_free(requests);
566   }
567 }
568
569 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
570                      MPI_Datatype datatype, MPI_Op op, int root,
571                      MPI_Comm comm)
572 {
573   int system_tag = 666;
574   int rank, size, src, index, datasize;
575   MPI_Request *requests;
576   void **tmpbufs;
577
578   rank = smpi_comm_rank(comm);
579   size = smpi_comm_size(comm);
580   if (rank != root) {
581     // Send buffer to root
582     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
583   } else {
584     datasize = smpi_datatype_size(datatype);
585     // Local copy from root
586     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
587     // Receive buffers from senders
588     //TODO: make a MPI_barrier here ?
589     requests = xbt_new(MPI_Request, size - 1);
590     tmpbufs = xbt_new(void *, size - 1);
591     index = 0;
592     for (src = 0; src < size; src++) {
593       if (src != root) {
594         tmpbufs[index] = xbt_malloc(count * datasize);
595         requests[index] =
596             smpi_irecv_init(tmpbufs[index], count, datatype, src,
597                             system_tag, comm);
598         index++;
599       }
600     }
601     // Wait for completion of irecv's.
602     smpi_mpi_startall(size - 1, requests);
603     for (src = 0; src < size - 1; src++) {
604       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
605       if (index == MPI_UNDEFINED) {
606         break;
607       }
608       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
609     }
610     for (index = 0; index < size - 1; index++) {
611       xbt_free(tmpbufs[index]);
612     }
613     xbt_free(tmpbufs);
614     xbt_free(requests);
615   }
616 }
617
618 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
619                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
620 {
621   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
622   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
623
624 /*
625 FIXME: buggy implementation
626
627   int system_tag = 666;
628   int rank, size, other, index, datasize;
629   MPI_Request* requests;
630   void** tmpbufs;
631
632   rank = smpi_comm_rank(comm);
633   size = smpi_comm_size(comm);
634   datasize = smpi_datatype_size(datatype);
635   // Local copy from self
636   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
637   // Send/Recv buffers to/from others;
638   //TODO: make a MPI_barrier here ?
639   requests = xbt_new(MPI_Request, 2 * (size - 1));
640   tmpbufs = xbt_new(void*, size - 1);
641   index = 0;
642   for(other = 0; other < size; other++) {
643     if(other != rank) {
644       tmpbufs[index / 2] = xbt_malloc(count * datasize);
645       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
646       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
647       index += 2;
648     }
649   }
650   // Wait for completion of all comms.
651   for(other = 0; other < 2 * (size - 1); other++) {
652     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
653     if(index == MPI_UNDEFINED) {
654       break;
655     }
656     if((index & 1) == 1) {
657       // Request is odd: it's a irecv
658       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
659     }
660   }
661   for(index = 0; index < size - 1; index++) {
662     xbt_free(tmpbufs[index]);
663   }
664   xbt_free(tmpbufs);
665   xbt_free(requests);
666 */
667 }
668
669 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
670                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
671 {
672   int system_tag = 666;
673   int rank, size, other, index, datasize;
674   int total;
675   MPI_Request *requests;
676   void **tmpbufs;
677
678   rank = smpi_comm_rank(comm);
679   size = smpi_comm_size(comm);
680   datasize = smpi_datatype_size(datatype);
681   // Local copy from self
682   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
683   // Send/Recv buffers to/from others;
684   total = rank + (size - (rank + 1));
685   requests = xbt_new(MPI_Request, total);
686   tmpbufs = xbt_new(void *, rank);
687   index = 0;
688   for (other = 0; other < rank; other++) {
689     tmpbufs[index] = xbt_malloc(count * datasize);
690     requests[index] =
691         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
692                         comm);
693     index++;
694   }
695   for (other = rank + 1; other < size; other++) {
696     requests[index] =
697         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
698     index++;
699   }
700   // Wait for completion of all comms.
701   smpi_mpi_startall(size - 1, requests);
702   for (other = 0; other < total; other++) {
703     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
704     if (index == MPI_UNDEFINED) {
705       break;
706     }
707     if (index < rank) {
708       // #Request is below rank: it's a irecv
709       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
710     }
711   }
712   for (index = 0; index < size - 1; index++) {
713     xbt_free(tmpbufs[index]);
714   }
715   xbt_free(tmpbufs);
716   xbt_free(requests);
717 }