X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/d656f9465e7b1c36319f86eeafdcec58fe9551a4..e78a6834f8614211434b49a8ab4802b5a75bf12d:/src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp diff --git a/src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp b/src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp index 1bc913c448..6398ff37d9 100644 --- a/src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp +++ b/src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2013-2017. The SimGrid Team. +/* Copyright (c) 2013-2022. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it @@ -41,22 +41,20 @@ * Returns: - MPI_SUCCESS or error code * Limitation: - Works only for commutative operations. */ -namespace simgrid{ -namespace smpi{ -int -Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, - void *rbuf, - int *rcounts, - MPI_Datatype dtype, - MPI_Op op, - MPI_Comm comm - ) +namespace simgrid { +namespace smpi { +int reduce_scatter__ompi_basic_recursivehalving(const void *sbuf, + void *rbuf, + const int *rcounts, + MPI_Datatype dtype, + MPI_Op op, + MPI_Comm comm + ) { int i, rank, size, count, err = MPI_SUCCESS; - int tmp_size=1, remain = 0, tmp_rank, *disps = NULL; + int tmp_size = 1, remain = 0, tmp_rank; ptrdiff_t true_lb, true_extent, lb, extent, buf_size; - char *recv_buf = NULL, *recv_buf_free = NULL; - char *result_buf = NULL, *result_buf_free = NULL; + unsigned char *result_buf = nullptr, *result_buf_free = nullptr; /* Initialize */ rank = comm->rank(); @@ -64,11 +62,11 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank); if ((op != MPI_OP_NULL && not op->is_commutative())) - THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! "); + throw std::invalid_argument( + "reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations!"); /* Find displacements and the like */ - disps = (int*) xbt_malloc(sizeof(int) * size); - if (NULL == disps) return MPI_ERR_OTHER; + int* disps = new int[size]; disps[0] = 0; for (i = 0; i < (size - 1); ++i) { @@ -78,8 +76,8 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, /* short cut the trivial case */ if (0 == count) { - xbt_free(disps); - return MPI_SUCCESS; + delete[] disps; + return MPI_SUCCESS; } /* get datatype information */ @@ -93,17 +91,15 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, } /* Allocate temporary receive buffer. 
*/ - recv_buf_free = (char*) smpi_get_tmp_recvbuffer(buf_size); - - recv_buf = recv_buf_free - lb; - if (NULL == recv_buf_free) { - err = MPI_ERR_OTHER; - goto cleanup; + unsigned char* recv_buf_free = smpi_get_tmp_recvbuffer(buf_size); + unsigned char* recv_buf = recv_buf_free - lb; + if (nullptr == recv_buf_free) { + err = MPI_ERR_OTHER; + goto cleanup; } /* allocate temporary buffer for results */ - result_buf_free = (char*) smpi_get_tmp_sendbuffer(buf_size); - + result_buf_free = smpi_get_tmp_sendbuffer(buf_size); result_buf = result_buf_free - lb; /* copy local buffer into the temporary results */ @@ -147,23 +143,13 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, /* For ranks not kicked out by the above code, perform the recursive halving */ if (tmp_rank >= 0) { - int *tmp_disps = NULL, *tmp_rcounts = NULL; int mask, send_index, recv_index, last_index; /* recalculate disps and rcounts to account for the special "remainder" processes that are no longer doing anything */ - tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int)); - if (NULL == tmp_rcounts) { - err = MPI_ERR_OTHER; - goto cleanup; - } - tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int)); - if (NULL == tmp_disps) { - xbt_free(tmp_rcounts); - err = MPI_ERR_OTHER; - goto cleanup; - } + int* tmp_rcounts = new int[tmp_size]; + int* tmp_disps = new int[tmp_size]; for (i = 0 ; i < tmp_size ; ++i) { if (i < remain) { @@ -221,9 +207,9 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, COLL_TAG_REDUCE_SCATTER, comm); if (MPI_SUCCESS != err) { - xbt_free(tmp_rcounts); - xbt_free(tmp_disps); - goto cleanup; + delete[] tmp_rcounts; + delete[] tmp_disps; + goto cleanup; } } if (recv_count > 0 && send_count != 0) { @@ -232,9 +218,9 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, COLL_TAG_REDUCE_SCATTER, comm); if (MPI_SUCCESS != err) { - xbt_free(tmp_rcounts); - xbt_free(tmp_disps); - goto cleanup; + delete[] tmp_rcounts; + delete[] tmp_disps; + goto cleanup; } } if (send_count > 0 && recv_count != 0) { @@ -262,14 +248,14 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, rcounts[rank], dtype, rbuf, rcounts[rank], dtype); if (MPI_SUCCESS != err) { - xbt_free(tmp_rcounts); - xbt_free(tmp_disps); - goto cleanup; + delete[] tmp_rcounts; + delete[] tmp_disps; + goto cleanup; } } - xbt_free(tmp_rcounts); - xbt_free(tmp_disps); + delete[] tmp_rcounts; + delete[] tmp_disps; } /* Now fix up the non-power of two case, by having the odd @@ -292,9 +278,11 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, } cleanup: - if (NULL != disps) xbt_free(disps); - if (NULL != recv_buf_free) smpi_free_tmp_buffer(recv_buf_free); - if (NULL != result_buf_free) smpi_free_tmp_buffer(result_buf_free); + delete[] disps; + if (nullptr != recv_buf_free) + smpi_free_tmp_buffer(recv_buf_free); + if (nullptr != result_buf_free) + smpi_free_tmp_buffer(result_buf_free); return err; } @@ -361,19 +349,18 @@ Coll_reduce_scatter_ompi_basic_recursivehalving::reduce_scatter(void *sbuf, * DONE :) * */ -int -Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcounts, - MPI_Datatype dtype, - MPI_Op op, - MPI_Comm comm - ) +int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts, + MPI_Datatype dtype, + MPI_Op op, + MPI_Comm comm + ) { int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count; - int inbi, *displs = NULL; - char *tmpsend = NULL, *tmprecv = NULL, 
*accumbuf = NULL, *accumbuf_free = NULL; - char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL}; + int inbi; + unsigned char *tmpsend = nullptr, *tmprecv = nullptr, *accumbuf = nullptr, *accumbuf_free = nullptr; + unsigned char *inbuf_free[2] = {nullptr, nullptr}, *inbuf[2] = {nullptr, nullptr}; ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize; - MPI_Request reqs[2] = {NULL, NULL}; + MPI_Request reqs[2] = {nullptr, nullptr}; size = comm->size(); rank = comm->rank(); @@ -384,8 +371,8 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun /* Determine the maximum number of elements per node, corresponding block size, and displacements array. */ - displs = (int*) xbt_malloc(size * sizeof(int)); - if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; } + int* displs = new int[size]; + displs[0] = 0; total_count = rcounts[0]; max_block_count = rcounts[0]; @@ -401,7 +388,7 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun ret = Datatype::copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype); if (ret < 0) { line = __LINE__; goto error_hndl; } } - xbt_free(displs); + delete[] displs; return MPI_SUCCESS; } @@ -415,17 +402,29 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent; - accumbuf_free = (char*)smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent); - if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; } + accumbuf_free = smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent); + if (nullptr == accumbuf_free) { + ret = -1; + line = __LINE__; + goto error_hndl; + } accumbuf = accumbuf_free - lb; - inbuf_free[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize); - if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; } + inbuf_free[0] = smpi_get_tmp_sendbuffer(max_real_segsize); + if (nullptr == inbuf_free[0]) { + ret = -1; + line = __LINE__; + goto error_hndl; + } inbuf[0] = inbuf_free[0] - lb; if (size > 2) { - inbuf_free[1] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize); - if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; } - inbuf[1] = inbuf_free[1] - lb; + inbuf_free[1] = smpi_get_tmp_sendbuffer(max_real_segsize); + if (nullptr == inbuf_free[1]) { + ret = -1; + line = __LINE__; + goto error_hndl; + } + inbuf[1] = inbuf_free[1] - lb; } /* Handle MPI_IN_PLACE for size > 1 */ @@ -450,7 +449,7 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun - wait on block (r) - compute on block (r) - copy block (r) to rbuf - Note that we must be careful when computing the begining of buffers and + Note that we must be careful when computing the beginning of buffers and for send operations and computation we must compute the exact block size. 
*/ send_to = (rank + 1) % size; @@ -483,7 +482,8 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock] */ tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent; - if(op!=MPI_OP_NULL) op->apply( inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), dtype); + if (op != MPI_OP_NULL) + op->apply(inbuf[inbi ^ 0x1], tmprecv, &rcounts[prevblock], dtype); /* send previous block to send_to */ Request::send(tmprecv, rcounts[prevblock], dtype, send_to, @@ -497,28 +497,311 @@ Coll_reduce_scatter_ompi_ring::reduce_scatter(void *sbuf, void *rbuf, int *rcoun /* Apply operation on the last block (my block) rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */ tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent; - if(op!=MPI_OP_NULL) op->apply( inbuf[inbi], tmprecv, &(rcounts[rank]), dtype); + if (op != MPI_OP_NULL) + op->apply(inbuf[inbi], tmprecv, &rcounts[rank], dtype); /* Copy result from tmprecv to rbuf */ ret = Datatype::copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype); if (ret < 0) { line = __LINE__; goto error_hndl; } - if (NULL != displs) xbt_free(displs); - if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free); - if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]); - if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]); + delete[] displs; + if (nullptr != accumbuf_free) + smpi_free_tmp_buffer(accumbuf_free); + if (nullptr != inbuf_free[0]) + smpi_free_tmp_buffer(inbuf_free[0]); + if (nullptr != inbuf_free[1]) + smpi_free_tmp_buffer(inbuf_free[1]); return MPI_SUCCESS; error_hndl: XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n", __FILE__, line, rank, ret); - if (NULL != displs) xbt_free(displs); - if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free); - if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]); - if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]); + delete[] displs; + if (nullptr != accumbuf_free) + smpi_free_tmp_buffer(accumbuf_free); + if (nullptr != inbuf_free[0]) + smpi_free_tmp_buffer(inbuf_free[0]); + if (nullptr != inbuf_free[1]) + smpi_free_tmp_buffer(inbuf_free[1]); return ret; } + +static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi) +{ + /* Adjust lo and hi for taking into account blocks of excluded processes */ + lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem; + hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem; + return displs[hi] + counts[hi] - displs[lo]; } + +/* + * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits + * of x [*]. + * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013. + * Chapter 7. Rearranging Bits and Bytes. + */ +static unsigned int ompi_mirror_perm(unsigned int x, int nbits) +{ + x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1)); + x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2)); + x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4)); + x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8)); + x = ((x >> 16) | (x << 16)); + return x >> (sizeof(x) * 8 - nbits); } +/* + * ompi_coll_base_reduce_scatter_intra_butterfly + * + * Function: Butterfly algorithm for reduce_scatter + * Accepts: Same as MPI_Reduce_scatter + * Returns: MPI_SUCCESS or error code + * + * Description: Implements butterfly algorithm for MPI_Reduce_scatter [*]. + * The algorithm can be used both by commutative and non-commutative + * operations, for power-of-two and non-power-of-two number of processes. + * + * [*] J.L. Traff. 
An improved Algorithm for (non-commutative) Reduce-scatter + * with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137. + * + * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma), + * where m = sum of rcounts[], p = comm_size + * Memory requirements (per process): 2 * m * typesize + comm_size + * + * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5] + * Step 1. Reduce the number of processes to 4 + * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1 + * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10] + * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1 + * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10] + * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5] + * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5] + * + * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks. + * Round 1 (mask=1, nblocks=2) + * 0: vrank -1 + * 1: vrank 0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*] + * 2: vrank -1 + * 3: vrank 1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20] + * 4: vrank 2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*] + * 5: vrank 3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10] + * + * Round 2 (mask=2, nblocks=1) + * 0: vrank -1 + * 1: vrank 0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*] + * 2: vrank -1 + * 3: vrank 1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*] + * 4: vrank 2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*] + * 5: vrank 3 [**|**|8|10] : exch with 1: send [2], recv [3]: [**|**|*|30] + * + * Step 3. Exchange with remote process according to a mirror permutation: + * mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3 + * 0: vrank -1: recv "0" from process 0 + * 1: vrank 0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0) + * 2: vrank -1: recv result "12" from process 4 + * 3: vrank 1 [**|**|24|*] + * 4: vrank 2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3 + * 5: vrank 3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3) + */ +int reduce_scatter__ompi_butterfly( + const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype, + MPI_Op op, MPI_Comm comm) +{ + char *tmpbuf[2] = {NULL, NULL}, *psend, *precv; + int *displs = NULL, index; + ptrdiff_t span, gap, totalcount, extent; + int err = MPI_SUCCESS; + int comm_size = comm->size(); + int rank = comm->rank(); + int vrank = -1; + int nprocs_rem = 0; + + XBT_DEBUG("coll:base:reduce_scatter_intra_butterfly: rank %d/%d", + rank, comm_size); + if (comm_size < 2) + return MPI_SUCCESS; + + displs = (int*)malloc(sizeof(*displs) * comm_size); + if (NULL == displs) { + err = MPI_ERR_OTHER; + goto cleanup_and_return; + } + displs[0] = 0; + for (int i = 1; i < comm_size; i++) { + displs[i] = displs[i - 1] + rcounts[i - 1]; + } + totalcount = displs[comm_size - 1] + rcounts[comm_size - 1]; + dtype->extent(&gap, &extent); + span = extent * totalcount; + tmpbuf[0] = (char*)malloc(span); + tmpbuf[1] = (char*)malloc(span); + if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) { + err = MPI_ERR_OTHER; + goto cleanup_and_return; + } + psend = tmpbuf[0] - gap; + precv = tmpbuf[1] - gap; + + if (sbuf != MPI_IN_PLACE) { + err = Datatype::copy(sbuf, totalcount, dtype, psend, totalcount, dtype); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } else { + err = Datatype::copy(rbuf, totalcount, dtype, psend, totalcount, dtype); + if (MPI_SUCCESS != err) { goto cleanup_and_return; } + } + + /* + * Step 1. 
Reduce the number of processes to the nearest lower power of two + * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes. + * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send + * the input vector to their neighbor (rank + 1) and all the odd ranks recv + * the input vector and perform local reduction. + * The odd ranks (0 to 2r - 1) contain the reduction with the input + * vector on their neighbors (the even ranks). The first r odd + * processes and the p - 2r last processes are renumbered from + * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the + * rest of the algorithm. + */ + + /* Find nearest power-of-two less than or equal to comm_size */ + int nprocs_pof2, size; + for( nprocs_pof2 = 1, size = comm_size; size > 0; size >>= 1, nprocs_pof2 <<= 1 ); + nprocs_pof2 >>= 1; + + nprocs_rem = comm_size - nprocs_pof2; + int log2_size; + for (log2_size = 0, size = 1; size < nprocs_pof2; ++log2_size, size <<= 1); + + if (rank < 2 * nprocs_rem) { + if ((rank % 2) == 0) { + /* Even process */ + Request::send(psend, totalcount, dtype, rank + 1, + COLL_TAG_REDUCE_SCATTER, comm); + /* This process does not participate in the rest of the algorithm */ + vrank = -1; + } else { + /* Odd process */ + Request::recv(precv, totalcount, dtype, rank - 1, + COLL_TAG_REDUCE_SCATTER, comm, MPI_STATUS_IGNORE); + op->apply(precv, psend, (int*)&totalcount, dtype); + /* Adjust rank to be the bottom "remain" ranks */ + vrank = rank / 2; + } + } else { + /* Adjust rank to show that the bottom "even remain" ranks dropped out */ + vrank = rank - nprocs_rem; + } + + if (vrank != -1) { + /* + * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks: + * block 0: rcounts[0] and rcounts[1] -- for process 0 and 1 + * block 1: rcounts[2] and rcounts[3] -- for process 2 and 3 + * ... + * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1] + * block r: rcounts[r+r] + * block r+1: rcounts[r+r+1] + * ... + * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1] + */ + int nblocks = nprocs_pof2, send_index = 0, recv_index = 0; + for (int mask = 1; mask < nprocs_pof2; mask <<= 1) { + int vpeer = vrank ^ mask; + int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem; + + nblocks /= 2; + if ((vrank & mask) == 0) { + /* Send the upper half of reduction buffer, recv the lower half */ + send_index += nblocks; + } else { + /* Send the upper half of reduction buffer, recv the lower half */ + recv_index += nblocks; + } + + /* Send blocks: [send_index, send_index + nblocks - 1] */ + int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem, + send_index, send_index + nblocks - 1); + index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index; + ptrdiff_t sdispl = displs[index]; + + /* Recv blocks: [recv_index, recv_index + nblocks - 1] */ + int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem, + recv_index, recv_index + nblocks - 1); + index = (recv_index < nprocs_rem) ? 
2 * recv_index : nprocs_rem + recv_index;
+            ptrdiff_t rdispl = displs[index];
+
+            Request::sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
+                              dtype, peer, COLL_TAG_REDUCE_SCATTER,
+                              precv + (ptrdiff_t)rdispl * extent, recv_count,
+                              dtype, peer, COLL_TAG_REDUCE_SCATTER,
+                              comm, MPI_STATUS_IGNORE);
+
+            if (vrank < vpeer) {
+                /* precv = psend <op> precv */
+                op->apply(psend + (ptrdiff_t)rdispl * extent,
+                          precv + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
+                char *p = psend;
+                psend = precv;
+                precv = p;
+            } else {
+                /* psend = precv <op> psend */
+                op->apply(precv + (ptrdiff_t)rdispl * extent,
+                          psend + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
+            }
+            send_index = recv_index;
+        }
+        /*
+         * psend points to the result block [send_index]
+         * Exchange results with remote process according to a mirror permutation.
+         */
+        int vpeer = ompi_mirror_perm(vrank, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
+
+        if (vpeer < nprocs_rem) {
+            /*
+             * Process has two blocks: for excluded process and own.
+             * Send the first block to excluded process.
+             */
+            Request::send(psend + (ptrdiff_t)displs[index] * extent,
+                          rcounts[index], dtype, peer - 1,
+                          COLL_TAG_REDUCE_SCATTER,
+                          comm);
+        }
+
+        /* If process has two blocks, then send the second block (own block) */
+        if (vpeer < nprocs_rem)
+            index++;
+        if (vpeer != vrank) {
+            Request::sendrecv(psend + (ptrdiff_t)displs[index] * extent,
+                              rcounts[index], dtype, peer,
+                              COLL_TAG_REDUCE_SCATTER,
+                              rbuf, rcounts[rank], dtype, peer,
+                              COLL_TAG_REDUCE_SCATTER,
+                              comm, MPI_STATUS_IGNORE);
+        } else {
+            err = Datatype::copy(psend + (ptrdiff_t)displs[rank] * extent, rcounts[rank], dtype,
+                                 rbuf, rcounts[rank], dtype);
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+        }
+
+    } else {
+        /* Excluded process: receive result */
+        int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        Request::recv(rbuf, rcounts[rank], dtype, peer,
+                      COLL_TAG_REDUCE_SCATTER, comm,
+                      MPI_STATUS_IGNORE);
+    }
+cleanup_and_return:
+    if (displs)
+        free(displs);
+    if (tmpbuf[0])
+        free(tmpbuf[0]);
+    if (tmpbuf[1])
+        free(tmpbuf[1]);
+    return err;
+}
+}
+}
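
The bit-mirror helper introduced by this patch is self-contained, so its behavior is easy to check outside SimGrid. Below is a minimal standalone sketch, not part of the patch: the function body is copied verbatim from the hunk above, while the main() driver is only an illustration that reproduces the values quoted in the Step 3 example (mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3 for log2_size = 2).

#include <cstdio>

/* Copied from the patch above: mirror (bit-reverse) the nbits low-order bits of x. */
static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
{
    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
    x = ((x >> 16) | (x << 16));
    return x >> (sizeof(x) * 8 - nbits);
}

int main()
{
    const int log2_size = 2; /* 4 virtual ranks, as in the Step 3 example */
    for (unsigned int vrank = 0; vrank < (1u << log2_size); ++vrank)
        std::printf("mperm(%u) = %u\n", vrank, ompi_mirror_perm(vrank, log2_size));
    return 0; /* prints 0, 2, 1, 3 */
}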