1 /* Asynchronous parts of the basic collective algorithms, meant to be used both by the naive default implementation and by the non-blocking collectives */
3 /* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
5 /* This program is free software; you can redistribute it and/or modify it
6 * under the terms of the license (GNU LGPL) which comes with this package. */
8 #include "colls_private.hpp"
9 #include "src/smpi/include/smpi_actor.hpp"
// Non-blocking barrier. Allocates the parent request returned through `request`
// and attaches to it the zero-byte point-to-point requests that carry the barrier.
// NOTE(review): this view omits several lines of the function (braces, branch
// conditions, call continuations, the return statement); the comments below only
// describe what is visible.
14 int colls::ibarrier(MPI_Comm comm, MPI_Request* request, int external)
16 int size = comm->size();
17 int rank = comm->rank();
// Tag used by every message of this barrier: COLL_TAG_BARRIER minus `external`.
18 int system_tag=COLL_TAG_BARRIER-external;
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
19 (*request) = new Request( nullptr, 0, MPI_BYTE,
20 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// Two-request case: one zero-byte isend and one zero-byte irecv, each with 0 as the
// peer argument (presumably the branch taken by ranks other than 0 -- the condition
// is not visible here).
22 MPI_Request* requests = new MPI_Request[2];
23 requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
26 requests[1] = Request::irecv (nullptr, 0, MPI_BYTE, 0,
29 (*request)->set_nbc_requests(requests, 2);
// 2*(size-1)-request case (presumably rank 0): i takes the odd values 1..2*size-3, so
// slot i-1 holds a zero-byte irecv from MPI_ANY_SOURCE and slot i a zero-byte isend
// whose peer argument (i+1)/2 ranges over 1..size-1.
32 MPI_Request* requests = new MPI_Request[(size - 1) * 2];
33 for (int i = 1; i < 2 * size - 1; i += 2) {
34 requests[i - 1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, system_tag, comm);
35 requests[i] = Request::isend(nullptr, 0, MPI_BYTE, (i + 1) / 2, system_tag, comm);
37 (*request)->set_nbc_requests(requests, 2*(size-1));
// Non-blocking broadcast of `count` elements of `datatype` held in `buf`, relative to `root`.
// Allocates the parent request returned through `request` and attaches the
// point-to-point requests that move the data.
// NOTE(review): the end of the signature, the branch conditions, the declaration of
// `n`, braces and the return statement are not visible in this view.
42 int colls::ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request,
45 int size = comm->size();
46 int rank = comm->rank();
// Tag used by every message of this broadcast: COLL_TAG_BCAST minus `external`.
47 int system_tag=COLL_TAG_BCAST-external;
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
48 (*request) = new Request( nullptr, 0, MPI_BYTE,
49 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// Single-request case: receive into `buf` with `root` as the peer argument
// (presumably the branch taken by non-root ranks -- condition not visible).
51 MPI_Request* requests = new MPI_Request[1];
52 requests[0] = Request::irecv (buf, count, datatype, root,
55 (*request)->set_nbc_requests(requests, 1);
// (size-1)-request case: one isend of `buf` per rank i, stored at slot n.
// The update of n and any test that skips a rank are not visible here; presumably
// the root skips itself, which would match the size-1 array length -- TODO confirm.
58 MPI_Request* requests = new MPI_Request[size - 1];
60 for (int i = 0; i < size; i++) {
62 requests[n] = Request::isend(buf, count, datatype, i,
69 (*request)->set_nbc_requests(requests, size-1);
// Non-blocking allgather: every rank contributes `sendcount` elements and collects
// `recvcount` elements per rank into `recvbuf`, slot r starting at element r*recvcount.
// Allocates the parent request returned through `request` and attaches the
// persistent send/receive requests after starting them.
// NOTE(review): the declarations of `lb`, `recvext` and `index`, the test that skips
// the local rank, the index increments, braces and the return are not visible here.
74 int colls::iallgather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
75 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLGATHER minus `external`.
78 const int system_tag = COLL_TAG_ALLGATHER-external;
82 int rank = comm->rank();
83 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
84 (*request) = new Request( nullptr, 0, MPI_BYTE,
85 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
86 // FIXME: check for errors
// Fetch the lower bound and extent of recvtype; recvext is the stride used for recvbuf offsets below.
87 recvtype->extent(&lb, &recvext);
88 // Local copy from self: the local contribution goes to slot `rank` of recvbuf
89 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount,
91 // Send/Recv buffers to/from others: one send and one receive per peer, hence 2*(size-1) slots
92 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
94 for (int other = 0; other < size; other++) {
// Send the whole local contribution to `other` ...
96 requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm);
// ... and receive its contribution directly into slot `other` of recvbuf.
98 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + other * recvcount * recvext, recvcount, recvtype,
99 other, system_tag, comm);
// Start every request, then hand the array over to the parent request.
// No wait call is visible in this function.
103 Request::startall(2 * (size - 1), requests);
104 (*request)->set_nbc_requests(requests, 2 * (size - 1));
// Non-blocking scatter relative to `root`: block d of `sendbuf` (sendcount elements
// starting at element d*sendcount) is destined to rank d.
// Allocates the parent request returned through `request` and attaches the
// point-to-point requests that move the data.
// NOTE(review): the declaration of `lb` and `index`, the root/non-root branch
// conditions, the test that skips the root in the loop, braces and the return are
// not visible in this view.
108 int colls::iscatter(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
109 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_SCATTER minus `external`.
111 const int system_tag = COLL_TAG_SCATTER-external;
113 MPI_Aint sendext = 0;
115 int rank = comm->rank();
116 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
117 (*request) = new Request( nullptr, 0, MPI_BYTE,
118 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// Single-request case (presumably non-root ranks -- condition not visible).
120 MPI_Request* requests = new MPI_Request[1];
121 // Recv buffer from root
122 requests[0] = Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
123 (*request)->set_nbc_requests(requests, 1);
// Remaining lines: presumably the root branch. sendext is the stride used for sendbuf offsets.
125 sendtype->extent(&lb, &sendext);
126 // Local copy from root: skipped when recvbuf is MPI_IN_PLACE
127 if(recvbuf!=MPI_IN_PLACE){
128 Datatype::copy(static_cast<const char *>(sendbuf) + root * sendcount * sendext,
129 sendcount, sendtype, recvbuf, recvcount, recvtype);
131 // Send buffers to receivers: one persistent send per destination
132 MPI_Request* requests = new MPI_Request[size - 1];
134 for(int dst = 0; dst < size; dst++) {
136 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + dst * sendcount * sendext, sendcount, sendtype,
137 dst, system_tag, comm);
141 // Start the isend's and attach them to the parent request (no wait call is visible here).
142 Request::startall(size - 1, requests);
143 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking allgatherv: like allgather, but the contribution of rank r is
// recvcounts[r] elements placed at element displacement displs[r] of `recvbuf`.
// Allocates the parent request returned through `request` and attaches the
// persistent send/receive requests after starting them.
// NOTE(review): the declaration of `lb` and `index`, the test that skips the local
// rank, the index increments, the left-hand side of the isend_init statement, braces
// and the return are not visible in this view.
148 int colls::iallgatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
149 const int* displs, MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLGATHERV minus `external`.
151 const int system_tag = COLL_TAG_ALLGATHERV-external;
153 MPI_Aint recvext = 0;
155 int rank = comm->rank();
156 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
157 (*request) = new Request( nullptr, 0, MPI_BYTE,
158 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// recvext is the stride applied to the displacements when computing recvbuf offsets.
159 recvtype->extent(&lb, &recvext);
160 // Local copy from self: the local contribution goes to displacement displs[rank]
161 Datatype::copy(sendbuf, sendcount, sendtype,
162 static_cast<char *>(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype);
163 // Send buffers to others: one send and one receive per peer, hence 2*(size-1) slots
164 MPI_Request *requests = new MPI_Request[2 * (size - 1)];
166 for (int other = 0; other < size; other++) {
// Persistent send of the local contribution to `other` (the assignment target is on a line not visible here).
169 Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm);
// Persistent receive of the contribution of `other` at its own displacement.
171 requests[index] = Request::irecv_init(static_cast<char *>(recvbuf) + displs[other] * recvext, recvcounts[other],
172 recvtype, other, system_tag, comm);
176 // Start all comms and attach them to the parent request (no wait call is visible here).
177 Request::startall(2 * (size - 1), requests);
178 (*request)->set_nbc_requests(requests, 2 * (size - 1));
// Non-blocking all-to-all with uniform counts: block i of `sendbuf` goes to rank i
// and the block received from rank i lands in block i of `recvbuf`.
// Allocates the parent request returned through `request`; when the local copy
// succeeds and there is more than one rank, persistent receives and sends are
// created, started and attached to it.
// NOTE(review): the declaration of `lb` and `count`, the count increments, the
// closing line of the "Now post all sends" block comment, braces and the return are
// not visible in this view.
182 int colls::ialltoall(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
183 MPI_Datatype recvtype, MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLTOALL minus `external`.
185 int system_tag = COLL_TAG_ALLTOALL-external;
187 MPI_Aint sendext = 0;
188 MPI_Aint recvext = 0;
191 int rank = comm->rank();
192 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
193 (*request) = new Request( nullptr, 0, MPI_BYTE,
194 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// Extents are the strides used for the block offsets in sendbuf / recvbuf.
195 sendtype->extent(&lb, &sendext);
196 recvtype->extent(&lb, &recvext);
197 /* simple optimization: the block addressed to ourselves is copied locally */
198 int err = Datatype::copy(static_cast<const char *>(sendbuf) + rank * sendcount * sendext, sendcount, sendtype,
199 static_cast<char *>(recvbuf) + rank * recvcount * recvext, recvcount, recvtype);
// Communication is only set up when the local copy succeeded and there are other ranks.
200 if (err == MPI_SUCCESS && size > 1) {
201 /* Initiate all send/recv to/from others. */
202 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
203 /* Post all receives first -- a simple optimization */
// Peers are visited in increasing rank order starting just after the local rank, wrapping around.
205 for (int i = (rank + 1) % size; i != rank; i = (i + 1) % size) {
206 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + i * recvcount * recvext, recvcount,
207 recvtype, i, system_tag, comm);
210 /* Now post all sends in reverse order
211 * - We would like to minimize the search time through message queue
212 * when messages actually arrive in the order in which they were posted.
213 * TODO: check the previous assertion
215 for (int i = (rank + size - 1) % size; i != rank; i = (i + size - 1) % size) {
216 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + i * sendcount * sendext, sendcount,
217 sendtype, i, system_tag, comm);
220 /* Start them all and attach them to the parent request (no wait call is visible here). */
221 Request::startall(count, requests);
222 (*request)->set_nbc_requests(requests, count);
// Non-blocking all-to-all with per-peer counts and displacements: sendcounts[i]
// elements at element displacement senddisps[i] go to rank i, and recvcounts[i]
// elements from rank i land at element displacement recvdisps[i].
// Allocates the parent request returned through `request`; when the local copy
// succeeds and there is more than one rank, persistent receives and sends are
// created, started and attached to it.
// NOTE(review): the declaration of `lb` and `count`, the conditions guarding request
// creation, the count increments, braces and the return are not visible in this view.
227 int colls::ialltoallv(const void* sendbuf, const int* sendcounts, const int* senddisps, MPI_Datatype sendtype,
228 void* recvbuf, const int* recvcounts, const int* recvdisps, MPI_Datatype recvtype, MPI_Comm comm,
229 MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLTOALLV minus `external`.
231 const int system_tag = COLL_TAG_ALLTOALLV-external;
233 MPI_Aint sendext = 0;
234 MPI_Aint recvext = 0;
237 int rank = comm->rank();
238 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
239 (*request) = new Request( nullptr, 0, MPI_BYTE,
240 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
// Extents convert the element displacements into byte offsets.
241 sendtype->extent(&lb, &sendext);
242 recvtype->extent(&lb, &recvext);
243 /* Local copy from self */
244 int err = Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype,
245 static_cast<char *>(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype);
// Communication is only set up when the local copy succeeded and there are other ranks.
246 if (err == MPI_SUCCESS && size > 1) {
247 /* Initiate all send/recv to/from others. */
// Sized for the worst case of one receive and one send per peer; `count` tracks how many are actually created.
248 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
250 /* Create all receives that will be posted first */
251 for (int i = 0; i < size; ++i) {
253 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i] * recvext,
254 recvcounts[i], recvtype, i, system_tag, comm);
// Logged when no receive is created for peer i; the guarding condition is not visible here.
257 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
260 /* Now create all sends */
261 for (int i = 0; i < size; ++i) {
263 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] * sendext,
264 sendcounts[i], sendtype, i, system_tag, comm);
// Logged when no send is created for peer i; the guarding condition is not visible here.
267 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
270 /* Start them all and attach them to the parent request (no wait call is visible here). */
271 Request::startall(count, requests);
272 (*request)->set_nbc_requests(requests, count);
// Non-blocking all-to-all with per-peer counts, displacements and datatypes.
// Unlike ialltoallv, the displacements are added to the char pointers as they are,
// i.e. they are used as byte offsets with no multiplication by a type extent.
// Allocates the parent request returned through `request`; when the local copy
// succeeds (or is skipped) and there is more than one rank, persistent receives and
// sends are created, started and attached to it.
// NOTE(review): the declaration of `count`, the conditions guarding request
// creation, the count increments, braces and the return are not visible in this view.
277 int colls::ialltoallw(const void* sendbuf, const int* sendcounts, const int* senddisps, const MPI_Datatype* sendtypes,
278 void* recvbuf, const int* recvcounts, const int* recvdisps, const MPI_Datatype* recvtypes,
279 MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLTOALLW minus `external`.
281 const int system_tag = COLL_TAG_ALLTOALLW-external;
284 int rank = comm->rank();
285 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
286 (*request) = new Request( nullptr, 0, MPI_BYTE,
287 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
288 /* Local copy from self: only performed when sendcounts[rank] is positive and recvcounts[rank] is non-zero; otherwise err is MPI_SUCCESS */
289 int err = (sendcounts[rank]>0 && recvcounts[rank]) ? Datatype::copy(static_cast<const char *>(sendbuf) + senddisps[rank], sendcounts[rank], sendtypes[rank],
290 static_cast<char *>(recvbuf) + recvdisps[rank], recvcounts[rank], recvtypes[rank]): MPI_SUCCESS;
291 if (err == MPI_SUCCESS && size > 1) {
292 /* Initiate all send/recv to/from others. */
// Sized for the worst case of one receive and one send per peer; `count` tracks how many are actually created.
293 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
295 /* Create all receives that will be posted first */
296 for (int i = 0; i < size; ++i) {
298 requests[count] = Request::irecv_init(static_cast<char *>(recvbuf) + recvdisps[i],
299 recvcounts[i], recvtypes[i], i, system_tag, comm);
// Logged when no receive is created for peer i; the guarding condition is not visible here.
302 XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]);
305 /* Now create all sends */
306 for (int i = 0; i < size; ++i) {
308 requests[count] = Request::isend_init(static_cast<const char *>(sendbuf) + senddisps[i] ,
309 sendcounts[i], sendtypes[i], i, system_tag, comm);
// Logged when no send is created for peer i; the guarding condition is not visible here.
312 XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]);
315 /* Start them all and attach them to the parent request (no wait call is visible here). */
316 Request::startall(count, requests);
317 (*request)->set_nbc_requests(requests, count);
// Non-blocking gather relative to `root`: the contribution of rank s lands in
// `recvbuf` at element offset s*recvcount.
// Allocates the parent request returned through `request` and attaches the
// point-to-point requests that move the data.
// NOTE(review): the declaration of `lb` and `index`, the root/non-root branch
// conditions, the test that skips the root in the loop, braces and the return are
// not visible in this view.
322 int colls::igather(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
323 MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_GATHER minus `external`.
325 const int system_tag = COLL_TAG_GATHER-external;
327 MPI_Aint recvext = 0;
329 int rank = comm->rank();
330 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
331 (*request) = new Request( nullptr, 0, MPI_BYTE,
332 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
334 // Send buffer to root (presumably the non-root branch -- condition not visible)
335 MPI_Request* requests = new MPI_Request[1];
336 requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
337 (*request)->set_nbc_requests(requests, 1);
// Remaining lines: presumably the root branch. recvext is the stride used for recvbuf offsets.
339 recvtype->extent(&lb, &recvext);
340 // Local copy from root: its own contribution goes to slot `root` of recvbuf
341 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + root * recvcount * recvext,
342 recvcount, recvtype);
343 // Receive buffers from senders: one persistent receive per source
344 MPI_Request* requests = new MPI_Request[size - 1];
346 for (int src = 0; src < size; src++) {
348 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + src * recvcount * recvext, recvcount, recvtype,
349 src, system_tag, comm);
353 // Start the irecv's and attach them to the parent request (no wait call is visible here).
354 Request::startall(size - 1, requests);
355 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking gatherv relative to `root`: the contribution of rank s is
// recvcounts[s] elements placed at element displacement displs[s] of `recvbuf`.
// Allocates the parent request returned through `request` and attaches the
// point-to-point requests that move the data.
// NOTE(review): the end of the signature, the declaration of `lb` and `index`, the
// root/non-root branch conditions, the test that skips the root in the loop, braces
// and the return are not visible in this view.
360 int colls::igatherv(const void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, const int* recvcounts,
361 const int* displs, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
// Tag used by every message of this collective: COLL_TAG_GATHERV minus `external`.
364 int system_tag = COLL_TAG_GATHERV-external;
366 MPI_Aint recvext = 0;
368 int rank = comm->rank();
369 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
370 (*request) = new Request( nullptr, 0, MPI_BYTE,
371 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
373 // Send buffer to root (presumably the non-root branch -- condition not visible)
374 MPI_Request* requests = new MPI_Request[1];
375 requests[0]=Request::isend(sendbuf, sendcount, sendtype, root, system_tag, comm);
376 (*request)->set_nbc_requests(requests, 1);
// Remaining lines: presumably the root branch. recvext converts displacements into byte offsets.
378 recvtype->extent(&lb, &recvext);
379 // Local copy from root: its own contribution goes to displacement displs[root]
380 Datatype::copy(sendbuf, sendcount, sendtype, static_cast<char*>(recvbuf) + displs[root] * recvext,
381 recvcounts[root], recvtype);
382 // Receive buffers from senders: one persistent receive per source
383 MPI_Request* requests = new MPI_Request[size - 1];
385 for (int src = 0; src < size; src++) {
387 requests[index] = Request::irecv_init(static_cast<char*>(recvbuf) + displs[src] * recvext,
388 recvcounts[src], recvtype, src, system_tag, comm);
392 // Start the irecv's and attach them to the parent request (no wait call is visible here).
393 Request::startall(size - 1, requests);
394 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking scatterv relative to `root`: sendcounts[d] elements at element
// displacement displs[d] of `sendbuf` are destined to rank d.
// Allocates the parent request returned through `request` and attaches the
// point-to-point requests that move the data.
// NOTE(review): the end of the signature, the declaration of `lb` and `index`, the
// root/non-root branch conditions, the test that skips the root in the loop, braces
// and the return are not visible in this view.
398 int colls::iscatterv(const void* sendbuf, const int* sendcounts, const int* displs, MPI_Datatype sendtype,
399 void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm, MPI_Request* request,
// Tag used by every message of this collective: COLL_TAG_SCATTERV minus `external`.
402 int system_tag = COLL_TAG_SCATTERV-external;
404 MPI_Aint sendext = 0;
406 int rank = comm->rank();
407 int size = comm->size();
// Parent request: no buffer, zero MPI_BYTE elements, this rank passed for both rank arguments.
408 (*request) = new Request( nullptr, 0, MPI_BYTE,
409 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
411 // Recv buffer from root (presumably the non-root branch -- condition not visible)
412 MPI_Request* requests = new MPI_Request[1];
413 requests[0]=Request::irecv(recvbuf, recvcount, recvtype, root, system_tag, comm);
414 (*request)->set_nbc_requests(requests, 1);
// Remaining lines: presumably the root branch. sendext converts displacements into byte offsets.
416 sendtype->extent(&lb, &sendext);
417 // Local copy from root: skipped when recvbuf is MPI_IN_PLACE
418 if(recvbuf!=MPI_IN_PLACE){
419 Datatype::copy(static_cast<const char *>(sendbuf) + displs[root] * sendext, sendcounts[root],
420 sendtype, recvbuf, recvcount, recvtype);
422 // Send buffers to receivers: one persistent send per destination
423 MPI_Request *requests = new MPI_Request[size - 1];
425 for (int dst = 0; dst < size; dst++) {
427 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + displs[dst] * sendext, sendcounts[dst],
428 sendtype, dst, system_tag, comm);
432 // Start the isend's and attach them to the parent request (no wait call is visible here).
433 Request::startall(size - 1, requests);
434 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking reduce of `count` elements of `datatype` relative to `root`.
// Allocates the parent request returned through `request` (built with `recvbuf` and
// `op` in one branch, with no buffer and no op in the other) and attaches the
// point-to-point requests; no application of `op` is visible in this function.
// NOTE(review): the declaration of `lb` and `index`, the branch conditions selecting
// between the two parent-request constructions and between send/receive sides, the
// assignment target of irecv_init, braces and the return are not visible here.
439 int colls::ireduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root,
440 MPI_Comm comm, MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_REDUCE minus `external`.
442 const int system_tag = COLL_TAG_REDUCE-external;
444 MPI_Aint dataext = 0;
// Buffer actually used as send source; replaced by a temporary copy in the in-place case below.
446 const void* real_sendbuf = sendbuf;
448 int rank = comm->rank();
449 int size = comm->size();
454 unsigned char* tmp_sendbuf = nullptr;
// In-place case: the input lives in recvbuf, so snapshot it into a temporary buffer
// of count * extent bytes and send from that copy.
455 if( sendbuf == MPI_IN_PLACE ) {
456 tmp_sendbuf = smpi_get_tmp_sendbuffer(count * datatype->get_extent());
457 Datatype::copy(recvbuf, count, datatype, tmp_sendbuf, count, datatype);
458 real_sendbuf = tmp_sendbuf;
// Parent request carrying recvbuf and the reduction operation
// (presumably the root branch -- condition not visible).
462 (*request) = new Request( recvbuf, count, datatype,
463 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
// Parent request with no buffer and no operation (presumably the non-root branch).
466 (*request) = new Request( nullptr, count, datatype,
467 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT);
470 // Send buffer to root (presumably the non-root branch -- condition not visible)
471 MPI_Request* requests = new MPI_Request[1];
472 requests[0] = Request::isend(real_sendbuf, count, datatype, root, system_tag, comm);
473 (*request)->set_nbc_requests(requests, 1);
// Remaining lines up to the final in-place test: presumably the root branch.
475 datatype->extent(&lb, &dataext);
476 // Local copy from root: only when both buffers are non-null
477 if (real_sendbuf != nullptr && recvbuf != nullptr)
478 Datatype::copy(real_sendbuf, count, datatype, recvbuf, count, datatype);
479 // Receive buffers from senders: one persistent receive per source
480 MPI_Request *requests = new MPI_Request[size - 1];
482 for (int src = 0; src < size; src++) {
// Each contribution is received into its own freshly obtained temporary buffer of count * dataext bytes.
485 Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, src, system_tag, comm);
489 // Start the irecv's and attach them to the parent request (no wait call is visible here).
490 Request::startall(size - 1, requests);
491 (*request)->set_nbc_requests(requests, size - 1);
// Release the in-place snapshot.
// NOTE(review): confirm that no pending isend still reads tmp_sendbuf when it is freed here.
493 if( sendbuf == MPI_IN_PLACE ) {
494 smpi_free_tmp_buffer(tmp_sendbuf);
// Non-blocking allreduce of `count` elements of `datatype`.
// Allocates the parent request returned through `request` (built with `recvbuf` and
// `op`), copies the local contribution into `recvbuf`, and creates one persistent
// send and one persistent receive per peer; no application of `op` is visible here.
// NOTE(review): the declaration of `lb` and `index`, the test that skips the local
// rank, the index increments, braces and the return are not visible in this view.
499 int colls::iallreduce(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
500 MPI_Request* request, int external)
// Tag used by every message of this collective: COLL_TAG_ALLREDUCE minus `external`.
503 const int system_tag = COLL_TAG_ALLREDUCE-external;
505 MPI_Aint dataext = 0;
507 int rank = comm->rank();
508 int size = comm->size();
// Parent request carrying recvbuf and the reduction operation.
509 (*request) = new Request( recvbuf, count, datatype,
510 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
511 // FIXME: check for errors
// dataext sizes the temporary receive buffers below.
512 datatype->extent(&lb, &dataext);
513 // Local copy from self: recvbuf starts out holding the local contribution
514 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
515 // Send/Recv buffers to/from others: one send and one receive per peer, hence 2*(size-1) slots
516 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
518 for (int other = 0; other < size; other++) {
// Send the local contribution to `other` ...
520 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag,comm);
// ... and receive its contribution into a freshly obtained temporary buffer of count * dataext bytes.
522 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
523 other, system_tag, comm);
// Start every request, then hand the array over to the parent request.
// No wait call is visible in this function.
527 Request::startall(2 * (size - 1), requests);
528 (*request)->set_nbc_requests(requests, 2 * (size - 1));
// Non-blocking inclusive scan of `count` elements of `datatype`.
// Allocates the parent request returned through `request` (built with `recvbuf` and
// `op`), copies the local contribution into `recvbuf`, receives from every lower
// rank and sends to every higher rank; no application of `op` is visible here.
// NOTE(review): the declaration of `lb` and `index`, the index increments, braces
// and the return are not visible in this view.
532 int colls::iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
533 MPI_Request* request, int external)
// NOTE(review): hard-coded base tag -888 instead of a COLL_TAG_* constant as in the
// other collectives; iexscan uses the same literal.
535 int system_tag = -888-external;
537 MPI_Aint dataext = 0;
539 int rank = comm->rank();
540 int size = comm->size();
// Parent request carrying recvbuf and the reduction operation.
541 (*request) = new Request( recvbuf, count, datatype,
542 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
// dataext sizes the temporary receive buffers below.
543 datatype->extent(&lb, &dataext);
545 // Local copy from self: recvbuf starts out holding the local contribution
546 Datatype::copy(sendbuf, count, datatype, recvbuf, count, datatype);
548 // Send/Recv buffers to/from others: `rank` receives plus size-1-rank sends, i.e. size-1 requests
549 MPI_Request *requests = new MPI_Request[size - 1];
// One persistent receive per lower rank, each into its own temporary buffer of count * dataext bytes.
551 for (int other = 0; other < rank; other++) {
552 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm);
// One persistent send of the local contribution per higher rank.
555 for (int other = rank + 1; other < size; other++) {
556 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
559 // Start all comms and attach them to the parent request (no wait call is visible here).
560 Request::startall(size - 1, requests);
561 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking exclusive scan of `count` elements of `datatype`.
// Allocates the parent request returned through `request` (built with `recvbuf` and
// `op`), zero-fills `recvbuf` instead of copying the local contribution into it,
// receives from every lower rank and sends to every higher rank; no application of
// `op` is visible here.
// NOTE(review): the declaration of `lb` and `index`, the index increments, braces
// and the return are not visible in this view.
565 int colls::iexscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
566 MPI_Request* request, int external)
// NOTE(review): hard-coded base tag -888 instead of a COLL_TAG_* constant as in the
// other collectives; iscan uses the same literal.
568 int system_tag = -888-external;
570 MPI_Aint dataext = 0;
571 int rank = comm->rank();
572 int size = comm->size();
// Parent request carrying recvbuf and the reduction operation.
573 (*request) = new Request( recvbuf, count, datatype,
574 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
// dataext sizes the memset and the temporary receive buffers below.
575 datatype->extent(&lb, &dataext);
// Clear the first count * dataext bytes of recvbuf.
577 memset(recvbuf, 0, count*dataext);
579 // Send/Recv buffers to/from others: `rank` receives plus size-1-rank sends, i.e. size-1 requests
580 MPI_Request *requests = new MPI_Request[size - 1];
// One persistent receive per lower rank, each into its own temporary buffer of count * dataext bytes.
582 for (int other = 0; other < rank; other++) {
583 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype, other, system_tag, comm);
// One persistent send of the local contribution per higher rank.
586 for (int other = rank + 1; other < size; other++) {
587 requests[index] = Request::isend_init(sendbuf, count, datatype, other, system_tag, comm);
590 // Start all comms and attach them to the parent request (no wait call is visible here).
591 Request::startall(size - 1, requests);
592 (*request)->set_nbc_requests(requests, size - 1);
// Non-blocking reduce_scatter: `sendbuf` is viewed as consecutive segments of
// recvcounts[r] elements, segment r being destined to rank r.
// Allocates the parent request returned through `request` (built with `recvbuf`,
// the local segment length and `op`), sends each peer its segment and receives one
// contribution per peer; no application of `op` is visible here.
// NOTE(review): the declaration of `lb`, `index` and `recvdisp`, the test that
// distinguishes the local rank inside the loop, the index increments, braces and the
// return are not visible in this view.
596 int colls::ireduce_scatter(const void* sendbuf, void* recvbuf, const int* recvcounts, MPI_Datatype datatype, MPI_Op op,
597 MPI_Comm comm, MPI_Request* request, int external)
599 // Version where each process performs the reduce for its own part. Alltoall pattern for comms.
// Tag used by every message of this collective: COLL_TAG_REDUCE_SCATTER minus `external`.
600 const int system_tag = COLL_TAG_REDUCE_SCATTER-external;
602 MPI_Aint dataext = 0;
604 int rank = comm->rank();
605 int size = comm->size();
// Number of elements in the segment this rank is responsible for.
606 int count=recvcounts[rank];
// Parent request carrying recvbuf and the reduction operation.
607 (*request) = new Request( recvbuf, count, datatype,
608 rank,rank, system_tag, comm, MPI_REQ_PERSISTENT, op);
// dataext converts element offsets into byte offsets and sizes the temporary buffers.
609 datatype->extent(&lb, &dataext);
611 // Send/Recv buffers to/from others: one send and one receive per peer, hence 2*(size-1) slots
612 MPI_Request* requests = new MPI_Request[2 * (size - 1)];
615 for (int other = 0; other < size; other++) {
// Send `other` its segment: recvcounts[other] elements starting at element offset recvdisp of sendbuf.
617 requests[index] = Request::isend_init(static_cast<const char *>(sendbuf) + recvdisp * dataext, recvcounts[other], datatype, other, system_tag,comm);
618 XBT_VERB("sending with recvdisp %d", recvdisp);
// Receive the contribution of `other` to the local segment into a freshly obtained temporary buffer.
620 requests[index] = Request::irecv_init(smpi_get_tmp_sendbuffer(count * dataext), count, datatype,
621 other, system_tag, comm);
// Copy `count` elements at offset recvdisp of sendbuf into recvbuf (presumably done
// only when `other` is the local rank -- condition not visible).
624 Datatype::copy(static_cast<const char *>(sendbuf) + recvdisp * dataext, count, datatype, recvbuf, count, datatype);
// Advance the running element offset to the start of the next segment.
626 recvdisp+=recvcounts[other];
// Start every request, then hand the array over to the parent request.
// No wait call is visible in this function.
628 Request::startall(2 * (size - 1), requests);
629 (*request)->set_nbc_requests(requests, 2 * (size - 1));