From: degomme Date: Wed, 12 Jun 2013 23:57:57 +0000 (+0200) Subject: add reduce scatter collectives from openmpi, and fix existing one X-Git-Tag: v3_9_90~274^2~15 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/b085fe247ae2cf2a6e36863a54f794deb4f342f5 add reduce scatter collectives from openmpi, and fix existing one and add tests, without doing separate commits because it is 2AM --- diff --git a/buildtools/Cmake/AddTests.cmake b/buildtools/Cmake/AddTests.cmake index 3ec061dff0..48724f4c93 100644 --- a/buildtools/Cmake/AddTests.cmake +++ b/buildtools/Cmake/AddTests.cmake @@ -415,6 +415,9 @@ if(NOT enable_memcheck) ADD_TEST(smpi-reduce-coll-${REDUCE_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/reduce:${REDUCE_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/reduce_coll.tesh) ENDFOREACH() + FOREACH (REDUCE_SCATTER_COLL default ompi ompi_basic_recursivehalving ompi_ring) + ADD_TEST(smpi-reduce_scatter-coll-${REDUCE_SCATTER_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/reduce_scatter:${REDUCE_SCATTER_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/reduce_scatter_coll.tesh) + ENDFOREACH() endif() # END TESH TESTS diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake index eab571d577..93d21144e3 100644 --- a/buildtools/Cmake/DefinePackages.cmake +++ b/buildtools/Cmake/DefinePackages.cmake @@ -196,6 +196,7 @@ set(SMPI_SRC src/smpi/colls/reduce-scatter-gather.c src/smpi/colls/reduce-ompi.c src/smpi/colls/gather-ompi.c + src/smpi/colls/reduce_scatter-ompi.c ) if(SMPI_F2C) diff --git a/src/include/smpi/smpi_interface.h b/src/include/smpi/smpi_interface.h index 77736f9d8f..f90ef705c3 100644 --- a/src/include/smpi/smpi_interface.h +++ b/src/include/smpi/smpi_interface.h @@ -91,6 +91,13 @@ XBT_PUBLIC_DATA(int (*mpi_coll_reduce_fun) (void *buf, void *rbuf, int count, MPI_Datatype datatype, MPI_Op op, int 
root, MPI_Comm comm)); +/** \ingroup MPI reduce_scatter + * \brief The list of all available reduce_scatter collectives + */ +XBT_PUBLIC_DATA(s_mpi_coll_description_t) mpi_coll_reduce_scatter_description[]; +XBT_PUBLIC_DATA(int (*mpi_coll_reduce_scatter_fun) + (void *sbuf, void *rbuf, int *rcounts, + MPI_Datatype dtype, MPI_Op op,MPI_Comm comm)); XBT_PUBLIC(void) coll_help(const char *category, s_mpi_coll_description_t * table); diff --git a/src/simgrid/sg_config.c b/src/simgrid/sg_config.c index 797a0ae112..d600f19af7 100644 --- a/src/simgrid/sg_config.c +++ b/src/simgrid/sg_config.c @@ -276,6 +276,9 @@ static void _sg_cfg_cb__coll_reduce(const char *name, int pos) { _sg_cfg_cb__coll("reduce", mpi_coll_reduce_description, name, pos); } +static void _sg_cfg_cb__coll_reduce_scatter(const char *name, int pos){ + _sg_cfg_cb__coll("reduce_scatter", mpi_coll_reduce_scatter_description, name, pos); +} #endif /* callback of the inclusion path */ @@ -772,6 +775,11 @@ void sg_config_init(int *argc, char **argv) xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_allgather, NULL); + xbt_cfg_register(&_sg_cfg_set, "smpi/reduce_scatter", + "Which collective to use for reduce_scatter", + xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_reduce_scatter, + NULL); + xbt_cfg_register(&_sg_cfg_set, "smpi/allgatherv", "Which collective to use for allgatherv", xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_allgatherv, diff --git a/src/smpi/colls/colls.h b/src/smpi/colls/colls.h index 4495b4d9ba..55be878d14 100644 --- a/src/smpi/colls/colls.h +++ b/src/smpi/colls/colls.h @@ -213,4 +213,18 @@ COLL_APPLY(action, COLL_REDUCE_SIG, ompi_binomial) COLL_REDUCES(COLL_PROTO, COLL_NOsep) +/************* + * REDUCE_SCATTER * + *************/ +#define COLL_REDUCE_SCATTER_SIG reduce_scatter, int, \ + (void *sbuf, void *rbuf, int *rcounts,\ + MPI_Datatype dtype,MPI_Op op,MPI_Comm comm) + +#define COLL_REDUCE_SCATTERS(action, COLL_sep) \ +COLL_APPLY(action, COLL_REDUCE_SCATTER_SIG, ompi) COLL_sep \ 
+COLL_APPLY(action, COLL_REDUCE_SCATTER_SIG, ompi_basic_recursivehalving) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SCATTER_SIG, ompi_ring) + +COLL_REDUCE_SCATTERS(COLL_PROTO, COLL_NOsep) + #endif diff --git a/src/smpi/colls/reduce_scatter-ompi.c b/src/smpi/colls/reduce_scatter-ompi.c new file mode 100644 index 0000000000..90978b88f3 --- /dev/null +++ b/src/smpi/colls/reduce_scatter-ompi.c @@ -0,0 +1,517 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2012 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. + * Copyright (c) 2009 University of Houston. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "colls_private.h" +#include "coll_tuned_topo.h" + +#define MCA_COLL_BASE_TAG_REDUCE_SCATTER 222 +/* + * Recursive-halving function is (*mostly*) copied from the BASIC coll module. + * I have removed the part which handles "large" message sizes + * (non-overlapping version of reduce_Scatter). + */ + +/* copied function (with appropriate renaming) starts here */ + +/* + * reduce_scatter_ompi_basic_recursivehalving + * + * Function: - reduce scatter implementation using recursive-halving + * algorithm + * Accepts: - same as MPI_Reduce_scatter() + * Returns: - MPI_SUCCESS or error code + * Limitation: - Works only for commutative operations. 
+ */ +int +smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf, + void *rbuf, + int *rcounts, + MPI_Datatype dtype, + MPI_Op op, + MPI_Comm comm + ) +{ + int i, rank, size, count, err = MPI_SUCCESS; + int tmp_size=1, remain = 0, tmp_rank, *disps = NULL; + ptrdiff_t true_lb, true_extent, lb, extent, buf_size; + char *recv_buf = NULL, *recv_buf_free = NULL; + char *result_buf = NULL, *result_buf_free = NULL; + + /* Initialize */ + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + + XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank); + + /* Find displacements and the like */ + disps = (int*) xbt_malloc(sizeof(int) * size); + if (NULL == disps) return MPI_ERR_OTHER; + + disps[0] = 0; + for (i = 0; i < (size - 1); ++i) { + disps[i + 1] = disps[i] + rcounts[i]; + } + count = disps[size - 1] + rcounts[size - 1]; + + /* short cut the trivial case */ + if (0 == count) { + xbt_free(disps); + return MPI_SUCCESS; + } + + /* get datatype information */ + smpi_datatype_extent(dtype, &lb, &extent); + smpi_datatype_extent(dtype, &true_lb, &true_extent); + buf_size = true_extent + (ptrdiff_t)(count - 1) * extent; + + /* Handle MPI_IN_PLACE */ + if (MPI_IN_PLACE == sbuf) { + sbuf = rbuf; + } + + /* Allocate temporary receive buffer. 
*/ + recv_buf_free = (char*) xbt_malloc(buf_size); + recv_buf = recv_buf_free - lb; + if (NULL == recv_buf_free) { + err = MPI_ERR_OTHER; + goto cleanup; + } + + /* allocate temporary buffer for results */ + result_buf_free = (char*) xbt_malloc(buf_size); + result_buf = result_buf_free - lb; + + /* copy local buffer into the temporary results */ + err =smpi_datatype_copy(sbuf, count, dtype, result_buf, count, dtype); + if (MPI_SUCCESS != err) goto cleanup; + + /* figure out power of two mapping: grow until larger than + comm size, then go back one, to get the largest power of + two less than comm size */ + while (tmp_size <= size) tmp_size <<= 1; + tmp_size >>= 1; + remain = size - tmp_size; + + /* If comm size is not a power of two, have the first "remain" + procs with an even rank send to rank + 1, leaving a power of + two procs to do the rest of the algorithm */ + if (rank < 2 * remain) { + if ((rank & 1) == 0) { + smpi_mpi_send(result_buf, count, dtype, rank + 1, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + /* we don't participate from here on out */ + tmp_rank = -1; + } else { + smpi_mpi_recv(recv_buf, count, dtype, rank - 1, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm, MPI_STATUS_IGNORE); + + /* integrate their results into our temp results */ + smpi_op_apply(op, recv_buf, result_buf, &count, &dtype); + + /* adjust rank to be the bottom "remain" ranks */ + tmp_rank = rank / 2; + } + } else { + /* just need to adjust rank to show that the bottom "even + remain" ranks dropped out */ + tmp_rank = rank - remain; + } + + /* For ranks not kicked out by the above code, perform the + recursive halving */ + if (tmp_rank >= 0) { + int *tmp_disps = NULL, *tmp_rcounts = NULL; + int mask, send_index, recv_index, last_index; + + /* recalculate disps and rcounts to account for the + special "remainder" processes that are no longer doing + anything */ + tmp_rcounts = (int*) xbt_malloc(tmp_size * sizeof(int)); + if (NULL == tmp_rcounts) { + err = MPI_ERR_OTHER; + goto 
cleanup; + } + tmp_disps = (int*) xbt_malloc(tmp_size * sizeof(int)); + if (NULL == tmp_disps) { + xbt_free(tmp_rcounts); + err = MPI_ERR_OTHER; + goto cleanup; + } + + for (i = 0 ; i < tmp_size ; ++i) { + if (i < remain) { + /* need to include old neighbor as well */ + tmp_rcounts[i] = rcounts[i * 2 + 1] + rcounts[i * 2]; + } else { + tmp_rcounts[i] = rcounts[i + remain]; + } + } + + tmp_disps[0] = 0; + for (i = 0; i < tmp_size - 1; ++i) { + tmp_disps[i + 1] = tmp_disps[i] + tmp_rcounts[i]; + } + + /* do the recursive halving communication. Don't use the + dimension information on the communicator because I + think the information is invalidated by our "shrinking" + of the communicator */ + mask = tmp_size >> 1; + send_index = recv_index = 0; + last_index = tmp_size; + while (mask > 0) { + int tmp_peer, peer, send_count, recv_count; + MPI_Request request; + + tmp_peer = tmp_rank ^ mask; + peer = (tmp_peer < remain) ? tmp_peer * 2 + 1 : tmp_peer + remain; + + /* figure out if we're sending, receiving, or both */ + send_count = recv_count = 0; + if (tmp_rank < tmp_peer) { + send_index = recv_index + mask; + for (i = send_index ; i < last_index ; ++i) { + send_count += tmp_rcounts[i]; + } + for (i = recv_index ; i < send_index ; ++i) { + recv_count += tmp_rcounts[i]; + } + } else { + recv_index = send_index + mask; + for (i = send_index ; i < recv_index ; ++i) { + send_count += tmp_rcounts[i]; + } + for (i = recv_index ; i < last_index ; ++i) { + recv_count += tmp_rcounts[i]; + } + } + + /* actual data transfer. 
Send from result_buf, + receive into recv_buf */ + if (send_count > 0 && recv_count != 0) { + request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, + recv_count, dtype, peer, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + if (MPI_SUCCESS != err) { + xbt_free(tmp_rcounts); + xbt_free(tmp_disps); + goto cleanup; + } + } + if (recv_count > 0 && send_count != 0) { + smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent, + send_count, dtype, peer, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + if (MPI_SUCCESS != err) { + xbt_free(tmp_rcounts); + xbt_free(tmp_disps); + goto cleanup; + } + } + if (send_count > 0 && recv_count != 0) { + smpi_mpi_wait(&request, MPI_STATUS_IGNORE); + } + + /* if we received something on this step, push it into + the results buffer */ + if (recv_count > 0) { + smpi_op_apply(op, + recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, + result_buf + (ptrdiff_t)tmp_disps[recv_index] * extent, + &recv_count, &dtype); + } + + /* update for next iteration */ + send_index = recv_index; + last_index = recv_index + mask; + mask >>= 1; + } + + /* copy local results from results buffer into real receive buffer */ + if (0 != rcounts[rank]) { + err = smpi_datatype_copy(result_buf + disps[rank] * extent, + rcounts[rank], dtype, + rbuf, rcounts[rank], dtype); + if (MPI_SUCCESS != err) { + xbt_free(tmp_rcounts); + xbt_free(tmp_disps); + goto cleanup; + } + } + + xbt_free(tmp_rcounts); + xbt_free(tmp_disps); + } + + /* Now fix up the non-power of two case, by having the odd + procs send the even procs the proper results */ + if (rank < (2 * remain)) { + if ((rank & 1) == 0) { + if (rcounts[rank]) { + smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm, MPI_STATUS_IGNORE); + } + } else { + if (rcounts[rank - 1]) { + smpi_mpi_send(result_buf + disps[rank - 1] * extent, + rcounts[rank - 1], dtype, rank - 1, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + } + } + } + + cleanup: + if 
(NULL != disps) xbt_free(disps); + if (NULL != recv_buf_free) xbt_free(recv_buf_free); + if (NULL != result_buf_free) xbt_free(result_buf_free); + + return err; +} + +/* copied function (with appropriate renaming) ends here */ + + +/* + * smpi_coll_tuned_reduce_scatter_ompi_ring + * + * Function: Ring algorithm for reduce_scatter operation + * Accepts: Same as MPI_Reduce_scatter() + * Returns: MPI_SUCCESS or error code + * + * Description: Implements ring algorithm for reduce_scatter: + * the block sizes defined in rcounts are exchanged and + * updated until they reach proper destination. + * Algorithm requires 2 * max(rcounts) extra buffering + * + * Limitations: The algorithm DOES NOT preserve order of operations so it + * can be used only for commutative operations. + * Example on 5 nodes: + * Initial state + * # 0 1 2 3 4 + * [00] [10] -> [20] [30] [40] + * [01] [11] [21] -> [31] [41] + * [02] [12] [22] [32] -> [42] + * -> [03] [13] [23] [33] [43] --> .. + * [04] -> [14] [24] [34] [44] + * + * COMPUTATION PHASE + * Step 0: rank r sends block (r-1) to rank (r+1) and + * receives block (r-2) from rank (r-1) [with wraparound]. + * # 0 1 2 3 4 + * [00] [10] [10+20] -> [30] [40] + * [01] [11] [21] [21+31] -> [41] + * -> [02] [12] [22] [32] [32+42] -->.. 
+ * [43+03] -> [13] [23] [33] [43] + * [04] [04+14] -> [24] [34] [44] + * + * Step 1: + * # 0 1 2 3 4 + * [00] [10] [10+20] [10+20+30] -> [40] + * -> [01] [11] [21] [21+31] [21+31+41] -> + * [32+42+02] -> [12] [22] [32] [32+42] + * [03] [43+03+13] -> [23] [33] [43] + * [04] [04+14] [04+14+24] -> [34] [44] + * + * Step 2: + * # 0 1 2 3 4 + * -> [00] [10] [10+20] [10+20+30] [10+20+30+40] -> + * [21+31+41+01]-> [11] [21] [21+31] [21+31+41] + * [32+42+02] [32+42+02+12]-> [22] [32] [32+42] + * [03] [43+03+13] [43+03+13+23]-> [33] [43] + * [04] [04+14] [04+14+24] [04+14+24+34] -> [44] + * + * Step 3: + * # 0 1 2 3 4 + * [10+20+30+40+00] [10] [10+20] [10+20+30] [10+20+30+40] + * [21+31+41+01] [21+31+41+01+11] [21] [21+31] [21+31+41] + * [32+42+02] [32+42+02+12] [32+42+02+12+22] [32] [32+42] + * [03] [43+03+13] [43+03+13+23] [43+03+13+23+33] [43] + * [04] [04+14] [04+14+24] [04+14+24+34] [04+14+24+34+44] + * DONE :) + * + */ +int +smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts, + MPI_Datatype dtype, + MPI_Op op, + MPI_Comm comm + ) +{ + int ret, line, rank, size, i, k, recv_from, send_to, total_count, max_block_count; + int inbi, *displs = NULL; + char *tmpsend = NULL, *tmprecv = NULL, *accumbuf = NULL, *accumbuf_free = NULL; + char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL}; + ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize; + MPI_Request reqs[2] = {NULL, NULL}; + size_t typelng; + + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + XBT_DEBUG( "coll:tuned:reduce_scatter_ompi_ring rank %d, size %d", + rank, size); + + /* Determine the maximum number of elements per node, + corresponding block size, and displacements array. 
+ */ + displs = (int*) xbt_malloc(size * sizeof(int)); + if (NULL == displs) { ret = -1; line = __LINE__; goto error_hndl; } + displs[0] = 0; + total_count = rcounts[0]; + max_block_count = rcounts[0]; + for (i = 1; i < size; i++) { + displs[i] = total_count; + total_count += rcounts[i]; + if (max_block_count < rcounts[i]) max_block_count = rcounts[i]; + } + + /* Special case for size == 1 */ + if (1 == size) { + if (MPI_IN_PLACE != sbuf) { + ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, (char*)rbuf, total_count, dtype); + if (ret < 0) { line = __LINE__; goto error_hndl; } + } + xbt_free(displs); + return MPI_SUCCESS; + } + + /* Allocate and initialize temporary buffers, we need: + - a temporary buffer to perform reduction (size total_count) since + rbuf can be of rcounts[rank] size. + - up to two temporary buffers used for communication/computation overlap. + */ + smpi_datatype_extent(dtype, &lb, &extent); + smpi_datatype_extent(dtype, &true_lb, &true_extent); + typelng = smpi_datatype_size(dtype); + + max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent; + + accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent); + if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; } + accumbuf = accumbuf_free - lb; + + inbuf_free[0] = (char*)xbt_malloc(max_real_segsize); + if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; } + inbuf[0] = inbuf_free[0] - lb; + if (size > 2) { + inbuf_free[1] = (char*)xbt_malloc(max_real_segsize); + if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; } + inbuf[1] = inbuf_free[1] - lb; + } + + /* Handle MPI_IN_PLACE for size > 1 */ + if (MPI_IN_PLACE == sbuf) { + sbuf = rbuf; + } + + ret = smpi_datatype_copy((char*)sbuf, total_count, dtype, accumbuf, total_count, dtype); + if (ret < 0) { line = __LINE__; goto error_hndl; } + + /* Computation loop */ + + /* + For each of the remote nodes: + - post irecv for block (r-2) 
from (r-1) with wrap around + - send block (r-1) to (r+1) + - in loop for every step k = 2 .. n + - post irecv for block (r - 1 + n - k) % n + - wait on block (r + n - k) % n to arrive + - compute on block (r + n - k ) % n + - send block (r + n - k) % n + - wait on block (r) + - compute on block (r) + - copy block (r) to rbuf + Note that we must be careful when computing the beginning of buffers and + for send operations and computation we must compute the exact block size. + */ + send_to = (rank + 1) % size; + recv_from = (rank + size - 1) % size; + + inbi = 0; + /* Initialize first receive from the neighbor on the left */ + reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm + ); + tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent; + smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + + for (k = 2; k < size; k++) { + const int prevblock = (rank + size - k) % size; + + inbi = inbi ^ 0x1; + + /* Post irecv for the current block */ + reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm + ); + + /* Wait on previous block to arrive */ + smpi_mpi_wait(&reqs[inbi ^ 0x1], MPI_STATUS_IGNORE); + + /* Apply operation on previous block: result goes to rbuf + rbuf[prevblock] = inbuf[inbi ^ 0x1] (op) rbuf[prevblock] + */ + tmprecv = accumbuf + (ptrdiff_t)displs[prevblock] * extent; + smpi_op_apply(op, inbuf[inbi ^ 0x1], tmprecv, &(rcounts[prevblock]), &dtype); + + /* send previous block to send_to */ + smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to, + MCA_COLL_BASE_TAG_REDUCE_SCATTER, + comm); + } + + /* Wait on the last block to arrive */ + smpi_mpi_wait(&reqs[inbi], MPI_STATUS_IGNORE); + + /* Apply operation on the last block (my block) + rbuf[rank] = inbuf[inbi] (op) rbuf[rank] */ + tmprecv = accumbuf + (ptrdiff_t)displs[rank] * extent; + smpi_op_apply(op, inbuf[inbi], tmprecv, 
&(rcounts[rank]), &dtype); + + /* Copy result from tmprecv to rbuf */ + ret = smpi_datatype_copy(tmprecv, rcounts[rank], dtype, (char*)rbuf, rcounts[rank], dtype); + if (ret < 0) { line = __LINE__; goto error_hndl; } + + if (NULL != displs) xbt_free(displs); + if (NULL != accumbuf_free) xbt_free(accumbuf_free); + if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]); + if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]); + + return MPI_SUCCESS; + + error_hndl: + XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n", + __FILE__, line, rank, ret); + if (NULL != displs) xbt_free(displs); + if (NULL != accumbuf_free) xbt_free(accumbuf_free); + if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]); + if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]); + return ret; +} + diff --git a/src/smpi/colls/smpi_openmpi_selector.c b/src/smpi/colls/smpi_openmpi_selector.c index 72c3699278..15dac048e7 100644 --- a/src/smpi/colls/smpi_openmpi_selector.c +++ b/src/smpi/colls/smpi_openmpi_selector.c @@ -324,11 +324,11 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf, #endif /* 0 */ } -/*int smpi_coll_tuned_reduce_scatter_ompi( void *sbuf, void *rbuf, +int smpi_coll_tuned_reduce_scatter_ompi( void *sbuf, void *rbuf, int *rcounts, MPI_Datatype dtype, MPI_Op op, - MPI_Comm comm, + MPI_Comm comm ) { int comm_size, i, pow2; @@ -337,25 +337,26 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf, const double b = 8.0; const size_t small_message_size = 12 * 1024; const size_t large_message_size = 256 * 1024; - bool zerocounts = false; - - OPAL_OUTPUT((smpi_coll_tuned_stream, "smpi_coll_tuned_reduce_scatter_ompi")); + int zerocounts = 0; + XBT_DEBUG("smpi_coll_tuned_reduce_scatter_ompi"); + comm_size = smpi_comm_size(comm); // We need data size for decision function - ompi_datatype_type_size(dtype, &dsize); + dsize=smpi_datatype_size(dtype); total_message_size = 0; for (i = 0; i < comm_size; i++) { total_message_size += rcounts[i]; if (0 == rcounts[i]) { - zerocounts = true; + 
zerocounts = 1; } } - if( !ompi_op_is_commute(op) || (zerocounts)) { - return smpi_coll_tuned_reduce_scatter_intra_nonoverlapping (sbuf, rbuf, rcounts, + if( !smpi_op_is_commute(op) || (zerocounts)) { + smpi_mpi_reduce_scatter (sbuf, rbuf, rcounts, dtype, op, - comm, module); + comm); + return MPI_SUCCESS; } total_message_size *= dsize; @@ -367,20 +368,17 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf, ((total_message_size <= large_message_size) && (pow2 == comm_size)) || (comm_size >= a * total_message_size + b)) { return - smpi_coll_tuned_reduce_scatter_intra_basic_recursivehalving(sbuf, rbuf, rcounts, + smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(sbuf, rbuf, rcounts, dtype, op, - comm, module); + comm); } - return smpi_coll_tuned_reduce_scatter_intra_ring(sbuf, rbuf, rcounts, + return smpi_coll_tuned_reduce_scatter_ompi_ring(sbuf, rbuf, rcounts, dtype, op, - comm, module); + comm); - - return smpi_coll_tuned_reduce_scatter(sbuf, rbuf, rcounts, - dtype, op, - comm; -}*/ + +} int smpi_coll_tuned_allgather_ompi(void *sbuf, int scount, MPI_Datatype sdtype, diff --git a/src/smpi/private.h b/src/smpi/private.h index cf468565ca..156337f827 100644 --- a/src/smpi/private.h +++ b/src/smpi/private.h @@ -211,6 +211,8 @@ void smpi_mpi_barrier(MPI_Comm comm); void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); +void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, + MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm); diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index baf4891735..067cd82af0 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -941,6 +941,27 @@ void smpi_mpi_gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, } } 
+ +void smpi_mpi_reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, + MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) +{ + int i, size, count; + int *displs; + int rank = smpi_process_index(); + /* arbitrarily choose root as rank 0 */ + size = smpi_comm_size(comm); + count = 0; + displs = xbt_new(int, size); + for (i = 0; i < size; i++) { + displs[i] = count; + count += recvcounts[i]; + } + mpi_coll_reduce_fun(sendbuf, recvbuf, count, datatype, op, 0, comm); + smpi_mpi_scatterv(recvbuf, recvcounts, displs, datatype, recvbuf, + recvcounts[rank], datatype, 0, comm); + xbt_free(displs); +} + void smpi_mpi_gatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm) diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c index f242c77232..9140f3ad70 100644 --- a/src/smpi/smpi_coll.c +++ b/src/smpi/smpi_coll.c @@ -47,6 +47,15 @@ COLL_ALLREDUCES(COLL_DESCRIPTION, COLL_COMMA), {NULL, NULL, NULL} /* this array must be NULL terminated */ }; +s_mpi_coll_description_t mpi_coll_reduce_scatter_description[] = { + {"default", + "reduce_scatter default collective", + smpi_mpi_reduce_scatter}, +COLL_REDUCE_SCATTERS(COLL_DESCRIPTION, COLL_COMMA), + {NULL, NULL, NULL} /* this array must be NULL terminated */ +}; + + s_mpi_coll_description_t mpi_coll_alltoall_description[] = { {"default", "Ompi alltoall default collective", @@ -143,7 +152,7 @@ int (*mpi_coll_alltoall_fun)(void *, int, MPI_Datatype, void*, int, MPI_Datatype int (*mpi_coll_alltoallv_fun)(void *, int*, int*, MPI_Datatype, void*, int*, int*, MPI_Datatype, MPI_Comm); int (*mpi_coll_bcast_fun)(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm com); int (*mpi_coll_reduce_fun)(void *buf, void *rbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); - +int (*mpi_coll_reduce_scatter_fun)(void *sbuf, void *rbuf, int *rcounts,MPI_Datatype dtype,MPI_Op op,MPI_Comm comm); struct s_proc_tree { 
int PROCTREE_A; int numChildren; diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index 189bac1afd..5b05cfe912 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -408,6 +408,11 @@ int smpi_main(int (*realmain) (int argc, char *argv[]),int argc, char *argv[]) MPI_Op op, int root, MPI_Comm comm)) mpi_coll_reduce_description[reduce_id].coll; + int reduce_scatter_id = find_coll_description(mpi_coll_reduce_scatter_description, + sg_cfg_get_string("smpi/reduce_scatter")); + mpi_coll_reduce_scatter_fun = (int (*)(void *sbuf, void *rbuf, int *rcounts,\ + MPI_Datatype dtype,MPI_Op op,MPI_Comm comm)) + mpi_coll_reduce_scatter_description[reduce_scatter_id].coll; smpi_global_init(); /* Clean IO before the run */ diff --git a/src/smpi/smpi_pmpi.c b/src/smpi/smpi_pmpi.c index 783395b2ef..23621eac21 100644 --- a/src/smpi/smpi_pmpi.c +++ b/src/smpi/smpi_pmpi.c @@ -1869,10 +1869,8 @@ int PMPI_Scan(void *sendbuf, void *recvbuf, int count, int PMPI_Reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { - int retval, i, size, count; - int *displs; + int retval; int rank = comm != MPI_COMM_NULL ? smpi_process_index() : -1; - smpi_bench_end(); #ifdef HAVE_TRACING TRACE_smpi_computing_out(rank); @@ -1887,19 +1885,9 @@ int PMPI_Reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, } else if (recvcounts == NULL) { retval = MPI_ERR_ARG; } else { - /* arbitrarily choose root as rank 0 */ - /* TODO: faster direct implementation ? 
*/ - size = smpi_comm_size(comm); - count = 0; - displs = xbt_new(int, size); - for (i = 0; i < size; i++) { - count += recvcounts[i]; - displs[i] = 0; - } - mpi_coll_reduce_fun(sendbuf, recvbuf, count, datatype, op, 0, comm); - smpi_mpi_scatterv(recvbuf, recvcounts, displs, datatype, recvbuf, - recvcounts[rank], datatype, 0, comm); - xbt_free(displs); + + mpi_coll_reduce_scatter_fun(sendbuf, recvbuf, recvcounts, + datatype, op, comm); retval = MPI_SUCCESS; } #ifdef HAVE_TRACING diff --git a/teshsuite/smpi/CMakeLists.txt b/teshsuite/smpi/CMakeLists.txt index 78de3f413f..c0d0ea3c33 100644 --- a/teshsuite/smpi/CMakeLists.txt +++ b/teshsuite/smpi/CMakeLists.txt @@ -28,6 +28,7 @@ if(enable_smpi) add_executable(scatter scatter.c) add_executable(reduce reduce.c) add_executable(reduce_coll reduce_coll.c) + add_executable(reduce_scatter_coll reduce_scatter_coll.c) add_executable(split split.c) add_executable(smpi_sendrecv sendrecv.c) add_executable(ttest01 ttest01.c) @@ -54,6 +55,7 @@ if(enable_smpi) target_link_libraries(scatter simgrid) target_link_libraries(reduce simgrid) target_link_libraries(reduce_coll simgrid) + target_link_libraries(reduce_scatter_coll simgrid) target_link_libraries(split simgrid) target_link_libraries(smpi_sendrecv simgrid) target_link_libraries(ttest01 simgrid) @@ -88,6 +90,7 @@ set(tesh_files ${CMAKE_CURRENT_SOURCE_DIR}/pt2pt.tesh ${CMAKE_CURRENT_SOURCE_DIR}/reduce.tesh ${CMAKE_CURRENT_SOURCE_DIR}/reduce_coll.tesh + ${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter_coll.tesh ${CMAKE_CURRENT_SOURCE_DIR}/struct.tesh ${CMAKE_CURRENT_SOURCE_DIR}/vector.tesh PARENT_SCOPE @@ -106,6 +109,7 @@ set(examples_src ${CMAKE_CURRENT_SOURCE_DIR}/alltoallv_coll.c ${CMAKE_CURRENT_SOURCE_DIR}/bcast_coll.c ${CMAKE_CURRENT_SOURCE_DIR}/reduce_coll.c + ${CMAKE_CURRENT_SOURCE_DIR}/reduce_scatter_coll.c ${CMAKE_CURRENT_SOURCE_DIR}/alltoallv_coll.c ${CMAKE_CURRENT_SOURCE_DIR}/get_processor_name.c ${CMAKE_CURRENT_SOURCE_DIR}/pingpong.c diff --git 
a/teshsuite/smpi/mpich-test/coll/redscat.c b/teshsuite/smpi/mpich-test/coll/redscat.c index 3cb057da76..9cb824ae30 100644 --- a/teshsuite/smpi/mpich-test/coll/redscat.c +++ b/teshsuite/smpi/mpich-test/coll/redscat.c @@ -31,10 +31,8 @@ int main( int argc, char **argv ) recvcounts = (int *)malloc( size * sizeof(int) ); recvbuf = (int *)malloc( size * sizeof(int) ); for (i=0; i +#include + +int main( int argc, char **argv ) +{ + int err = 0, toterr; + int *sendbuf, *recvbuf, *recvcounts; + int size, rank, i, sumval; + MPI_Comm comm; + + + MPI_Init( &argc, &argv ); + comm = MPI_COMM_WORLD; + + MPI_Comm_size( comm, &size ); + MPI_Comm_rank( comm, &rank ); + sendbuf = (int *) malloc( size * sizeof(int) ); + for (i=0; i No Errors +> You requested to use 16 processes, but there is only 5 processes in your hostfile... +> [0.000000] [surf_config/INFO] Switching workstation model to compound since you changed the network and/or cpu model(s) +> [rank 0] -> Tremblay +> [rank 10] -> Tremblay +> [rank 11] -> Jupiter +> [rank 12] -> Fafard +> [rank 13] -> Ginette +> [rank 14] -> Bourassa +> [rank 15] -> Tremblay +> [rank 1] -> Jupiter +> [rank 2] -> Fafard +> [rank 3] -> Ginette +> [rank 4] -> Bourassa +> [rank 5] -> Tremblay +> [rank 6] -> Jupiter +> [rank 7] -> Fafard +> [rank 8] -> Ginette +> [rank 9] -> Bourassa