Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
3.5 figlet
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 static MPI_Request build_request(void *buf, int count,
23                                  MPI_Datatype datatype, int src, int dst,
24                                  int tag, MPI_Comm comm, unsigned flags)
25 {
26   MPI_Request request;
27
28   request = xbt_new(s_smpi_mpi_request_t, 1);
29   request->buf = buf;
30   request->size = smpi_datatype_size(datatype) * count;
31   request->src = src;
32   request->dst = dst;
33   request->tag = tag;
34   request->comm = comm;
35   request->rdv = NULL;
36   request->pair = NULL;
37   request->complete = 0;
38   request->match = MPI_REQUEST_NULL;
39   request->flags = flags;
40 #ifdef HAVE_TRACING
41   request->send = 0;
42   request->recv = 0;
43 #endif
44   return request;
45 }
46
47 /* MPI Low level calls */
48 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
49                                int dst, int tag, MPI_Comm comm)
50 {
51   MPI_Request request =
52       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
53                     comm, PERSISTENT | SEND);
54
55   return request;
56 }
57
58 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
59                                int src, int tag, MPI_Comm comm)
60 {
61   MPI_Request request =
62       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
63                     comm, PERSISTENT | RECV);
64
65   return request;
66 }
67
68 void smpi_mpi_start(MPI_Request request)
69 {
70   xbt_assert0(request->complete == 0,
71               "Cannot start a non-finished communication");
72   if ((request->flags & RECV) == RECV) {
73     smpi_process_post_recv(request);
74     print_request("New recv", request);
75     request->pair =
76         SIMIX_network_irecv(request->rdv, request->buf, &request->size);
77   } else {
78     smpi_process_post_send(request->comm, request);     // FIXME
79     print_request("New send", request);
80     request->pair =
81         SIMIX_network_isend(request->rdv, request->size, -1.0,
82                             request->buf, request->size, NULL);
83   }
84 }
85
86 void smpi_mpi_startall(int count, MPI_Request * requests)
87 {
88   int i;
89
90   for (i = 0; i < count; i++) {
91     smpi_mpi_start(requests[i]);
92   }
93 }
94
95 void smpi_mpi_request_free(MPI_Request * request)
96 {
97   xbt_free(*request);
98   *request = MPI_REQUEST_NULL;
99 }
100
101 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
102                             int dst, int tag, MPI_Comm comm)
103 {
104   MPI_Request request =
105       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
106                     comm, NON_PERSISTENT | SEND);
107
108   return request;
109 }
110
111 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
112                            int dst, int tag, MPI_Comm comm)
113 {
114   MPI_Request request =
115       smpi_isend_init(buf, count, datatype, dst, tag, comm);
116
117   smpi_mpi_start(request);
118   return request;
119 }
120
121 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
122                             int src, int tag, MPI_Comm comm)
123 {
124   MPI_Request request =
125       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
126                     comm, NON_PERSISTENT | RECV);
127
128   return request;
129 }
130
131 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
132                            int src, int tag, MPI_Comm comm)
133 {
134   MPI_Request request =
135       smpi_irecv_init(buf, count, datatype, src, tag, comm);
136
137   smpi_mpi_start(request);
138   return request;
139 }
140
141 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
142                    int tag, MPI_Comm comm, MPI_Status * status)
143 {
144   MPI_Request request;
145
146   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
147   smpi_mpi_wait(&request, status);
148 }
149
150 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
151                    int tag, MPI_Comm comm)
152 {
153   MPI_Request request;
154
155   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
156   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
157 }
158
159 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
160                        int dst, int sendtag, void *recvbuf, int recvcount,
161                        MPI_Datatype recvtype, int src, int recvtag,
162                        MPI_Comm comm, MPI_Status * status)
163 {
164   MPI_Request requests[2];
165   MPI_Status stats[2];
166
167   requests[0] =
168       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
169   requests[1] =
170       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
171   smpi_mpi_startall(2, requests);
172   smpi_mpi_waitall(2, requests, stats);
173   if (status != MPI_STATUS_IGNORE) {
174     // Copy receive status
175     memcpy(status, &stats[1], sizeof(MPI_Status));
176   }
177 }
178
179 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
180 {
181   return status->count / smpi_datatype_size(datatype);
182 }
183
184 static void finish_wait(MPI_Request * request, MPI_Status * status)
185 {
186   if (status != MPI_STATUS_IGNORE) {
187     status->MPI_SOURCE = (*request)->src;
188     status->MPI_TAG = (*request)->tag;
189     status->MPI_ERROR = MPI_SUCCESS;
190     status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
191   }
192   SIMIX_communication_destroy((*request)->pair);
193   print_request("finishing wait", *request);
194   if ((*request)->complete == 1) {
195     SIMIX_rdv_destroy((*request)->rdv);
196   } else {
197     (*request)->match->complete = 1;
198     (*request)->match->match = MPI_REQUEST_NULL;
199   }
200   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
201     smpi_mpi_request_free(request);
202   } else {
203     (*request)->rdv = NULL;
204     (*request)->pair = NULL;
205   }
206 }
207
208 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
209 {
210   int flag = (*request)->complete;
211
212   if (flag) {
213     smpi_mpi_wait(request, status);
214   }
215   return flag;
216 }
217
218 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
219                      MPI_Status * status)
220 {
221   int i, flag;
222
223   *index = MPI_UNDEFINED;
224   flag = 0;
225   for (i = 0; i < count; i++) {
226     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
227       smpi_mpi_wait(&requests[i], status);
228       *index = i;
229       flag = 1;
230       break;
231     }
232   }
233   return flag;
234 }
235
236 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
237 {
238   print_request("wait", *request);
239   SIMIX_network_wait((*request)->pair, -1.0);
240   finish_wait(request, status);
241 }
242
243 int smpi_mpi_waitany(int count, MPI_Request requests[],
244                      MPI_Status * status)
245 {
246   xbt_dynar_t comms;
247   int i, size, index;
248   int *map;
249
250   index = MPI_UNDEFINED;
251   if (count > 0) {
252     // First check for already completed requests
253     for (i = 0; i < count; i++) {
254       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
255         index = i;
256         smpi_mpi_wait(&requests[index], status);
257         break;
258       }
259     }
260     if (index == MPI_UNDEFINED) {
261       // Otherwise, wait for a request to complete
262       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
263       map = xbt_new(int, count);
264       size = 0;
265       DEBUG0("Wait for one of");
266       for (i = 0; i < count; i++) {
267         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
268           print_request("   ", requests[i]);
269           xbt_dynar_push(comms, &requests[i]->pair);
270           map[size] = i;
271           size++;
272         }
273       }
274       if (size > 0) {
275         index = SIMIX_network_waitany(comms);
276         index = map[index];
277         finish_wait(&requests[index], status);
278       }
279       xbt_free(map);
280       xbt_dynar_free(&comms);
281     }
282   }
283   return index;
284 }
285
286 void smpi_mpi_waitall(int count, MPI_Request requests[],
287                       MPI_Status status[])
288 {
289   int index, c;
290   MPI_Status stat;
291
292   c = count;
293   while (c > 0) {
294     index = smpi_mpi_waitany(count, requests, &stat);
295     if (index == MPI_UNDEFINED) {
296       break;
297     }
298     if (status != MPI_STATUS_IGNORE) {
299       memcpy(&status[index], &stat, sizeof(stat));
300     }
301     c--;
302   }
303 }
304
305 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
306                       MPI_Status status[])
307 {
308   int i, count;
309
310   count = 0;
311   for (i = 0; i < incount; i++) {
312     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
313       smpi_mpi_wait(&requests[i],
314                     status !=
315                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
316       indices[count] = i;
317       count++;
318     }
319   }
320   return count;
321 }
322
323 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
324                     MPI_Comm comm)
325 {
326   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
327   nary_tree_bcast(buf, count, datatype, root, comm, 4);
328 }
329
330 void smpi_mpi_barrier(MPI_Comm comm)
331 {
332   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
333   nary_tree_barrier(comm, 4);
334 }
335
336 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
337                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
338                      int root, MPI_Comm comm)
339 {
340   int system_tag = 666;
341   int rank, size, src, index, sendsize, recvsize;
342   MPI_Request *requests;
343
344   rank = smpi_comm_rank(comm);
345   size = smpi_comm_size(comm);
346   if (rank != root) {
347     // Send buffer to root
348     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
349   } else {
350     sendsize = smpi_datatype_size(sendtype);
351     recvsize = smpi_datatype_size(recvtype);
352     // Local copy from root
353     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
354            sendcount * sendsize * sizeof(char));
355     // Receive buffers from senders
356     requests = xbt_new(MPI_Request, size - 1);
357     index = 0;
358     for (src = 0; src < size; src++) {
359       if (src != root) {
360         requests[index] = smpi_irecv_init(&((char *) recvbuf)
361                                           [src * recvcount * recvsize],
362                                           recvcount, recvtype, src,
363                                           system_tag, comm);
364         index++;
365       }
366     }
367     // Wait for completion of irecv's.
368     smpi_mpi_startall(size - 1, requests);
369     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
370     xbt_free(requests);
371   }
372 }
373
374 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
375                       void *recvbuf, int *recvcounts, int *displs,
376                       MPI_Datatype recvtype, int root, MPI_Comm comm)
377 {
378   int system_tag = 666;
379   int rank, size, src, index, sendsize;
380   MPI_Request *requests;
381
382   rank = smpi_comm_rank(comm);
383   size = smpi_comm_size(comm);
384   if (rank != root) {
385     // Send buffer to root
386     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
387   } else {
388     sendsize = smpi_datatype_size(sendtype);
389     // Local copy from root
390     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
391            sendcount * sendsize * sizeof(char));
392     // Receive buffers from senders
393     requests = xbt_new(MPI_Request, size - 1);
394     index = 0;
395     for (src = 0; src < size; src++) {
396       if (src != root) {
397         requests[index] =
398             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
399                             recvcounts[src], recvtype, src, system_tag,
400                             comm);
401         index++;
402       }
403     }
404     // Wait for completion of irecv's.
405     smpi_mpi_startall(size - 1, requests);
406     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
407     xbt_free(requests);
408   }
409 }
410
411 void smpi_mpi_allgather(void *sendbuf, int sendcount,
412                         MPI_Datatype sendtype, void *recvbuf,
413                         int recvcount, MPI_Datatype recvtype,
414                         MPI_Comm comm)
415 {
416   int system_tag = 666;
417   int rank, size, other, index, sendsize, recvsize;
418   MPI_Request *requests;
419
420   rank = smpi_comm_rank(comm);
421   size = smpi_comm_size(comm);
422   sendsize = smpi_datatype_size(sendtype);
423   recvsize = smpi_datatype_size(recvtype);
424   // Local copy from self
425   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
426          sendcount * sendsize * sizeof(char));
427   // Send/Recv buffers to/from others;
428   requests = xbt_new(MPI_Request, 2 * (size - 1));
429   index = 0;
430   for (other = 0; other < size; other++) {
431     if (other != rank) {
432       requests[index] =
433           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
434                           comm);
435       index++;
436       requests[index] = smpi_irecv_init(&((char *) recvbuf)
437                                         [other * recvcount * recvsize],
438                                         recvcount, recvtype, other,
439                                         system_tag, comm);
440       index++;
441     }
442   }
443   // Wait for completion of all comms.
444   smpi_mpi_startall(2 * (size - 1), requests);
445   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
446   xbt_free(requests);
447 }
448
449 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
450                          MPI_Datatype sendtype, void *recvbuf,
451                          int *recvcounts, int *displs,
452                          MPI_Datatype recvtype, MPI_Comm comm)
453 {
454   int system_tag = 666;
455   int rank, size, other, index, sendsize, recvsize;
456   MPI_Request *requests;
457
458   rank = smpi_comm_rank(comm);
459   size = smpi_comm_size(comm);
460   sendsize = smpi_datatype_size(sendtype);
461   recvsize = smpi_datatype_size(recvtype);
462   // Local copy from self
463   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
464          sendcount * sendsize * sizeof(char));
465   // Send buffers to others;
466   requests = xbt_new(MPI_Request, 2 * (size - 1));
467   index = 0;
468   for (other = 0; other < size; other++) {
469     if (other != rank) {
470       requests[index] =
471           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
472                           comm);
473       index++;
474       requests[index] =
475           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
476                           recvcounts[other], recvtype, other, system_tag,
477                           comm);
478       index++;
479     }
480   }
481   // Wait for completion of all comms.
482   smpi_mpi_startall(2 * (size - 1), requests);
483   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
484   xbt_free(requests);
485 }
486
487 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
488                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
489                       int root, MPI_Comm comm)
490 {
491   int system_tag = 666;
492   int rank, size, dst, index, sendsize, recvsize;
493   MPI_Request *requests;
494
495   rank = smpi_comm_rank(comm);
496   size = smpi_comm_size(comm);
497   if (rank != root) {
498     // Recv buffer from root
499     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
500                   MPI_STATUS_IGNORE);
501   } else {
502     sendsize = smpi_datatype_size(sendtype);
503     recvsize = smpi_datatype_size(recvtype);
504     // Local copy from root
505     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
506            recvcount * recvsize * sizeof(char));
507     // Send buffers to receivers
508     requests = xbt_new(MPI_Request, size - 1);
509     index = 0;
510     for (dst = 0; dst < size; dst++) {
511       if (dst != root) {
512         requests[index] = smpi_isend_init(&((char *) sendbuf)
513                                           [dst * sendcount * sendsize],
514                                           sendcount, sendtype, dst,
515                                           system_tag, comm);
516         index++;
517       }
518     }
519     // Wait for completion of isend's.
520     smpi_mpi_startall(size - 1, requests);
521     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
522     xbt_free(requests);
523   }
524 }
525
526 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
527                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
528                        MPI_Datatype recvtype, int root, MPI_Comm comm)
529 {
530   int system_tag = 666;
531   int rank, size, dst, index, sendsize, recvsize;
532   MPI_Request *requests;
533
534   rank = smpi_comm_rank(comm);
535   size = smpi_comm_size(comm);
536   if (rank != root) {
537     // Recv buffer from root
538     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
539                   MPI_STATUS_IGNORE);
540   } else {
541     sendsize = smpi_datatype_size(sendtype);
542     recvsize = smpi_datatype_size(recvtype);
543     // Local copy from root
544     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
545            recvcount * recvsize * sizeof(char));
546     // Send buffers to receivers
547     requests = xbt_new(MPI_Request, size - 1);
548     index = 0;
549     for (dst = 0; dst < size; dst++) {
550       if (dst != root) {
551         requests[index] =
552             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
553                             sendcounts[dst], sendtype, dst, system_tag,
554                             comm);
555         index++;
556       }
557     }
558     // Wait for completion of isend's.
559     smpi_mpi_startall(size - 1, requests);
560     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
561     xbt_free(requests);
562   }
563 }
564
565 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
566                      MPI_Datatype datatype, MPI_Op op, int root,
567                      MPI_Comm comm)
568 {
569   int system_tag = 666;
570   int rank, size, src, index, datasize;
571   MPI_Request *requests;
572   void **tmpbufs;
573
574   rank = smpi_comm_rank(comm);
575   size = smpi_comm_size(comm);
576   if (rank != root) {
577     // Send buffer to root
578     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
579   } else {
580     datasize = smpi_datatype_size(datatype);
581     // Local copy from root
582     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
583     // Receive buffers from senders
584     //TODO: make a MPI_barrier here ?
585     requests = xbt_new(MPI_Request, size - 1);
586     tmpbufs = xbt_new(void *, size - 1);
587     index = 0;
588     for (src = 0; src < size; src++) {
589       if (src != root) {
590         tmpbufs[index] = xbt_malloc(count * datasize);
591         requests[index] =
592             smpi_irecv_init(tmpbufs[index], count, datatype, src,
593                             system_tag, comm);
594         index++;
595       }
596     }
597     // Wait for completion of irecv's.
598     smpi_mpi_startall(size - 1, requests);
599     for (src = 0; src < size - 1; src++) {
600       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
601       if (index == MPI_UNDEFINED) {
602         break;
603       }
604       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
605     }
606     for (index = 0; index < size - 1; index++) {
607       xbt_free(tmpbufs[index]);
608     }
609     xbt_free(tmpbufs);
610     xbt_free(requests);
611   }
612 }
613
614 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
615                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
616 {
617   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
618   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
619
620 /*
621 FIXME: buggy implementation
622
623   int system_tag = 666;
624   int rank, size, other, index, datasize;
625   MPI_Request* requests;
626   void** tmpbufs;
627
628   rank = smpi_comm_rank(comm);
629   size = smpi_comm_size(comm);
630   datasize = smpi_datatype_size(datatype);
631   // Local copy from self
632   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
633   // Send/Recv buffers to/from others;
634   //TODO: make a MPI_barrier here ?
635   requests = xbt_new(MPI_Request, 2 * (size - 1));
636   tmpbufs = xbt_new(void*, size - 1);
637   index = 0;
638   for(other = 0; other < size; other++) {
639     if(other != rank) {
640       tmpbufs[index / 2] = xbt_malloc(count * datasize);
641       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
642       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
643       index += 2;
644     }
645   }
646   // Wait for completion of all comms.
647   for(other = 0; other < 2 * (size - 1); other++) {
648     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
649     if(index == MPI_UNDEFINED) {
650       break;
651     }
652     if((index & 1) == 1) {
653       // Request is odd: it's a irecv
654       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
655     }
656   }
657   for(index = 0; index < size - 1; index++) {
658     xbt_free(tmpbufs[index]);
659   }
660   xbt_free(tmpbufs);
661   xbt_free(requests);
662 */
663 }
664
665 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
666                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
667 {
668   int system_tag = 666;
669   int rank, size, other, index, datasize;
670   int total;
671   MPI_Request *requests;
672   void **tmpbufs;
673
674   rank = smpi_comm_rank(comm);
675   size = smpi_comm_size(comm);
676   datasize = smpi_datatype_size(datatype);
677   // Local copy from self
678   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
679   // Send/Recv buffers to/from others;
680   total = rank + (size - (rank + 1));
681   requests = xbt_new(MPI_Request, total);
682   tmpbufs = xbt_new(void *, rank);
683   index = 0;
684   for (other = 0; other < rank; other++) {
685     tmpbufs[index] = xbt_malloc(count * datasize);
686     requests[index] =
687         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
688                         comm);
689     index++;
690   }
691   for (other = rank + 1; other < size; other++) {
692     requests[index] =
693         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
694     index++;
695   }
696   // Wait for completion of all comms.
697   smpi_mpi_startall(size - 1, requests);
698   for (other = 0; other < total; other++) {
699     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
700     if (index == MPI_UNDEFINED) {
701       break;
702     }
703     if (index < rank) {
704       // #Request is below rank: it's a irecv
705       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
706     }
707   }
708   for (index = 0; index < size - 1; index++) {
709     xbt_free(tmpbufs[index]);
710   }
711   xbt_free(tmpbufs);
712   xbt_free(requests);
713 }