Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
tracing a sub-set of point-to-point mpi functions
[simgrid.git] / src / smpi / smpi_base.c
1 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5   * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "xbt/time.h"
9
10 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi,
11                                 "Logging specific to SMPI (base)");
12 XBT_LOG_EXTERNAL_CATEGORY(smpi_base);
13 XBT_LOG_EXTERNAL_CATEGORY(smpi_bench);
14 XBT_LOG_EXTERNAL_CATEGORY(smpi_kernel);
15 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi);
16 XBT_LOG_EXTERNAL_CATEGORY(smpi_mpi_dt);
17 XBT_LOG_EXTERNAL_CATEGORY(smpi_coll);
18 XBT_LOG_EXTERNAL_CATEGORY(smpi_receiver);
19 XBT_LOG_EXTERNAL_CATEGORY(smpi_sender);
20 XBT_LOG_EXTERNAL_CATEGORY(smpi_util);
21
22 void smpi_process_init(int* argc, char*** argv) {
23   int index;
24   smpi_process_data_t data;
25   smx_process_t proc;
26
27   proc = SIMIX_process_self();
28   index = atoi((*argv)[1]);
29   data = smpi_process_remote_data(index);
30   SIMIX_process_set_data(proc, data);
31   if (*argc > 2) {
32     free((*argv)[1]);
33     memmove(&(*argv)[1], &(*argv)[2], sizeof(char *) * (*argc - 2));
34     (*argv)[(*argc) - 1] = NULL;
35   }
36   (*argc)--;
37   DEBUG2("<%d> New process in the game: %p", index, proc);
38 }
39
40 void smpi_process_destroy(void) {
41   int index = smpi_process_index();
42
43   DEBUG1("<%d> Process left the game", index);
44 }
45
46 static MPI_Request build_request(void* buf, int count, MPI_Datatype datatype, int src, int dst, int tag, MPI_Comm comm, unsigned flags) {
47   MPI_Request request;
48
49   request = xbt_new(s_smpi_mpi_request_t, 1);
50   request->buf = buf;
51   request->size = smpi_datatype_size(datatype) * count;
52   request->src = src;
53   request->dst = dst;
54   request->tag = tag;
55   request->comm = comm;
56   request->rdv = NULL;
57   request->pair = NULL;
58   request->complete = 0;
59   request->match = MPI_REQUEST_NULL;
60   request->flags = flags;
61 #ifdef HAVE_TRACING
62   request->send = 0;
63   request->recv = 0;
64 #endif
65   return request;
66 }
67
68 /* MPI Low level calls */
69 MPI_Request smpi_mpi_send_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
70   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, PERSISTENT | SEND);
71
72   return request;
73 }
74
75 MPI_Request smpi_mpi_recv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
76   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, PERSISTENT | RECV);
77
78   return request;
79 }
80
81 void smpi_mpi_start(MPI_Request request) {
82   xbt_assert0(request->complete == 0, "Cannot start a non-finished communication");
83   if((request->flags & RECV) == RECV) {
84     smpi_process_post_recv(request);
85     print_request("New recv", request);
86     request->pair = SIMIX_network_irecv(request->rdv, request->buf, &request->size);
87   } else {
88     smpi_process_post_send(request->comm, request); // FIXME
89     print_request("New send", request);
90     request->pair = SIMIX_network_isend(request->rdv, request->size, -1.0, request->buf, request->size, NULL);
91   }
92 }
93
94 void smpi_mpi_startall(int count, MPI_Request* requests) {
95   int i;
96
97   for(i = 0; i < count; i++) {
98     smpi_mpi_start(requests[i]);
99   }
100 }
101
102 void smpi_mpi_request_free(MPI_Request* request) {
103   xbt_free(*request);
104   *request = MPI_REQUEST_NULL;
105 }
106
107 MPI_Request smpi_isend_init(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
108   MPI_Request request = build_request(buf, count, datatype, smpi_comm_rank(comm), dst, tag, comm, NON_PERSISTENT | SEND);
109
110   return request;
111 }
112
113 MPI_Request smpi_mpi_isend(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
114   MPI_Request request = smpi_isend_init(buf, count, datatype, dst, tag, comm);
115
116   smpi_mpi_start(request);
117   return request;
118 }
119
120 MPI_Request smpi_irecv_init(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
121   MPI_Request request = build_request(buf, count, datatype, src, smpi_comm_rank(comm), tag, comm, NON_PERSISTENT | RECV);
122
123   return request;
124 }
125
126 MPI_Request smpi_mpi_irecv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm) {
127   MPI_Request request = smpi_irecv_init(buf, count, datatype, src, tag, comm);
128
129   smpi_mpi_start(request);
130   return request;
131 }
132
133 void smpi_mpi_recv(void* buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status* status) {
134   MPI_Request request;
135
136   request = smpi_mpi_irecv(buf, count, datatype, src, tag, comm);
137   smpi_mpi_wait(&request, status);
138 }
139
140 void smpi_mpi_send(void* buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) {
141   MPI_Request request;
142
143   request = smpi_mpi_isend(buf, count, datatype, dst, tag, comm);
144   smpi_mpi_wait(&request, MPI_STATUS_IGNORE);
145 }
146
147 void smpi_mpi_sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dst, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag, MPI_Comm comm, MPI_Status* status) {
148   MPI_Request requests[2];
149   MPI_Status stats[2];
150
151   requests[0] = smpi_isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm);
152   requests[1] = smpi_irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm);
153   smpi_mpi_startall(2, requests);
154   smpi_mpi_waitall(2, requests, stats);
155   if(status != MPI_STATUS_IGNORE) {
156     // Copy receive status
157     memcpy(status, &stats[1], sizeof(MPI_Status));
158   }
159 }
160
161 static void finish_wait(MPI_Request* request, MPI_Status* status) {
162   if(status != MPI_STATUS_IGNORE) {
163     status->MPI_SOURCE = (*request)->src;
164     status->MPI_TAG = (*request)->tag;
165     status->MPI_ERROR = MPI_SUCCESS;
166   }
167   print_request("finishing wait", *request);
168   if((*request)->complete == 1) {
169     SIMIX_rdv_destroy((*request)->rdv);
170   } else {
171     (*request)->match->complete = 1;
172     (*request)->match->match = MPI_REQUEST_NULL;
173   }
174   if(((*request)->flags & NON_PERSISTENT) == NON_PERSISTENT) {
175     smpi_mpi_request_free(request);
176   } else {
177     (*request)->rdv = NULL;
178     (*request)->pair = NULL;
179   }
180 }
181
182 int smpi_mpi_test(MPI_Request* request, MPI_Status* status) {
183   int flag = (*request)->complete;
184
185   if(flag) {
186     smpi_mpi_wait(request, status);
187   }
188   return flag;
189 }
190
191 int smpi_mpi_testany(int count, MPI_Request requests[], int* index, MPI_Status* status) {
192   int i, flag;
193
194   *index = MPI_UNDEFINED;
195   flag = 0;
196   for(i = 0; i < count; i++) {
197     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
198       smpi_mpi_wait(&requests[i], status);
199       *index = i;
200       flag = 1;
201       break;
202     }
203   }
204   return flag;
205 }
206
207 void smpi_mpi_wait(MPI_Request* request, MPI_Status* status) {
208   print_request("wait", *request);
209   SIMIX_network_wait((*request)->pair, -1.0);
210   finish_wait(request, status);
211 }
212
213 int smpi_mpi_waitany(int count, MPI_Request requests[], MPI_Status* status) {
214   xbt_dynar_t comms;
215   int i, size, index;
216   int* map;
217
218   index = MPI_UNDEFINED;
219   if(count > 0) {
220     // First check for already completed requests
221     for(i = 0; i < count; i++) {
222       if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
223         index = i;
224         smpi_mpi_wait(&requests[index], status);
225         break;
226       }
227     }
228     if(index == MPI_UNDEFINED) {
229       // Otherwise, wait for a request to complete
230       comms = xbt_dynar_new(sizeof(smx_comm_t), NULL);
231       map = xbt_new(int, count);
232       size = 0;
233       DEBUG0("Wait for one of");
234       for(i = 0; i < count; i++) {
235         if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete == 0) {
236           print_request("   ", requests[i]);
237           xbt_dynar_push(comms, &requests[i]->pair);
238           map[size] = i;
239           size++;
240         }
241       }
242       if(size > 0) {
243         index = SIMIX_network_waitany(comms);
244         index = map[index];
245         finish_wait(&requests[index], status);
246       }
247       xbt_free(map);
248       xbt_dynar_free(&comms);
249     }
250   }
251   return index;
252 }
253
254 void smpi_mpi_waitall(int count, MPI_Request requests[],  MPI_Status status[]) {
255   int index;
256   MPI_Status stat;
257
258   while(count > 0) {
259     index = smpi_mpi_waitany(count, requests, &stat);
260     if(index == MPI_UNDEFINED) {
261       break;
262     }
263     if(status != MPI_STATUS_IGNORE) {
264       memcpy(&status[index], &stat, sizeof(stat));
265     }
266     // FIXME: check this -v
267     // Move the last request to the found position
268     requests[index] = requests[count - 1];
269     requests[count - 1] = MPI_REQUEST_NULL;
270     count--;
271   }
272 }
273
274 int smpi_mpi_waitsome(int incount, MPI_Request requests[], int* indices, MPI_Status status[]) {
275   int i, count;
276
277   count = 0;
278   for(i = 0; i < incount; i++) {
279     if(requests[i] != MPI_REQUEST_NULL && requests[i]->complete) {
280       smpi_mpi_wait(&requests[i], status != MPI_STATUS_IGNORE ? &status[i] : MPI_STATUS_IGNORE);
281       indices[count] = i;
282       count++;
283     }
284   }
285   return count;
286 }
287
288 void smpi_mpi_bcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
289   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
290   nary_tree_bcast(buf, count, datatype, root, comm, 4);
291 }
292
293 void smpi_mpi_barrier(MPI_Comm comm) {
294   // arity=2: a binary tree, arity=4 seem to be a good setting (see P2P-MPI))
295   nary_tree_barrier(comm, 4);
296 }
297
298 void smpi_mpi_gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
299   int system_tag = 666;
300   int rank, size, src, index, sendsize, recvsize;
301   MPI_Request* requests;
302
303   rank = smpi_comm_rank(comm);
304   size = smpi_comm_size(comm);
305   if(rank != root) {
306     // Send buffer to root
307     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
308   } else {
309     sendsize = smpi_datatype_size(sendtype);
310     recvsize = smpi_datatype_size(recvtype);
311     // Local copy from root
312     memcpy(&((char*)recvbuf)[root * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
313     // Receive buffers from senders
314     requests = xbt_new(MPI_Request, size - 1);
315     index = 0;
316     for(src = 0; src < size; src++) {
317       if(src != root) {
318         requests[index] = smpi_irecv_init(&((char*)recvbuf)[src * recvcount * recvsize], recvcount, recvtype, src, system_tag, comm);
319         index++;
320       }
321     }
322     // Wait for completion of irecv's.
323     smpi_mpi_startall(size - 1, requests);
324     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
325     xbt_free(requests);
326   }
327 }
328
329 void smpi_mpi_gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm) {
330   int system_tag = 666;
331   int rank, size, src, index, sendsize;
332   MPI_Request* requests;
333
334   rank = smpi_comm_rank(comm);
335   size = smpi_comm_size(comm);
336   if(rank != root) {
337     // Send buffer to root
338     smpi_mpi_send(sendbuf, sendcount, sendtype, root, system_tag, comm);
339   } else {
340     sendsize = smpi_datatype_size(sendtype);
341     // Local copy from root
342     memcpy(&((char*)recvbuf)[displs[root]], sendbuf, sendcount * sendsize * sizeof(char));
343     // Receive buffers from senders
344     requests = xbt_new(MPI_Request, size - 1);
345     index = 0;
346     for(src = 0; src < size; src++) {
347       if(src != root) {
348         requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[src]], recvcounts[src], recvtype, src, system_tag, comm);
349         index++;
350       }
351     }
352     // Wait for completion of irecv's.
353     smpi_mpi_startall(size - 1, requests);
354     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
355     xbt_free(requests);
356   }
357 }
358
359 void smpi_mpi_allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) {
360   int system_tag = 666;
361   int rank, size, other, index, sendsize, recvsize;
362   MPI_Request* requests;
363
364   rank = smpi_comm_rank(comm);
365   size = smpi_comm_size(comm);
366   sendsize = smpi_datatype_size(sendtype);
367   recvsize = smpi_datatype_size(recvtype);
368   // Local copy from self
369   memcpy(&((char*)recvbuf)[rank * recvcount * recvsize], sendbuf, sendcount * sendsize * sizeof(char));
370   // Send/Recv buffers to/from others;
371   requests = xbt_new(MPI_Request, 2 * (size - 1));
372   index = 0;
373   for(other = 0; other < size; other++) {
374     if(other != rank) {
375       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
376       index++;
377       requests[index] = smpi_irecv_init(&((char*)recvbuf)[other * recvcount * recvsize], recvcount, recvtype, other, system_tag, comm);
378       index++;
379     }
380   }
381   // Wait for completion of all comms.
382   smpi_mpi_startall(2 * (size - 1), requests);
383   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
384   xbt_free(requests);
385 }
386
387 void smpi_mpi_allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, MPI_Comm comm) {
388   int system_tag = 666;
389   int rank, size, other, index, sendsize, recvsize;
390   MPI_Request* requests;
391
392   rank = smpi_comm_rank(comm);
393   size = smpi_comm_size(comm);
394   sendsize = smpi_datatype_size(sendtype);
395   recvsize = smpi_datatype_size(recvtype);
396   // Local copy from self
397   memcpy(&((char*)recvbuf)[displs[rank]], sendbuf, sendcount * sendsize * sizeof(char));
398   // Send buffers to others;
399   requests = xbt_new(MPI_Request, 2 * (size - 1));
400   index = 0;
401   for(other = 0; other < size; other++) {
402     if(other != rank) {
403       requests[index] = smpi_isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
404       index++;
405       requests[index] = smpi_irecv_init(&((char*)recvbuf)[displs[other]], recvcounts[other], recvtype, other, system_tag, comm);
406       index++;
407     }
408   }
409   // Wait for completion of all comms.
410   smpi_mpi_startall(2 * (size - 1), requests);
411   smpi_mpi_waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE);
412   xbt_free(requests);
413 }
414
415 void smpi_mpi_scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
416   int system_tag = 666;
417   int rank, size, dst, index, sendsize, recvsize;
418   MPI_Request* requests;
419
420   rank = smpi_comm_rank(comm);
421   size = smpi_comm_size(comm);
422   if(rank != root) {
423     // Recv buffer from root
424     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
425   } else {
426     sendsize = smpi_datatype_size(sendtype);
427     recvsize = smpi_datatype_size(recvtype);
428     // Local copy from root
429     memcpy(recvbuf, &((char*)sendbuf)[root * sendcount * sendsize], recvcount * recvsize * sizeof(char));
430     // Send buffers to receivers
431     requests = xbt_new(MPI_Request, size - 1);
432     index = 0;
433     for(dst = 0; dst < size; dst++) {
434       if(dst != root) {
435         requests[index] = smpi_isend_init(&((char*)sendbuf)[dst * sendcount * sendsize], sendcount, sendtype, dst, system_tag, comm);
436         index++;
437       }
438     }
439     // Wait for completion of isend's.
440     smpi_mpi_startall(size - 1, requests);
441     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
442     xbt_free(requests);
443   }
444 }
445
446 void smpi_mpi_scatterv(void* sendbuf, int* sendcounts, int* displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) {
447   int system_tag = 666;
448   int rank, size, dst, index, sendsize, recvsize;
449   MPI_Request* requests;
450
451   rank = smpi_comm_rank(comm);
452   size = smpi_comm_size(comm);
453   if(rank != root) {
454     // Recv buffer from root
455     smpi_mpi_recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE);
456   } else {
457     sendsize = smpi_datatype_size(sendtype);
458     recvsize = smpi_datatype_size(recvtype);
459     // Local copy from root
460     memcpy(recvbuf, &((char*)sendbuf)[displs[root]], recvcount * recvsize * sizeof(char));
461     // Send buffers to receivers
462     requests = xbt_new(MPI_Request, size - 1);
463     index = 0;
464     for(dst = 0; dst < size; dst++) {
465       if(dst != root) {
466         requests[index] = smpi_isend_init(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, dst, system_tag, comm);
467         index++;
468       }
469     }
470     // Wait for completion of isend's.
471     smpi_mpi_startall(size - 1, requests);
472     smpi_mpi_waitall(size - 1, requests, MPI_STATUS_IGNORE);
473     xbt_free(requests);
474   }
475 }
476
477 void smpi_mpi_reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) {
478   int system_tag = 666;
479   int rank, size, src, index, datasize;
480   MPI_Request* requests;
481   void** tmpbufs;
482
483   rank = smpi_comm_rank(comm);
484   size = smpi_comm_size(comm);
485   if(rank != root) {
486     // Send buffer to root
487     smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
488   } else {
489     datasize = smpi_datatype_size(datatype);
490     // Local copy from root
491     memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
492     // Receive buffers from senders
493     //TODO: make a MPI_barrier here ?
494     requests = xbt_new(MPI_Request, size - 1);
495     tmpbufs = xbt_new(void*, size - 1);
496     index = 0;
497     for(src = 0; src < size; src++) {
498       if(src != root) {
499         tmpbufs[index] = xbt_malloc(count * datasize);
500         requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm);
501         index++;
502       }
503     }
504     // Wait for completion of irecv's.
505     smpi_mpi_startall(size - 1, requests);
506     for(src = 0; src < size - 1; src++) {
507       index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
508       if(index == MPI_UNDEFINED) {
509         break;
510       }
511       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
512     }
513     for(index = 0; index < size - 1; index++) {
514       xbt_free(tmpbufs[index]);
515     }
516     xbt_free(tmpbufs);
517     xbt_free(requests);
518   }
519 }
520
521 void smpi_mpi_allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
522   smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
523   smpi_mpi_bcast(recvbuf, count, datatype, 0, comm);
524
525 /*
526 FIXME: buggy implementation
527
528   int system_tag = 666;
529   int rank, size, other, index, datasize;
530   MPI_Request* requests;
531   void** tmpbufs;
532
533   rank = smpi_comm_rank(comm);
534   size = smpi_comm_size(comm);
535   datasize = smpi_datatype_size(datatype);
536   // Local copy from self
537   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
538   // Send/Recv buffers to/from others;
539   //TODO: make a MPI_barrier here ?
540   requests = xbt_new(MPI_Request, 2 * (size - 1));
541   tmpbufs = xbt_new(void*, size - 1);
542   index = 0;
543   for(other = 0; other < size; other++) {
544     if(other != rank) {
545       tmpbufs[index / 2] = xbt_malloc(count * datasize);
546       requests[index] = smpi_mpi_isend(sendbuf, count, datatype, other, system_tag, comm);
547       requests[index + 1] = smpi_mpi_irecv(tmpbufs[index / 2], count, datatype, other, system_tag, comm);
548       index += 2;
549     }
550   }
551   // Wait for completion of all comms.
552   for(other = 0; other < 2 * (size - 1); other++) {
553     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
554     if(index == MPI_UNDEFINED) {
555       break;
556     }
557     if((index & 1) == 1) {
558       // Request is odd: it's a irecv
559       smpi_op_apply(op, tmpbufs[index / 2], recvbuf, &count, &datatype);
560     }
561   }
562   for(index = 0; index < size - 1; index++) {
563     xbt_free(tmpbufs[index]);
564   }
565   xbt_free(tmpbufs);
566   xbt_free(requests);
567 */
568 }
569
570 void smpi_mpi_scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) {
571   int system_tag = 666;
572   int rank, size, other, index, datasize;
573   int total;
574   MPI_Request* requests;
575   void** tmpbufs;
576
577   rank = smpi_comm_rank(comm);
578   size = smpi_comm_size(comm);
579   datasize = smpi_datatype_size(datatype);
580   // Local copy from self
581   memcpy(recvbuf, sendbuf, count * datasize * sizeof(char));
582   // Send/Recv buffers to/from others;
583   total = rank + (size - (rank + 1));
584   requests = xbt_new(MPI_Request, total);
585   tmpbufs = xbt_new(void*, rank);
586   index = 0;
587   for(other = 0; other < rank; other++) {
588     tmpbufs[index] = xbt_malloc(count * datasize);
589     requests[index] = smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, comm);
590     index++;
591   }
592   for(other = rank + 1; other < size; other++) {
593     requests[index] = smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm);
594     index++;
595   }
596   // Wait for completion of all comms.
597   smpi_mpi_startall(size - 1, requests);
598   for(other = 0; other < total; other++) {
599     index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE);
600     if(index == MPI_UNDEFINED) {
601       break;
602     }
603     if(index < rank) {
604       // #Request is below rank: it's a irecv
605       smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype);
606     }
607   }
608   for(index = 0; index < size - 1; index++) {
609     xbt_free(tmpbufs[index]);
610   }
611   xbt_free(tmpbufs);
612   xbt_free(requests);
613 }