Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update doc for asynchronous comm and pthread_mutexes of pthread_conds.
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 void smpi_process_init(int* argc, char*** argv) {
23   int index;
24   smpi_process_data_t data;
25   smx_process_t proc;
26
27   proc = SIMIX_process_self();
28   index = atoi((*argv)[1]);
29   data = smpi_process_remote_data(index);
30   SIMIX_process_set_data(proc, data);
31   if (*argc > 2) {
32     free((*argv)[1]);
33     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
34     (*argv)[(*argc) - 1] = NULL;
35   }
36   (*argc)--;
37   DEBUG2("<%d> New process in the game: %p", index, proc);
38 }
39
40 void smpi_process_destroy(void) {
41   int index = smpi_process_index();
42
43   DEBUG1("<%d> Process left the game", index);
44 }
45
46 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
47   MPI_Request request;
48
49   request = xbt_new(s_smpi_mpi_request_t, 1);
50   request->buf = buf;
51   request->size = smpi_datatype_size(datatype) * count;
52   request->src = src;
53   request->dst = dst;
54   request->tag = tag;
55   request->comm = comm;
56   request->rdv = NULL;
57   request->pair = NULL;
58   request->complete = 0;
59   request->match = MPI_REQUEST_NULL;
60   request->flags = flags;
61 #ifdef HAVE_TRACING
62   request->send = 0;
63   request->recv = 0;
64 #endif
65   return request;
66 }
67
68 /* MPI Low level calls */
69 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
70   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
71
72   return request;
73 }
74
75 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
76   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
77
78   return request;
79 }
80
81 void smpi_mpi_start(MPI_Request request) {
82   xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
83   if((request->flags & RECV) == RECV) {
84     smpi_process_post_recv(request);
85     print_request("New recv", request);
86     request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
87   } else {
88     smpi_process_post_send(request->comm, request); // FIXME
89     print_request("New send", request);
90     request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL);
91   }
92 }
93
94 void smpi_mpi_startall(int count, MPI_Request* requests) {
95   int i;
96
97   for(i = 0; i < count; i++) {
98     smpi_mpi_start(requests[i]);
99   }
100 }
101
102 void smpi_mpi_request_free(MPI_Request* request) {
103   xbt_free(*request);
104   *request = MPI_REQUEST_NULL;
105 }
106
107 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
108   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
109
110   return request;
111 }
112
113 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
114   MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
115
116   smpi_mpi_start(request);
117   return request;
118 }
119
120 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
121   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
122
123   return request;
124 }
125
126 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
127   MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
128
129   smpi_mpi_start(request);
130   return request;
131 }
132
133 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
134   MPI_Request request;
135
136   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
137   smpi_mpi_wait(&request, status);
138 }
139
140 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
141   MPI_Request request;
142
143   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
144   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
145 }
146
147 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
148   MPI_Request requests[2];
149   MPI_Status stats[2];
150
151   requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
152   requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
153   smpi_mpi_startall(2, requests);
154   smpi_mpi_waitall(2, requests, stats);
155   if(status != MPI_STATUS_IGNORE) {
156     // Copy receive status
157     memcpy(status, &stats[1], sizeof(MPI_Status));
158   }
159 }
160
161 int smpi_mpi_get_count(MPI_Status* status, MPI_Datatype datatype) {
162    return status->count / smpi_datatype_size(datatype);
163 }
164
165 static void finish_wait(MPI_Request* request, MPI_Status* status) {
166   if(status != MPI_STATUS_IGNORE) {
167     status->MPI_SOURCE = (*request)->src;
168     status->MPI_TAG = (*request)->tag;
169     status->MPI_ERROR = MPI_SUCCESS;
170     status->count = SIMIX_communication_get_dst_buf_size((*request)->pair);
171   }
172   print_request("finishing wait", *request);
173   if((*request)->complete == 1) {
174     SIMIX_rdv_destroy((*request)->rdv);
175   } else {
176     (*request)->match->complete = 1;
177     (*request)->match->match = MPI_REQUEST_NULL;
178   }
179   if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
180     smpi_mpi_request_free(request);
181   } else {
182     (*request)->rdv = NULL;
183     (*request)->pair = NULL;
184   }
185 }
186
187 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
188   int flag = (*request)->complete;
189
190   if(flag) {
191     smpi_mpi_wait(request, status);
192   }
193   return flag;
194 }
195
196 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
197   int i, flag;
198
199   *index = MPI_UNDEFINED;
200   flag = 0;
201   for(i = 0; i < count; i++) {
202     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
203       smpi_mpi_wait(&requests[i], status);
204       *index = i;
205       flag = 1;
206       break;
207     }
208   }
209   return flag;
210 }
211
212 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
213   print_request("wait", *request);
214   SIMIX_network_wait((*request)->pair, -1.0);
215   finish_wait(request, status);
216 }
217
218 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
219   xbt_dynar_t comms;
220   int i, size, index;
221   int* map;
222
223   index = MPI_UNDEFINED;
224   if(count > 0) {
225     // First check for already completed requests
226     for(i = 0; i < count; i++) {
227       if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
228         index = i;
229         smpi_mpi_wait(&requests[index], status);
230         break;
231       }
232     }
233     if(index == MPI_UNDEFINED) {
234       // Otherwise, wait for a request to complete
235       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
236       map = xbt_new(int, count);
237       size = 0;
238       DEBUG0("Wait for one of");
239       for(i = 0; i < count; i++) {
240         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
241           print_request("   ", requests[i]);
242           xbt_dynar_push(comms, &requests[i]->pair);
243           map[size] = i;
244           size++;
245         }
246       }
247       if(size > 0) {
248         index = SIMIX_network_waitany(comms);
249         index = map[index];
250         finish_wait(&requests[index], status);
251       }
252       xbt_free(map);
253       xbt_dynar_free(&comms);
254     }
255   }
256   return index;
257 }
258
259 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
260   int index;
261   MPI_Status stat;
262
263   while(count > 0) {
264     index = smpi_mpi_waitany(count, requests, &stat);
265     if(index == MPI_UNDEFINED) {
266       break;
267     }
268     if(status != MPI_STATUS_IGNORE) {
269       memcpy(&status[index], &stat, sizeof(stat));
270     }
271     // FIXME: check this -v
272     // Move the last request to the found position
273     requests[index] = requests[count - 1];
274     requests[count - 1] = MPI_REQUEST_NULL;
275     count--;
276   }
277 }
278
279 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
280   int i, count;
281
282   count = 0;
283   for(i = 0; i < incount; i++) {
284     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
285       smpi_mpi_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
286       indices[count] = i;
287       count++;
288     }
289   }
290   return count;
291 }
292
293 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
294   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
295   nary_tree_bcast(buf, count, datatype, root, comm, 4);
296 }
297
298 void smpi_mpi_barrier(MPI_Comm comm) {
299   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
300   nary_tree_barrier(comm, 4);
301 }
302
303 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
304   int system_tag = 666;
305   int rank, size, src, index, sendsize, recvsize;
306   MPI_Request* requests;
307
308   rank = smpi_comm_rank(comm);
309   size = smpi_comm_size(comm);
310   if(rank != root) {
311     // Send buffer to root
312     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
313   } else {
314     sendsize = smpi_datatype_size(sendtype);
315     recvsize = smpi_datatype_size(recvtype);
316     // Local copy from root
317     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
318     // Receive buffers from senders
319     requests = xbt_new(MPI_Request, size - 1);
320     index = 0;
321     for(src = 0; src < size; src++) {
322       if(src != root) {
323         requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
324         index++;
325       }
326     }
327     // Wait for completion of irecv's.
328     smpi_mpi_startall(size - 1, requests);
329     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
330     xbt_free(requests);
331   }
332 }
333
334 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
335   int system_tag = 666;
336   int rank, size, src, index, sendsize;
337   MPI_Request* requests;
338
339   rank = smpi_comm_rank(comm);
340   size = smpi_comm_size(comm);
341   if(rank != root) {
342     // Send buffer to root
343     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
344   } else {
345     sendsize = smpi_datatype_size(sendtype);
346     // Local copy from root
347     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
348     // Receive buffers from senders
349     requests = xbt_new(MPI_Request, size - 1);
350     index = 0;
351     for(src = 0; src < size; src++) {
352       if(src != root) {
353         requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
354         index++;
355       }
356     }
357     // Wait for completion of irecv's.
358     smpi_mpi_startall(size - 1, requests);
359     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
360     xbt_free(requests);
361   }
362 }
363
364 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
365   int system_tag = 666;
366   int rank, size, other, index, sendsize, recvsize;
367   MPI_Request* requests;
368
369   rank = smpi_comm_rank(comm);
370   size = smpi_comm_size(comm);
371   sendsize = smpi_datatype_size(sendtype);
372   recvsize = smpi_datatype_size(recvtype);
373   // Local copy from self
374   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
375   // Send/Recv buffers to/from others;
376   requests = xbt_new(MPI_Request, 2 * (size - 1));
377   index = 0;
378   for(other = 0; other < size; other++) {
379     if(other != rank) {
380       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
381       index++;
382       requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
383       index++;
384     }
385   }
386   // Wait for completion of all comms.
387   smpi_mpi_startall(2 * (size - 1), requests);
388   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
389   xbt_free(requests);
390 }
391
392 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
393   int system_tag = 666;
394   int rank, size, other, index, sendsize, recvsize;
395   MPI_Request* requests;
396
397   rank = smpi_comm_rank(comm);
398   size = smpi_comm_size(comm);
399   sendsize = smpi_datatype_size(sendtype);
400   recvsize = smpi_datatype_size(recvtype);
401   // Local copy from self
402   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
403   // Send buffers to others;
404   requests = xbt_new(MPI_Request, 2 * (size - 1));
405   index = 0;
406   for(other = 0; other < size; other++) {
407     if(other != rank) {
408       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
409       index++;
410       requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
411       index++;
412     }
413   }
414   // Wait for completion of all comms.
415   smpi_mpi_startall(2 * (size - 1), requests);
416   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
417   xbt_free(requests);
418 }
419
420 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
421   int system_tag = 666;
422   int rank, size, dst, index, sendsize, recvsize;
423   MPI_Request* requests;
424
425   rank = smpi_comm_rank(comm);
426   size = smpi_comm_size(comm);
427   if(rank != root) {
428     // Recv buffer from root
429     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
430   } else {
431     sendsize = smpi_datatype_size(sendtype);
432     recvsize = smpi_datatype_size(recvtype);
433     // Local copy from root
434     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
435     // Send buffers to receivers
436     requests = xbt_new(MPI_Request, size - 1);
437     index = 0;
438     for(dst = 0; dst < size; dst++) {
439       if(dst != root) {
440         requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
441         index++;
442       }
443     }
444     // Wait for completion of isend's.
445     smpi_mpi_startall(size - 1, requests);
446     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
447     xbt_free(requests);
448   }
449 }
450
451 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
452   int system_tag = 666;
453   int rank, size, dst, index, sendsize, recvsize;
454   MPI_Request* requests;
455
456   rank = smpi_comm_rank(comm);
457   size = smpi_comm_size(comm);
458   if(rank != root) {
459     // Recv buffer from root
460     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
461   } else {
462     sendsize = smpi_datatype_size(sendtype);
463     recvsize = smpi_datatype_size(recvtype);
464     // Local copy from root
465     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
466     // Send buffers to receivers
467     requests = xbt_new(MPI_Request, size - 1);
468     index = 0;
469     for(dst = 0; dst < size; dst++) {
470       if(dst != root) {
471         requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
472         index++;
473       }
474     }
475     // Wait for completion of isend's.
476     smpi_mpi_startall(size - 1, requests);
477     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
478     xbt_free(requests);
479   }
480 }
481
482 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
483   int system_tag = 666;
484   int rank, size, src, index, datasize;
485   MPI_Request* requests;
486   void** tmpbufs;
487
488   rank = smpi_comm_rank(comm);
489   size = smpi_comm_size(comm);
490   if(rank != root) {
491     // Send buffer to root
492     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
493   } else {
494     datasize = smpi_datatype_size(datatype);
495     // Local copy from root
496     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
497     // Receive buffers from senders
498     //TODO: make a MPI_barrier here ?
499     requests = xbt_new(MPI_Request, size - 1);
500     tmpbufs = xbt_new(void*, size - 1);
501     index = 0;
502     for(src = 0; src < size; src++) {
503       if(src != root) {
504         tmpbufs[index] = xbt_malloc(count * datasize);
505         requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
506         index++;
507       }
508     }
509     // Wait for completion of irecv's.
510     smpi_mpi_startall(size - 1, requests);
511     for(src = 0; src < size - 1; src++) {
512       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
513       if(index == MPI_UNDEFINED) {
514         break;
515       }
516       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
517     }
518     for(index = 0; index < size - 1; index++) {
519       xbt_free(tmpbufs[index]);
520     }
521     xbt_free(tmpbufs);
522     xbt_free(requests);
523   }
524 }
525
526 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
527   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
528   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
529
530 /*
531 FIXME: buggy implementation
532
533   int system_tag = 666;
534   int rank, size, other, index, datasize;
535   MPI_Request* requests;
536   void** tmpbufs;
537
538   rank = smpi_comm_rank(comm);
539   size = smpi_comm_size(comm);
540   datasize = smpi_datatype_size(datatype);
541   // Local copy from self
542   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
543   // Send/Recv buffers to/from others;
544   //TODO: make a MPI_barrier here ?
545   requests = xbt_new(MPI_Request, 2 * (size - 1));
546   tmpbufs = xbt_new(void*, size - 1);
547   index = 0;
548   for(other = 0; other < size; other++) {
549     if(other != rank) {
550       tmpbufs[index / 2] = xbt_malloc(count * datasize);
551       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
552       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
553       index += 2;
554     }
555   }
556   // Wait for completion of all comms.
557   for(other = 0; other < 2 * (size - 1); other++) {
558     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
559     if(index == MPI_UNDEFINED) {
560       break;
561     }
562     if((index & 1) == 1) {
563       // Request is odd: it's a irecv
564       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
565     }
566   }
567   for(index = 0; index < size - 1; index++) {
568     xbt_free(tmpbufs[index]);
569   }
570   xbt_free(tmpbufs);
571   xbt_free(requests);
572 */
573 }
574
575 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
576   int system_tag = 666;
577   int rank, size, other, index, datasize;
578   int total;
579   MPI_Request* requests;
580   void** tmpbufs;
581
582   rank = smpi_comm_rank(comm);
583   size = smpi_comm_size(comm);
584   datasize = smpi_datatype_size(datatype);
585   // Local copy from self
586   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
587   // Send/Recv buffers to/from others;
588   total = rank + (size - (rank + 1));
589   requests = xbt_new(MPI_Request, total);
590   tmpbufs = xbt_new(void*, rank);
591   index = 0;
592   for(other = 0; other < rank; other++) {
593     tmpbufs[index] = xbt_malloc(count * datasize);
594     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
595     index++;
596   }
597   for(other = rank + 1; other < size; other++) {
598     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
599     index++;
600   }
601   // Wait for completion of all comms.
602   smpi_mpi_startall(size - 1, requests);
603   for(other = 0; other < total; other++) {
604     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
605     if(index == MPI_UNDEFINED) {
606       break;
607     }
608     if(index < rank) {
609       // #Request is below rank: it's a irecv
610       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
611     }
612   }
613   for(index = 0; index < size - 1; index++) {
614     xbt_free(tmpbufs[index]);
615   }
616   xbt_free(tmpbufs);
617   xbt_free(requests);
618 }