Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make bprintf abort on error, and define bvprintf accordingly.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 void smpi_process_init(int *argc, char ***argv)
23 {
24   int index;
25   smpi_process_data_t data;
26   smx_process_t proc;
27
28   proc = SIMIX_process_self();
29   index = atoi((*argv)[1]);
30   data = smpi_process_remote_data(index);
31   SIMIX_process_set_data(proc, data);
32   if (*argc > 2) {
33     free((*argv)[1]);
34     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
35     (*argv)[(*argc) - 1] = NULL;
36   }
37   (*argc)--;
38   DEBUG2("<%d> New process in the game: %p", index, proc);
39 }
40
41 void smpi_process_destroy(void)
42 {
43   int index = smpi_process_index();
44
45   DEBUG1("<%d> Process left the game", index);
46 }
47
48 static MPI_Request build_request(void *buf, int count,
49                                  MPI_Datatype datatype, int src, int dst,
50                                  int tag, MPI_Comm comm, unsigned flags)
51 {
52   MPI_Request request;
53
54   request = xbt_new(s_smpi_mpi_request_t, 1);
55   request->buf = buf;
56   request->size = smpi_datatype_size(datatype) * count;
57   request->src = src;
58   request->dst = dst;
59   request->tag = tag;
60   request->comm = comm;
61   request->rdv = NULL;
62   request->pair = NULL;
63   request->complete = 0;
64   request->match = MPI_REQUEST_NULL;
65   request->flags = flags;
66 #ifdef HAVE_TRACING
67   request->send = 0;
68   request->recv = 0;
69 #endif
70   return request;
71 }
72
73 /* MPI Low level calls */
74 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
75                                int dst, int tag, MPI_Comm comm)
76 {
77   MPI_Request request =
78       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
79                     comm, PERSISTENT | SEND);
80
81   return request;
82 }
83
84 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
85                                int src, int tag, MPI_Comm comm)
86 {
87   MPI_Request request =
88       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
89                     comm, PERSISTENT | RECV);
90
91   return request;
92 }
93
94 void smpi_mpi_start(MPI_Request request)
95 {
96   xbt_assert0(request->complete == 0,
97               "Cannot start a non-finished communication");
98   if ((request->flags & RECV) == RECV) {
99     smpi_process_post_recv(request);
100     print_request("New recv", request);
101     request->pair =
102         SIMIX_network_irecv(request->rdv, request->buf, &request->size);
103   } else {
104     smpi_process_post_send(request->comm, request);     // FIXME
105     print_request("New send", request);
106     request->pair =
107         SIMIX_network_isend(request->rdv, request->size, -1.0,
108                             request->buf, request->size, NULL);
109   }
110 }
111
112 void smpi_mpi_startall(int count, MPI_Request * requests)
113 {
114   int i;
115
116   for (i = 0; i < count; i++) {
117     smpi_mpi_start(requests[i]);
118   }
119 }
120
121 void smpi_mpi_request_free(MPI_Request * request)
122 {
123   xbt_free(*request);
124   *request = MPI_REQUEST_NULL;
125 }
126
127 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
128                             int dst, int tag, MPI_Comm comm)
129 {
130   MPI_Request request =
131       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
132                     comm, NON_PERSISTENT | SEND);
133
134   return request;
135 }
136
137 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
138                            int dst, int tag, MPI_Comm comm)
139 {
140   MPI_Request request =
141       smpi_isend_init(buf, count, datatype, dst, tag, comm);
142
143   smpi_mpi_start(request);
144   return request;
145 }
146
147 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
148                             int src, int tag, MPI_Comm comm)
149 {
150   MPI_Request request =
151       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
152                     comm, NON_PERSISTENT | RECV);
153
154   return request;
155 }
156
157 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
158                            int src, int tag, MPI_Comm comm)
159 {
160   MPI_Request request =
161       smpi_irecv_init(buf, count, datatype, src, tag, comm);
162
163   smpi_mpi_start(request);
164   return request;
165 }
166
167 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
168                    int tag, MPI_Comm comm, MPI_Status * status)
169 {
170   MPI_Request request;
171
172   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
173   smpi_mpi_wait(&request, status);
174 }
175
176 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
177                    int tag, MPI_Comm comm)
178 {
179   MPI_Request request;
180
181   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
182   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
183 }
184
185 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
186                        int dst, int sendtag, void *recvbuf, int recvcount,
187                        MPI_Datatype recvtype, int src, int recvtag,
188                        MPI_Comm comm, MPI_Status * status)
189 {
190   MPI_Request requests[2];
191   MPI_Status stats[2];
192
193   requests[0] =
194       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
195   requests[1] =
196       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
197   smpi_mpi_startall(2, requests);
198   smpi_mpi_waitall(2, requests, stats);
199   if (status != MPI_STATUS_IGNORE) {
200     // Copy receive status
201     memcpy(status, &stats[1], sizeof(MPI_Status));
202   }
203 }
204
205 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
206 {
207   return status->count / smpi_datatype_size(datatype);
208 }
209
210 static void finish_wait(MPI_Request * request, MPI_Status * status)
211 {
212   if (status != MPI_STATUS_IGNORE) {
213     status->MPI_SOURCE = (*request)->src;
214     status->MPI_TAG = (*request)->tag;
215     status->MPI_ERROR = MPI_SUCCESS;
216     status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
217   }
218   print_request("finishing wait", *request);
219   if ((*request)->complete == 1) {
220     SIMIX_rdv_destroy((*request)->rdv);
221   } else {
222     (*request)->match->complete = 1;
223     (*request)->match->match = MPI_REQUEST_NULL;
224   }
225   if (((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
226     smpi_mpi_request_free(request);
227   } else {
228     (*request)->rdv = NULL;
229     (*request)->pair = NULL;
230   }
231 }
232
233 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
234 {
235   int flag = (*request)->complete;
236
237   if (flag) {
238     smpi_mpi_wait(request, status);
239   }
240   return flag;
241 }
242
243 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
244                      MPI_Status * status)
245 {
246   int i, flag;
247
248   *index = MPI_UNDEFINED;
249   flag = 0;
250   for (i = 0; i < count; i++) {
251     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
252       smpi_mpi_wait(&requests[i], status);
253       *index = i;
254       flag = 1;
255       break;
256     }
257   }
258   return flag;
259 }
260
261 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
262 {
263   print_request("wait", *request);
264   SIMIX_network_wait((*request)->pair, -1.0);
265   finish_wait(request, status);
266 }
267
268 int smpi_mpi_waitany(int count, MPI_Request requests[],
269                      MPI_Status * status)
270 {
271   xbt_dynar_t comms;
272   int i, size, index;
273   int *map;
274
275   index = MPI_UNDEFINED;
276   if (count > 0) {
277     // First check for already completed requests
278     for (i = 0; i < count; i++) {
279       if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
280         index = i;
281         smpi_mpi_wait(&requests[index], status);
282         break;
283       }
284     }
285     if (index == MPI_UNDEFINED) {
286       // Otherwise, wait for a request to complete
287       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
288       map = xbt_new(int, count);
289       size = 0;
290       DEBUG0("Wait for one of");
291       for (i = 0; i < count; i++) {
292         if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
293           print_request("   ", requests[i]);
294           xbt_dynar_push(comms, &requests[i]->pair);
295           map[size] = i;
296           size++;
297         }
298       }
299       if (size > 0) {
300         index = SIMIX_network_waitany(comms);
301         index = map[index];
302         finish_wait(&requests[index], status);
303       }
304       xbt_free(map);
305       xbt_dynar_free(&comms);
306     }
307   }
308   return index;
309 }
310
311 void smpi_mpi_waitall(int count, MPI_Request requests[],
312                       MPI_Status status[])
313 {
314   int index;
315   MPI_Status stat;
316
317   while (count > 0) {
318     index = smpi_mpi_waitany(count, requests, &stat);
319     if (index == MPI_UNDEFINED) {
320       break;
321     }
322     if (status != MPI_STATUS_IGNORE) {
323       memcpy(&status[index], &stat, sizeof(stat));
324     }
325     // FIXME: check this -v
326     // Move the last request to the found position
327     requests[index] = requests[count - 1];
328     requests[count - 1] = MPI_REQUEST_NULL;
329     count--;
330   }
331 }
332
333 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
334                       MPI_Status status[])
335 {
336   int i, count;
337
338   count = 0;
339   for (i = 0; i < incount; i++) {
340     if (requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
341       smpi_mpi_wait(&requests[i],
342                     status !=
343                     MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
344       indices[count] = i;
345       count++;
346     }
347   }
348   return count;
349 }
350
351 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
352                     MPI_Comm comm)
353 {
354   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
355   nary_tree_bcast(buf, count, datatype, root, comm, 4);
356 }
357
358 void smpi_mpi_barrier(MPI_Comm comm)
359 {
360   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
361   nary_tree_barrier(comm, 4);
362 }
363
364 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
365                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
366                      int root, MPI_Comm comm)
367 {
368   int system_tag = 666;
369   int rank, size, src, index, sendsize, recvsize;
370   MPI_Request *requests;
371
372   rank = smpi_comm_rank(comm);
373   size = smpi_comm_size(comm);
374   if (rank != root) {
375     // Send buffer to root
376     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
377   } else {
378     sendsize = smpi_datatype_size(sendtype);
379     recvsize = smpi_datatype_size(recvtype);
380     // Local copy from root
381     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
382            sendcount * sendsize * sizeof(char));
383     // Receive buffers from senders
384     requests = xbt_new(MPI_Request, size - 1);
385     index = 0;
386     for (src = 0; src < size; src++) {
387       if (src != root) {
388         requests[index] = smpi_irecv_init(&((char *) recvbuf)
389                                           [src * recvcount * recvsize],
390                                           recvcount, recvtype, src,
391                                           system_tag, comm);
392         index++;
393       }
394     }
395     // Wait for completion of irecv's.
396     smpi_mpi_startall(size - 1, requests);
397     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
398     xbt_free(requests);
399   }
400 }
401
402 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
403                       void *recvbuf, int *recvcounts, int *displs,
404                       MPI_Datatype recvtype, int root, MPI_Comm comm)
405 {
406   int system_tag = 666;
407   int rank, size, src, index, sendsize;
408   MPI_Request *requests;
409
410   rank = smpi_comm_rank(comm);
411   size = smpi_comm_size(comm);
412   if (rank != root) {
413     // Send buffer to root
414     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
415   } else {
416     sendsize = smpi_datatype_size(sendtype);
417     // Local copy from root
418     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
419            sendcount * sendsize * sizeof(char));
420     // Receive buffers from senders
421     requests = xbt_new(MPI_Request, size - 1);
422     index = 0;
423     for (src = 0; src < size; src++) {
424       if (src != root) {
425         requests[index] =
426             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
427                             recvcounts[src], recvtype, src, system_tag,
428                             comm);
429         index++;
430       }
431     }
432     // Wait for completion of irecv's.
433     smpi_mpi_startall(size - 1, requests);
434     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
435     xbt_free(requests);
436   }
437 }
438
439 void smpi_mpi_allgather(void *sendbuf, int sendcount,
440                         MPI_Datatype sendtype, void *recvbuf,
441                         int recvcount, MPI_Datatype recvtype,
442                         MPI_Comm comm)
443 {
444   int system_tag = 666;
445   int rank, size, other, index, sendsize, recvsize;
446   MPI_Request *requests;
447
448   rank = smpi_comm_rank(comm);
449   size = smpi_comm_size(comm);
450   sendsize = smpi_datatype_size(sendtype);
451   recvsize = smpi_datatype_size(recvtype);
452   // Local copy from self
453   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
454          sendcount * sendsize * sizeof(char));
455   // Send/Recv buffers to/from others;
456   requests = xbt_new(MPI_Request, 2 * (size - 1));
457   index = 0;
458   for (other = 0; other < size; other++) {
459     if (other != rank) {
460       requests[index] =
461           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
462                           comm);
463       index++;
464       requests[index] = smpi_irecv_init(&((char *) recvbuf)
465                                         [other * recvcount * recvsize],
466                                         recvcount, recvtype, other,
467                                         system_tag, comm);
468       index++;
469     }
470   }
471   // Wait for completion of all comms.
472   smpi_mpi_startall(2 * (size - 1), requests);
473   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
474   xbt_free(requests);
475 }
476
477 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
478                          MPI_Datatype sendtype, void *recvbuf,
479                          int *recvcounts, int *displs,
480                          MPI_Datatype recvtype, MPI_Comm comm)
481 {
482   int system_tag = 666;
483   int rank, size, other, index, sendsize, recvsize;
484   MPI_Request *requests;
485
486   rank = smpi_comm_rank(comm);
487   size = smpi_comm_size(comm);
488   sendsize = smpi_datatype_size(sendtype);
489   recvsize = smpi_datatype_size(recvtype);
490   // Local copy from self
491   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
492          sendcount * sendsize * sizeof(char));
493   // Send buffers to others;
494   requests = xbt_new(MPI_Request, 2 * (size - 1));
495   index = 0;
496   for (other = 0; other < size; other++) {
497     if (other != rank) {
498       requests[index] =
499           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
500                           comm);
501       index++;
502       requests[index] =
503           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
504                           recvcounts[other], recvtype, other, system_tag,
505                           comm);
506       index++;
507     }
508   }
509   // Wait for completion of all comms.
510   smpi_mpi_startall(2 * (size - 1), requests);
511   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
512   xbt_free(requests);
513 }
514
515 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
516                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
517                       int root, MPI_Comm comm)
518 {
519   int system_tag = 666;
520   int rank, size, dst, index, sendsize, recvsize;
521   MPI_Request *requests;
522
523   rank = smpi_comm_rank(comm);
524   size = smpi_comm_size(comm);
525   if (rank != root) {
526     // Recv buffer from root
527     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
528                   MPI_STATUS_IGNORE);
529   } else {
530     sendsize = smpi_datatype_size(sendtype);
531     recvsize = smpi_datatype_size(recvtype);
532     // Local copy from root
533     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
534            recvcount * recvsize * sizeof(char));
535     // Send buffers to receivers
536     requests = xbt_new(MPI_Request, size - 1);
537     index = 0;
538     for (dst = 0; dst < size; dst++) {
539       if (dst != root) {
540         requests[index] = smpi_isend_init(&((char *) sendbuf)
541                                           [dst * sendcount * sendsize],
542                                           sendcount, sendtype, dst,
543                                           system_tag, comm);
544         index++;
545       }
546     }
547     // Wait for completion of isend's.
548     smpi_mpi_startall(size - 1, requests);
549     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
550     xbt_free(requests);
551   }
552 }
553
554 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
555                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
556                        MPI_Datatype recvtype, int root, MPI_Comm comm)
557 {
558   int system_tag = 666;
559   int rank, size, dst, index, sendsize, recvsize;
560   MPI_Request *requests;
561
562   rank = smpi_comm_rank(comm);
563   size = smpi_comm_size(comm);
564   if (rank != root) {
565     // Recv buffer from root
566     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
567                   MPI_STATUS_IGNORE);
568   } else {
569     sendsize = smpi_datatype_size(sendtype);
570     recvsize = smpi_datatype_size(recvtype);
571     // Local copy from root
572     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
573            recvcount * recvsize * sizeof(char));
574     // Send buffers to receivers
575     requests = xbt_new(MPI_Request, size - 1);
576     index = 0;
577     for (dst = 0; dst < size; dst++) {
578       if (dst != root) {
579         requests[index] =
580             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
581                             sendcounts[dst], sendtype, dst, system_tag,
582                             comm);
583         index++;
584       }
585     }
586     // Wait for completion of isend's.
587     smpi_mpi_startall(size - 1, requests);
588     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
589     xbt_free(requests);
590   }
591 }
592
593 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
594                      MPI_Datatype datatype, MPI_Op op, int root,
595                      MPI_Comm comm)
596 {
597   int system_tag = 666;
598   int rank, size, src, index, datasize;
599   MPI_Request *requests;
600   void **tmpbufs;
601
602   rank = smpi_comm_rank(comm);
603   size = smpi_comm_size(comm);
604   if (rank != root) {
605     // Send buffer to root
606     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
607   } else {
608     datasize = smpi_datatype_size(datatype);
609     // Local copy from root
610     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
611     // Receive buffers from senders
612     //TODO: make a MPI_barrier here ?
613     requests = xbt_new(MPI_Request, size - 1);
614     tmpbufs = xbt_new(void *, size - 1);
615     index = 0;
616     for (src = 0; src < size; src++) {
617       if (src != root) {
618         tmpbufs[index] = xbt_malloc(count * datasize);
619         requests[index] =
620             smpi_irecv_init(tmpbufs[index], count, datatype, src,
621                             system_tag, comm);
622         index++;
623       }
624     }
625     // Wait for completion of irecv's.
626     smpi_mpi_startall(size - 1, requests);
627     for (src = 0; src < size - 1; src++) {
628       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
629       if (index == MPI_UNDEFINED) {
630         break;
631       }
632       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
633     }
634     for (index = 0; index < size - 1; index++) {
635       xbt_free(tmpbufs[index]);
636     }
637     xbt_free(tmpbufs);
638     xbt_free(requests);
639   }
640 }
641
642 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
643                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
644 {
645   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
646   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
647
648 /*
649 FIXME: buggy implementation
650
651   int system_tag = 666;
652   int rank, size, other, index, datasize;
653   MPI_Request* requests;
654   void** tmpbufs;
655
656   rank = smpi_comm_rank(comm);
657   size = smpi_comm_size(comm);
658   datasize = smpi_datatype_size(datatype);
659   // Local copy from self
660   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
661   // Send/Recv buffers to/from others;
662   //TODO: make a MPI_barrier here ?
663   requests = xbt_new(MPI_Request, 2 * (size - 1));
664   tmpbufs = xbt_new(void*, size - 1);
665   index = 0;
666   for(other = 0; other < size; other++) {
667     if(other != rank) {
668       tmpbufs[index / 2] = xbt_malloc(count * datasize);
669       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
670       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
671       index += 2;
672     }
673   }
674   // Wait for completion of all comms.
675   for(other = 0; other < 2 * (size - 1); other++) {
676     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
677     if(index == MPI_UNDEFINED) {
678       break;
679     }
680     if((index & 1) == 1) {
681       // Request is odd: it's a irecv
682       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
683     }
684   }
685   for(index = 0; index < size - 1; index++) {
686     xbt_free(tmpbufs[index]);
687   }
688   xbt_free(tmpbufs);
689   xbt_free(requests);
690 */
691 }
692
693 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
694                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
695 {
696   int system_tag = 666;
697   int rank, size, other, index, datasize;
698   int total;
699   MPI_Request *requests;
700   void **tmpbufs;
701
702   rank = smpi_comm_rank(comm);
703   size = smpi_comm_size(comm);
704   datasize = smpi_datatype_size(datatype);
705   // Local copy from self
706   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
707   // Send/Recv buffers to/from others;
708   total = rank + (size - (rank + 1));
709   requests = xbt_new(MPI_Request, total);
710   tmpbufs = xbt_new(void *, rank);
711   index = 0;
712   for (other = 0; other < rank; other++) {
713     tmpbufs[index] = xbt_malloc(count * datasize);
714     requests[index] =
715         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
716                         comm);
717     index++;
718   }
719   for (other = rank + 1; other < size; other++) {
720     requests[index] =
721         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
722     index++;
723   }
724   // Wait for completion of all comms.
725   smpi_mpi_startall(size - 1, requests);
726   for (other = 0; other < total; other++) {
727     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
728     if (index == MPI_UNDEFINED) {
729       break;
730     }
731     if (index < rank) {
732       // #Request is below rank: it's a irecv
733       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
734     }
735   }
736   for (index = 0; index < size - 1; index++) {
737     xbt_free(tmpbufs[index]);
738   }
739   xbt_free(tmpbufs);
740   xbt_free(requests);
741 }