/* Asynchronous parts of the basic collective algorithms, meant to be used both by the naive default implementation and by non-blocking collectives */

/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "colls_private.hpp"
9 #include "src/smpi/include/smpi_actor.hpp"
15 int Colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
17 int size = comm->size();
18 int rank = comm->rank();
19 int system_tag=COLL_TAG_BARRIER-external;
20 (*request) = new Request( nullptr, 0, MPI_BYTE,
21 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
23 MPI_Request* requests = new MPI_Request[2];
24 requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
27 requests[1] = Request::irecv (nullptr, 0, MPI_BYTE, 0,
30 (*request)->set_nbc_requests(requests, 2);
33 MPI_Request* requests = new MPI_Request[(size - 1) * 2];
34 for (int i = 1; i < 2 * size - 1; i += 2) {
35 requests[i - 1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, system_tag, comm);
36 requests[i] = Request::isend(nullptr, 0, MPI_BYTE, (i + 1) / 2, system_tag, comm);
38 (*request)->set_nbc_requests(requests, 2*(size-1));
43 int Colls::ibcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request, int external)
45 int size = comm->size();
46 int rank = comm->rank();
47 int system_tag=COLL_TAG_BCAST-external;
48 (*request) = new Request( nullptr, 0, MPI_BYTE,
49 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
51 MPI_Request* requests = new MPI_Request[1];
52 requests[0] = Request::irecv (buf, count, datatype, root,
55 (*request)->set_nbc_requests(requests, 1);
58 MPI_Request* requests = new MPI_Request[size - 1];
60 for (int i = 0; i < size; i++) {
62 requests[n] = Request::isend(buf, count, datatype, i,
69 (*request)->set_nbc_requests(requests, size-1);
74 int Colls::iallgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
75 void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
78 const int system_tag = COLL_TAG_ALLGATHER-external;
82 int rank = comm->rank();
83 int size = comm->size();
84 (*request) = new Request( nullptr, 0, MPI_BYTE,
85 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
86 // FIXME: check for errors
87 recvtype->extent(&lb, &recvext);
88 // Local copy from self
89 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
91 // Send/Recv buffers to/from others;
92 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
94 for (int other = 0; other < size; other++) {
96 requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
98 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
99 other, system_tag, comm);
103 Request::startall(2 * (size - 1), requests);
104 (*request)->set_nbc_requests(requests, 2 * (size - 1));
108 int Colls::iscatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
109 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
111 const int system_tag = COLL_TAG_SCATTER-external;
113 MPI_Aint sendext = 0;
115 int rank = comm->rank();
116 int size = comm->size();
117 (*request) = new Request( nullptr, 0, MPI_BYTE,
118 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
120 MPI_Request* requests = new MPI_Request[1];
121 // Recv buffer from root
122 requests[0] = Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
123 (*request)->set_nbc_requests(requests, 1);
125 sendtype->extent(&lb, &sendext);
126 // Local copy from root
127 if(recvbuf!=MPI_IN_PLACE){
128 Datatype::copy(static_cast<const char *>(sendbuf) + root * sendcount * sendext,
129 sendcount, sendtype, recvbuf, recvcount, recvtype);
131 // Send buffers to receivers
132 MPI_Request* requests = new MPI_Request[size - 1];
134 for(int dst = 0; dst < size; dst++) {
136 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
137 dst, system_tag, comm);
141 // Wait for completion of isend's.
142 Request::startall(size - 1, requests);
143 (*request)->set_nbc_requests(requests, size - 1);
148 int Colls::iallgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
149 const int *recvcounts, const int *displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
151 const int system_tag = COLL_TAG_ALLGATHERV-external;
153 MPI_Aint recvext = 0;
155 int rank = comm->rank();
156 int size = comm->size();
157 (*request) = new Request( nullptr, 0, MPI_BYTE,
158 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
159 recvtype->extent(&lb, &recvext);
160 // Local copy from self
161 Datatype::copy(sendbuf, sendcount, sendtype,
162 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
163 // Send buffers to others;
164 MPI_Request *requests = new MPI_Request[2 * (size - 1)];
166 for (int other = 0; other < size; other++) {
169 Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
171 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
172 recvtype, other, system_tag, comm);
176 // Wait for completion of all comms.
177 Request::startall(2 * (size - 1), requests);
178 (*request)->set_nbc_requests(requests, 2 * (size - 1));
182 int Colls::ialltoall( const void *sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external){
183 int system_tag = COLL_TAG_ALLTOALL-external;
185 MPI_Aint sendext = 0;
186 MPI_Aint recvext = 0;
189 int rank = comm->rank();
190 int size = comm->size();
191 (*request) = new Request( nullptr, 0, MPI_BYTE,
192 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
193 sendtype->extent(&lb, &sendext);
194 recvtype->extent(&lb, &recvext);
195 /* simple optimization */
196 int err = Datatype::copy(static_cast<const char *>(sendbuf) + rank * sendcount * sendext, sendcount, sendtype,
197 static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount, recvtype);
198 if (err == MPI_SUCCESS && size > 1) {
199 /* Initiate all send/recv to/from others. */
200 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
201 /* Post all receives first -- a simple optimization */
203 for (int i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
204 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + i * recvcount * recvext, recvcount,
205 recvtype, i, system_tag, comm);
208 /* Now post all sends in reverse order
209 * - We would like to minimize the search time through message queue
210 * when messages actually arrive in the order in which they were posted.
211 * TODO: check the previous assertion
213 for (int i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
214 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + i * sendcount * sendext, sendcount,
215 sendtype, i, system_tag, comm);
218 /* Wait for them all. */
219 Request::startall(count, requests);
220 (*request)->set_nbc_requests(requests, count);
225 int Colls::ialltoallv(const void *sendbuf, const int *sendcounts, const int *senddisps, MPI_Datatype sendtype,
226 void *recvbuf, const int *recvcounts, const int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request *request, int external){
227 const int system_tag = COLL_TAG_ALLTOALLV-external;
229 MPI_Aint sendext = 0;
230 MPI_Aint recvext = 0;
233 int rank = comm->rank();
234 int size = comm->size();
235 (*request) = new Request( nullptr, 0, MPI_BYTE,
236 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
237 sendtype->extent(&lb, &sendext);
238 recvtype->extent(&lb, &recvext);
239 /* Local copy from self */
240 int err = Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
241 static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
242 if (err == MPI_SUCCESS && size > 1) {
243 /* Initiate all send/recv to/from others. */
244 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
246 /* Create all receives that will be posted first */
247 for (int i = 0; i < size; ++i) {
249 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
250 recvcounts[i], recvtype, i, system_tag, comm);
253 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
256 /* Now create all sends */
257 for (int i = 0; i < size; ++i) {
259 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] * sendext,
260 sendcounts[i], sendtype, i, system_tag, comm);
263 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
266 /* Wait for them all. */
267 Request::startall(count, requests);
268 (*request)->set_nbc_requests(requests, count);
273 int Colls::ialltoallw(const void *sendbuf, const int *sendcounts, const int *senddisps, const MPI_Datatype* sendtypes,
274 void *recvbuf, const int *recvcounts, const int *recvdisps, const MPI_Datatype* recvtypes, MPI_Comm comm, MPI_Request *request, int external){
275 const int system_tag = COLL_TAG_ALLTOALLW-external;
278 int rank = comm->rank();
279 int size = comm->size();
280 (*request) = new Request( nullptr, 0, MPI_BYTE,
281 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
282 /* Local copy from self */
283 int err = (sendcounts[rank]>0 && recvcounts[rank]) ? Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
284 static_cast<char *>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
285 if (err == MPI_SUCCESS && size > 1) {
286 /* Initiate all send/recv to/from others. */
287 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
289 /* Create all receives that will be posted first */
290 for (int i = 0; i < size; ++i) {
292 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i],
293 recvcounts[i], recvtypes[i], i, system_tag, comm);
296 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
299 /* Now create all sends */
300 for (int i = 0; i < size; ++i) {
302 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] ,
303 sendcounts[i], sendtypes[i], i, system_tag, comm);
306 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
309 /* Wait for them all. */
310 Request::startall(count, requests);
311 (*request)->set_nbc_requests(requests, count);
316 int Colls::igather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
317 void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request, int external)
319 const int system_tag = COLL_TAG_GATHER-external;
321 MPI_Aint recvext = 0;
323 int rank = comm->rank();
324 int size = comm->size();
325 (*request) = new Request( nullptr, 0, MPI_BYTE,
326 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
328 // Send buffer to root
329 MPI_Request* requests = new MPI_Request[1];
330 requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
331 (*request)->set_nbc_requests(requests, 1);
333 recvtype->extent(&lb, &recvext);
334 // Local copy from root
335 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
336 recvcount, recvtype);
337 // Receive buffers from senders
338 MPI_Request* requests = new MPI_Request[size - 1];
340 for (int src = 0; src < size; src++) {
342 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
343 src, system_tag, comm);
347 // Wait for completion of irecv's.
348 Request::startall(size - 1, requests);
349 (*request)->set_nbc_requests(requests, size - 1);
354 int Colls::igatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int *recvcounts, const int *displs,
355 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request, int external)
357 int system_tag = COLL_TAG_GATHERV-external;
359 MPI_Aint recvext = 0;
361 int rank = comm->rank();
362 int size = comm->size();
363 (*request) = new Request( nullptr, 0, MPI_BYTE,
364 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
366 // Send buffer to root
367 MPI_Request* requests = new MPI_Request[1];
368 requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
369 (*request)->set_nbc_requests(requests, 1);
371 recvtype->extent(&lb, &recvext);
372 // Local copy from root
373 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
374 recvcounts[root], recvtype);
375 // Receive buffers from senders
376 MPI_Request* requests = new MPI_Request[size - 1];
378 for (int src = 0; src < size; src++) {
380 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
381 recvcounts[src], recvtype, src, system_tag, comm);
385 // Wait for completion of irecv's.
386 Request::startall(size - 1, requests);
387 (*request)->set_nbc_requests(requests, size - 1);
391 int Colls::iscatterv(const void *sendbuf, const int *sendcounts, const int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount,
392 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request *request, int external)
394 int system_tag = COLL_TAG_SCATTERV-external;
396 MPI_Aint sendext = 0;
398 int rank = comm->rank();
399 int size = comm->size();
400 (*request) = new Request( nullptr, 0, MPI_BYTE,
401 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
403 // Recv buffer from root
404 MPI_Request* requests = new MPI_Request[1];
405 requests[0]=Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
406 (*request)->set_nbc_requests(requests, 1);
408 sendtype->extent(&lb, &sendext);
409 // Local copy from root
410 if(recvbuf!=MPI_IN_PLACE){
411 Datatype::copy(static_cast<const char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
412 sendtype, recvbuf, recvcount, recvtype);
414 // Send buffers to receivers
415 MPI_Request *requests = new MPI_Request[size - 1];
417 for (int dst = 0; dst < size; dst++) {
419 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
420 sendtype, dst, system_tag, comm);
424 // Wait for completion of isend's.
425 Request::startall(size - 1, requests);
426 (*request)->set_nbc_requests(requests, size - 1);
431 int Colls::ireduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
432 MPI_Comm comm, MPI_Request* request, int external)
434 const int system_tag = COLL_TAG_REDUCE-external;
436 MPI_Aint dataext = 0;
438 const void* real_sendbuf = sendbuf;
440 int rank = comm->rank();
441 int size = comm->size();
446 unsigned char* tmp_sendbuf = nullptr;
447 if( sendbuf == MPI_IN_PLACE ) {
448 tmp_sendbuf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
449 Datatype::copy(recvbuf, count, datatype, tmp_sendbuf, count, datatype);
450 real_sendbuf = tmp_sendbuf;
454 (*request) = new Request( recvbuf, count, datatype,
455 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
458 (*request) = new Request( nullptr, count, datatype,
459 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
462 // Send buffer to root
463 MPI_Request* requests = new MPI_Request[1];
464 requests[0] = Request::isend(real_sendbuf, count, datatype, root, system_tag, comm);
465 (*request)->set_nbc_requests(requests, 1);
467 datatype->extent(&lb, &dataext);
468 // Local copy from root
469 if (real_sendbuf != nullptr && recvbuf != nullptr)
470 Datatype::copy(real_sendbuf, count, datatype, recvbuf, count, datatype);
471 // Receive buffers from senders
472 MPI_Request *requests = new MPI_Request[size - 1];
474 for (int src = 0; src < size; src++) {
477 Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, src, system_tag, comm);
481 // Wait for completion of irecv's.
482 Request::startall(size - 1, requests);
483 (*request)->set_nbc_requests(requests, size - 1);
485 if( sendbuf == MPI_IN_PLACE ) {
486 smpi_free_tmp_buffer(tmp_sendbuf);
491 int Colls::iallreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
492 MPI_Op op, MPI_Comm comm, MPI_Request* request, int external)
495 const int system_tag = COLL_TAG_ALLREDUCE-external;
497 MPI_Aint dataext = 0;
499 int rank = comm->rank();
500 int size = comm->size();
501 (*request) = new Request( recvbuf, count, datatype,
502 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
503 // FIXME: check for errors
504 datatype->extent(&lb, &dataext);
505 // Local copy from self
506 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
507 // Send/Recv buffers to/from others;
508 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
510 for (int other = 0; other < size; other++) {
512 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag,comm);
514 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
515 other, system_tag, comm);
519 Request::startall(2 * (size - 1), requests);
520 (*request)->set_nbc_requests(requests, 2 * (size - 1));
524 int Colls::iscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request* request, int external)
526 int system_tag = -888-external;
528 MPI_Aint dataext = 0;
530 int rank = comm->rank();
531 int size = comm->size();
532 (*request) = new Request( recvbuf, count, datatype,
533 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
534 datatype->extent(&lb, &dataext);
536 // Local copy from self
537 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
539 // Send/Recv buffers to/from others
540 MPI_Request *requests = new MPI_Request[size - 1];
542 for (int other = 0; other < rank; other++) {
543 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm);
546 for (int other = rank + 1; other < size; other++) {
547 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
550 // Wait for completion of all comms.
551 Request::startall(size - 1, requests);
552 (*request)->set_nbc_requests(requests, size - 1);
556 int Colls::iexscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm, MPI_Request* request, int external)
558 int system_tag = -888-external;
560 MPI_Aint dataext = 0;
561 int rank = comm->rank();
562 int size = comm->size();
563 (*request) = new Request( recvbuf, count, datatype,
564 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
565 datatype->extent(&lb, &dataext);
567 memset(recvbuf, 0, count*dataext);
569 // Send/Recv buffers to/from others
570 MPI_Request *requests = new MPI_Request[size - 1];
572 for (int other = 0; other < rank; other++) {
573 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm);
576 for (int other = rank + 1; other < size; other++) {
577 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
580 // Wait for completion of all comms.
581 Request::startall(size - 1, requests);
582 (*request)->set_nbc_requests(requests, size - 1);
586 int Colls::ireduce_scatter(const void *sendbuf, void *recvbuf, const int *recvcounts, MPI_Datatype datatype, MPI_Op op,
587 MPI_Comm comm, MPI_Request* request, int external){
588 //Version where each process performs the reduce for its own part. Alltoall pattern for comms.
589 const int system_tag = COLL_TAG_REDUCE_SCATTER-external;
591 MPI_Aint dataext = 0;
593 int rank = comm->rank();
594 int size = comm->size();
595 int count=recvcounts[rank];
596 (*request) = new Request( recvbuf, count, datatype,
597 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
598 datatype->extent(&lb, &dataext);
600 // Send/Recv buffers to/from others;
601 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
604 for (int other = 0; other < size; other++) {
606 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + recvdisp * dataext, recvcounts[other], datatype, other, system_tag,comm);
607 XBT_VERB("sending with recvdisp %d", recvdisp);
609 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
610 other, system_tag, comm);
613 Datatype::copy(static_cast<const char *>(sendbuf) + recvdisp * dataext, count, datatype, recvbuf, count, datatype);
615 recvdisp+=recvcounts[other];
617 Request::startall(2 * (size - 1), requests);
618 (*request)->set_nbc_requests(requests, 2 * (size - 1));