Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sorry, for this stupid error
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9 #include "mc/mc.h"
10
11 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
12                                 "Logging specific to SMPI (base)");
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
21 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
22
23 static int match_recv(void* a, void* b) {
24    MPI_Request ref = (MPI_Request)a;
25    MPI_Request req = (MPI_Request)b;
26
27    xbt_assert(ref, "Cannot match recv against null reference");
28    xbt_assert(req, "Cannot match recv against null request");
29    return req->comm == ref->comm
30           && (ref->src == MPI_ANY_SOURCE || req->src == ref->src)
31           && (ref->tag == MPI_ANY_TAG || req->tag == ref->tag);
32 }
33
34 static int match_send(void* a, void* b) {
35    MPI_Request ref = (MPI_Request)a;
36    MPI_Request req = (MPI_Request)b;
37
38    xbt_assert(ref, "Cannot match send against null reference");
39    xbt_assert(req, "Cannot match send against null request");
40    return req->comm == ref->comm
41           && (req->src == MPI_ANY_SOURCE || req->src == ref->src)
42           && (req->tag == MPI_ANY_TAG || req->tag == ref->tag);
43 }
44
45 static MPI_Request build_request(void *buf, int count,
46                                  MPI_Datatype datatype, int src, int dst,
47                                  int tag, MPI_Comm comm, unsigned flags)
48 {
49   MPI_Request request;
50
51   request = xbt_new(s_smpi_mpi_request_t, 1);
52   request->buf = buf;
53   request->size = smpi_datatype_size(datatype) * count;
54   request->src = src;
55   request->dst = dst;
56   request->tag = tag;
57   request->comm = comm;
58   request->action = NULL;
59   request->flags = flags;
60 #ifdef HAVE_TRACING
61   request->send = 0;
62   request->recv = 0;
63 #endif
64   return request;
65 }
66
67 /* MPI Low level calls */
68 MPI_Request smpi_mpi_send_init(void *buf, int count, MPI_Datatype datatype,
69                                int dst, int tag, MPI_Comm comm)
70 {
71   MPI_Request request =
72       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
73                     comm, PERSISTENT | SEND);
74
75   return request;
76 }
77
78 MPI_Request smpi_mpi_recv_init(void *buf, int count, MPI_Datatype datatype,
79                                int src, int tag, MPI_Comm comm)
80 {
81   MPI_Request request =
82       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
83                     comm, PERSISTENT | RECV);
84
85   return request;
86 }
87
88 void smpi_mpi_start(MPI_Request request)
89 {
90   smx_rdv_t mailbox;
91
92   xbt_assert(!request->action,
93               "Cannot (re)start a non-finished communication");
94   if(request->flags & RECV) {
95     print_request("New recv", request);
96     mailbox = smpi_process_mailbox();
97     request->action = SIMIX_req_comm_irecv(mailbox, request->buf, &request->size, &match_recv, request);
98   } else {
99     print_request("New send", request);
100     mailbox = smpi_process_remote_mailbox(request->dst);
101     request->action = SIMIX_req_comm_isend(mailbox, request->size, -1.0,
102                                            request->buf, request->size, &match_send, request, 0);
103 #ifdef HAVE_TRACING
104     SIMIX_req_set_category (request->action, TRACE_internal_smpi_get_category());
105 #endif
106   }
107 }
108
109 void smpi_mpi_startall(int count, MPI_Request * requests)
110 {
111   int i;
112
113   for(i = 0; i < count; i++) {
114     smpi_mpi_start(requests[i]);
115   }
116 }
117
118 void smpi_mpi_request_free(MPI_Request * request)
119 {
120   xbt_free(*request);
121   *request = MPI_REQUEST_NULL;
122 }
123
124 MPI_Request smpi_isend_init(void *buf, int count, MPI_Datatype datatype,
125                             int dst, int tag, MPI_Comm comm)
126 {
127   MPI_Request request =
128       build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag,
129                     comm, NON_PERSISTENT | SEND);
130
131   return request;
132 }
133
134 MPI_Request smpi_mpi_isend(void *buf, int count, MPI_Datatype datatype,
135                            int dst, int tag, MPI_Comm comm)
136 {
137   MPI_Request request =
138       smpi_isend_init(buf, count, datatype, dst, tag, comm);
139
140   smpi_mpi_start(request);
141   return request;
142 }
143
144 MPI_Request smpi_irecv_init(void *buf, int count, MPI_Datatype datatype,
145                             int src, int tag, MPI_Comm comm)
146 {
147   MPI_Request request =
148       build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag,
149                     comm, NON_PERSISTENT | RECV);
150
151   return request;
152 }
153
154 MPI_Request smpi_mpi_irecv(void *buf, int count, MPI_Datatype datatype,
155                            int src, int tag, MPI_Comm comm)
156 {
157   MPI_Request request =
158       smpi_irecv_init(buf, count, datatype, src, tag, comm);
159
160   smpi_mpi_start(request);
161   return request;
162 }
163
164 void smpi_mpi_recv(void *buf, int count, MPI_Datatype datatype, int src,
165                    int tag, MPI_Comm comm, MPI_Status * status)
166 {
167   MPI_Request request;
168
169   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
170   smpi_mpi_wait(&request, status);
171 }
172
173 void smpi_mpi_send(void *buf, int count, MPI_Datatype datatype, int dst,
174                    int tag, MPI_Comm comm)
175 {
176   MPI_Request request;
177
178   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
179   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
180 }
181
182 void smpi_mpi_sendrecv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
183                        int dst, int sendtag, void *recvbuf, int recvcount,
184                        MPI_Datatype recvtype, int src, int recvtag,
185                        MPI_Comm comm, MPI_Status * status)
186 {
187   MPI_Request requests[2];
188   MPI_Status stats[2];
189
190   requests[0] =
191       smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
192   requests[1] =
193       smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
194   smpi_mpi_startall(2, requests);
195   smpi_mpi_waitall(2, requests, stats);
196   if(status != MPI_STATUS_IGNORE) {
197     // Copy receive status
198     memcpy(status, &stats[1], sizeof(MPI_Status));
199   }
200 }
201
202 int smpi_mpi_get_count(MPI_Status * status, MPI_Datatype datatype)
203 {
204   return status->count / smpi_datatype_size(datatype);
205 }
206
207 static void finish_wait(MPI_Request * request, MPI_Status * status)
208 {
209   MPI_Request req = *request;
210
211   if(status != MPI_STATUS_IGNORE) {
212     status->MPI_SOURCE = req->src;
213     status->MPI_TAG = req->tag;
214     status->MPI_ERROR = MPI_SUCCESS;
215     status->count = req->size;
216   }
217   print_request("Finishing", req);
218   if(req->flags & NON_PERSISTENT) {
219     smpi_mpi_request_free(request);
220   } else {
221     req->action = NULL;
222   }
223 }
224
225 int smpi_mpi_test(MPI_Request * request, MPI_Status * status)
226 {
227    int flag = SIMIX_req_comm_test((*request)->action);
228
229    if(flag) {
230       smpi_mpi_wait(request, status);
231    }
232    return flag;
233 }
234
235 int smpi_mpi_testany(int count, MPI_Request requests[], int *index,
236                      MPI_Status * status)
237 {
238   xbt_dynar_t comms;
239   int i, flag, size;
240   int* map;
241
242   *index = MPI_UNDEFINED;
243   flag = 0;
244   if(count > 0) {
245     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
246     map = xbt_new(int, count);
247     size = 0;
248     for(i = 0; i < count; i++) {
249       if(requests[i]->action) {
250          xbt_dynar_push(comms, &requests[i]->action);
251          map[size] = i;
252          size++;
253       }
254     }
255     if(size > 0) {
256       *index = SIMIX_req_comm_testany(comms);
257       *index = map[*index];
258       if(*index != MPI_UNDEFINED) {
259         smpi_mpi_wait(&requests[*index], status);
260         flag = 1;
261       }
262     }
263     xbt_free(map);
264     xbt_dynar_free(&comms);
265   }
266   return flag;
267 }
268
269 void smpi_mpi_wait(MPI_Request * request, MPI_Status * status)
270 {
271   print_request("Waiting", *request);
272   SIMIX_req_comm_wait((*request)->action, -1.0);
273   finish_wait(request, status);
274 }
275
276 int smpi_mpi_waitany(int count, MPI_Request requests[],
277                      MPI_Status * status)
278 {
279   xbt_dynar_t comms;
280   int i, size, index;
281   int *map;
282
283   index = MPI_UNDEFINED;
284   if(count > 0) {
285     // Wait for a request to complete
286     comms = xbt_dynar_new(sizeof(smx_action_t), NULL);
287     map = xbt_new(int, count);
288     size = 0;
289     XBT_DEBUG("Wait for one of");
290     for(i = 0; i < count; i++) {
291       if(requests[i] != MPI_REQUEST_NULL) {
292         print_request("   ", requests[i]);
293         xbt_dynar_push(comms, &requests[i]->action);
294         map[size] = i;
295         size++;
296       }
297     }
298     if(size > 0) {
299       index = SIMIX_req_comm_waitany(comms);
300       index = map[index];
301       finish_wait(&requests[index], status);
302     }
303     xbt_free(map);
304     xbt_dynar_free(&comms);
305   }
306   return index;
307 }
308
309 void smpi_mpi_waitall(int count, MPI_Request requests[],
310                       MPI_Status status[])
311 {
312   int index, c;
313   MPI_Status stat;
314   MPI_Status *pstat = status == MPI_STATUS_IGNORE ? MPI_STATUS_IGNORE : &stat;
315
316   for(c = 0; c < count; c++) {
317     if(MC_IS_ENABLED) {
318       smpi_mpi_wait(&requests[c], pstat);
319       index = c;
320     } else {
321       index = smpi_mpi_waitany(count, requests, pstat);
322       if(index == MPI_UNDEFINED) {
323         break;
324       }
325     }
326     if(status != MPI_STATUS_IGNORE) {
327       memcpy(&status[index], pstat, sizeof(*pstat));
328     }
329   }
330 }
331
332 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int *indices,
333                       MPI_Status status[])
334 {
335   int i, count, index;
336
337   count = 0;
338   for(i = 0; i < incount; i++) {
339      if(smpi_mpi_testany(incount, requests, &index, status)) {
340        indices[count] = index;
341        count++;
342      }
343   }
344   return count;
345 }
346
347 void smpi_mpi_bcast(void *buf, int count, MPI_Datatype datatype, int root,
348                     MPI_Comm comm)
349 {
350   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
351   nary_tree_bcast(buf, count, datatype, root, comm, 4);
352 }
353
354 void smpi_mpi_barrier(MPI_Comm comm)
355 {
356   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
357   nary_tree_barrier(comm, 4);
358 }
359
360 void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
361                      void *recvbuf, int recvcount, MPI_Datatype recvtype,
362                      int root, MPI_Comm comm)
363 {
364   int system_tag = 666;
365   int rank, size, src, index, sendsize, recvsize;
366   MPI_Request *requests;
367
368   rank = smpi_comm_rank(comm);
369   size = smpi_comm_size(comm);
370   if(rank != root) {
371     // Send buffer to root
372     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
373   } else {
374     sendsize = smpi_datatype_size(sendtype);
375     recvsize = smpi_datatype_size(recvtype);
376     // Local copy from root
377     memcpy(&((char *) recvbuf)[root * recvcount * recvsize], sendbuf,
378            sendcount * sendsize * sizeof(char));
379     // Receive buffers from senders
380     requests = xbt_new(MPI_Request, size - 1);
381     index = 0;
382     for(src = 0; src < size; src++) {
383       if(src != root) {
384         requests[index] = smpi_irecv_init(&((char *) recvbuf)
385                                           [src * recvcount * recvsize],
386                                           recvcount, recvtype, src,
387                                           system_tag, comm);
388         index++;
389       }
390     }
391     // Wait for completion of irecv's.
392     smpi_mpi_startall(size - 1, requests);
393     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
394     xbt_free(requests);
395   }
396 }
397
398 void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype,
399                       void *recvbuf, int *recvcounts, int *displs,
400                       MPI_Datatype recvtype, int root, MPI_Comm comm)
401 {
402   int system_tag = 666;
403   int rank, size, src, index, sendsize;
404   MPI_Request *requests;
405
406   rank = smpi_comm_rank(comm);
407   size = smpi_comm_size(comm);
408   if(rank != root) {
409     // Send buffer to root
410     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
411   } else {
412     sendsize = smpi_datatype_size(sendtype);
413     // Local copy from root
414     memcpy(&((char *) recvbuf)[displs[root]], sendbuf,
415            sendcount * sendsize * sizeof(char));
416     // Receive buffers from senders
417     requests = xbt_new(MPI_Request, size - 1);
418     index = 0;
419     for(src = 0; src < size; src++) {
420       if(src != root) {
421         requests[index] =
422             smpi_irecv_init(&((char *) recvbuf)[displs[src]],
423                             recvcounts[src], recvtype, src, system_tag,
424                             comm);
425         index++;
426       }
427     }
428     // Wait for completion of irecv's.
429     smpi_mpi_startall(size - 1, requests);
430     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
431     xbt_free(requests);
432   }
433 }
434
435 void smpi_mpi_allgather(void *sendbuf, int sendcount,
436                         MPI_Datatype sendtype, void *recvbuf,
437                         int recvcount, MPI_Datatype recvtype,
438                         MPI_Comm comm)
439 {
440   int system_tag = 666;
441   int rank, size, other, index, sendsize, recvsize;
442   MPI_Request *requests;
443
444   rank = smpi_comm_rank(comm);
445   size = smpi_comm_size(comm);
446   sendsize = smpi_datatype_size(sendtype);
447   recvsize = smpi_datatype_size(recvtype);
448   // Local copy from self
449   memcpy(&((char *) recvbuf)[rank * recvcount * recvsize], sendbuf,
450          sendcount * sendsize * sizeof(char));
451   // Send/Recv buffers to/from others;
452   requests = xbt_new(MPI_Request, 2 * (size - 1));
453   index = 0;
454   for(other = 0; other < size; other++) {
455     if(other != rank) {
456       requests[index] =
457           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
458                           comm);
459       index++;
460       requests[index] = smpi_irecv_init(&((char *) recvbuf)
461                                         [other * recvcount * recvsize],
462                                         recvcount, recvtype, other,
463                                         system_tag, comm);
464       index++;
465     }
466   }
467   // Wait for completion of all comms.
468   smpi_mpi_startall(2 * (size - 1), requests);
469   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
470   xbt_free(requests);
471 }
472
473 void smpi_mpi_allgatherv(void *sendbuf, int sendcount,
474                          MPI_Datatype sendtype, void *recvbuf,
475                          int *recvcounts, int *displs,
476                          MPI_Datatype recvtype, MPI_Comm comm)
477 {
478   int system_tag = 666;
479   int rank, size, other, index, sendsize;
480   MPI_Request *requests;
481
482   rank = smpi_comm_rank(comm);
483   size = smpi_comm_size(comm);
484   sendsize = smpi_datatype_size(sendtype);
485   // Local copy from self
486   memcpy(&((char *) recvbuf)[displs[rank]], sendbuf,
487          sendcount * sendsize * sizeof(char));
488   // Send buffers to others;
489   requests = xbt_new(MPI_Request, 2 * (size - 1));
490   index = 0;
491   for(other = 0; other < size; other++) {
492     if(other != rank) {
493       requests[index] =
494           smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag,
495                           comm);
496       index++;
497       requests[index] =
498           smpi_irecv_init(&((char *) recvbuf)[displs[other]],
499                           recvcounts[other], recvtype, other, system_tag,
500                           comm);
501       index++;
502     }
503   }
504   // Wait for completion of all comms.
505   smpi_mpi_startall(2 * (size - 1), requests);
506   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
507   xbt_free(requests);
508 }
509
510 void smpi_mpi_scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype,
511                       void *recvbuf, int recvcount, MPI_Datatype recvtype,
512                       int root, MPI_Comm comm)
513 {
514   int system_tag = 666;
515   int rank, size, dst, index, sendsize, recvsize;
516   MPI_Request *requests;
517
518   rank = smpi_comm_rank(comm);
519   size = smpi_comm_size(comm);
520   if(rank != root) {
521     // Recv buffer from root
522     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
523                   MPI_STATUS_IGNORE);
524   } else {
525     sendsize = smpi_datatype_size(sendtype);
526     recvsize = smpi_datatype_size(recvtype);
527     // Local copy from root
528     memcpy(recvbuf, &((char *) sendbuf)[root * sendcount * sendsize],
529            recvcount * recvsize * sizeof(char));
530     // Send buffers to receivers
531     requests = xbt_new(MPI_Request, size - 1);
532     index = 0;
533     for(dst = 0; dst < size; dst++) {
534       if(dst != root) {
535         requests[index] = smpi_isend_init(&((char *) sendbuf)
536                                           [dst * sendcount * sendsize],
537                                           sendcount, sendtype, dst,
538                                           system_tag, comm);
539         index++;
540       }
541     }
542     // Wait for completion of isend's.
543     smpi_mpi_startall(size - 1, requests);
544     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
545     xbt_free(requests);
546   }
547 }
548
549 void smpi_mpi_scatterv(void *sendbuf, int *sendcounts, int *displs,
550                        MPI_Datatype sendtype, void *recvbuf, int recvcount,
551                        MPI_Datatype recvtype, int root, MPI_Comm comm)
552 {
553   int system_tag = 666;
554   int rank, size, dst, index, recvsize;
555   MPI_Request *requests;
556
557   rank = smpi_comm_rank(comm);
558   size = smpi_comm_size(comm);
559   if(rank != root) {
560     // Recv buffer from root
561     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm,
562                   MPI_STATUS_IGNORE);
563   } else {
564     recvsize = smpi_datatype_size(recvtype);
565     // Local copy from root
566     memcpy(recvbuf, &((char *) sendbuf)[displs[root]],
567            recvcount * recvsize * sizeof(char));
568     // Send buffers to receivers
569     requests = xbt_new(MPI_Request, size - 1);
570     index = 0;
571     for(dst = 0; dst < size; dst++) {
572       if(dst != root) {
573         requests[index] =
574             smpi_isend_init(&((char *) sendbuf)[displs[dst]],
575                             sendcounts[dst], sendtype, dst, system_tag,
576                             comm);
577         index++;
578       }
579     }
580     // Wait for completion of isend's.
581     smpi_mpi_startall(size - 1, requests);
582     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
583     xbt_free(requests);
584   }
585 }
586
587 void smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count,
588                      MPI_Datatype datatype, MPI_Op op, int root,
589                      MPI_Comm comm)
590 {
591   int system_tag = 666;
592   int rank, size, src, index, datasize;
593   MPI_Request *requests;
594   void **tmpbufs;
595
596   rank = smpi_comm_rank(comm);
597   size = smpi_comm_size(comm);
598   if(rank != root) {
599     // Send buffer to root
600     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
601   } else {
602     datasize = smpi_datatype_size(datatype);
603     // Local copy from root
604     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
605     // Receive buffers from senders
606     //TODO: make a MPI_barrier here ?
607     requests = xbt_new(MPI_Request, size - 1);
608     tmpbufs = xbt_new(void *, size - 1);
609     index = 0;
610     for(src = 0; src < size; src++) {
611       if(src != root) {
612         tmpbufs[index] = xbt_malloc(count * datasize);
613         requests[index] =
614             smpi_irecv_init(tmpbufs[index], count, datatype, src,
615                             system_tag, comm);
616         index++;
617       }
618     }
619     // Wait for completion of irecv's.
620     smpi_mpi_startall(size - 1, requests);
621     for(src = 0; src < size - 1; src++) {
622       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
623       if(index == MPI_UNDEFINED) {
624         break;
625       }
626       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
627     }
628     for(index = 0; index < size - 1; index++) {
629       xbt_free(tmpbufs[index]);
630     }
631     xbt_free(tmpbufs);
632     xbt_free(requests);
633   }
634 }
635
636 void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count,
637                         MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
638 {
639   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
640   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
641
642 /*
643 FIXME: buggy implementation
644
645   int system_tag = 666;
646   int rank, size, other, index, datasize;
647   MPI_Request* requests;
648   void** tmpbufs;
649
650   rank = smpi_comm_rank(comm);
651   size = smpi_comm_size(comm);
652   datasize = smpi_datatype_size(datatype);
653   // Local copy from self
654   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
655   // Send/Recv buffers to/from others;
656   //TODO: make a MPI_barrier here ?
657   requests = xbt_new(MPI_Request, 2 * (size - 1));
658   tmpbufs = xbt_new(void*, size - 1);
659   index = 0;
660   for(other = 0; other < size; other++) {
661     if(other != rank) {
662       tmpbufs[index / 2] = xbt_malloc(count * datasize);
663       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
664       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
665       index += 2;
666     }
667   }
668   // Wait for completion of all comms.
669   for(other = 0; other < 2 * (size - 1); other++) {
670     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
671     if(index == MPI_UNDEFINED) {
672       break;
673     }
674     if((index & 1) == 1) {
675       // Request is odd: it's a irecv
676       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
677     }
678   }
679   for(index = 0; index < size - 1; index++) {
680     xbt_free(tmpbufs[index]);
681   }
682   xbt_free(tmpbufs);
683   xbt_free(requests);
684 */
685 }
686
687 void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count,
688                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
689 {
690   int system_tag = 666;
691   int rank, size, other, index, datasize;
692   int total;
693   MPI_Request *requests;
694   void **tmpbufs;
695
696   rank = smpi_comm_rank(comm);
697   size = smpi_comm_size(comm);
698   datasize = smpi_datatype_size(datatype);
699   // Local copy from self
700   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
701   // Send/Recv buffers to/from others;
702   total = rank + (size - (rank + 1));
703   requests = xbt_new(MPI_Request, total);
704   tmpbufs = xbt_new(void *, rank);
705   index = 0;
706   for(other = 0; other < rank; other++) {
707     tmpbufs[index] = xbt_malloc(count * datasize);
708     requests[index] =
709         smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag,
710                         comm);
711     index++;
712   }
713   for(other = rank + 1; other < size; other++) {
714     requests[index] =
715         smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
716     index++;
717   }
718   // Wait for completion of all comms.
719   smpi_mpi_startall(size - 1, requests);
720   for(other = 0; other < total; other++) {
721     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
722     if(index == MPI_UNDEFINED) {
723       break;
724     }
725     if(index < rank) {
726       // #Request is below rank: it's a irecv
727       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
728     }
729   }
730   for(index = 0; index < size - 1; index++) {
731     xbt_free(tmpbufs[index]);
732   }
733   xbt_free(tmpbufs);
734   xbt_free(requests);
735 }