From: degomme Date: Thu, 13 Jun 2013 09:36:27 +0000 (+0200) Subject: end of ompi collectives addition, with barrier X-Git-Tag: v3_9_90~274^2~10 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/e6557f5a455aa071d62f977185ea49ed1437003a end of ompi collectives addition, with barrier --- diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake index 4b5c74efcb..6d5586ff42 100644 --- a/buildtools/Cmake/DefinePackages.cmake +++ b/buildtools/Cmake/DefinePackages.cmake @@ -198,6 +198,7 @@ set(SMPI_SRC src/smpi/colls/gather-ompi.c src/smpi/colls/reduce_scatter-ompi.c src/smpi/colls/scatter-ompi.c + src/smpi/colls/barrier-ompi.c ) if(SMPI_F2C) diff --git a/src/include/smpi/smpi_interface.h b/src/include/smpi/smpi_interface.h index 7ba0fdc34a..04fc1da637 100644 --- a/src/include/smpi/smpi_interface.h +++ b/src/include/smpi/smpi_interface.h @@ -108,6 +108,14 @@ XBT_PUBLIC_DATA(int (*mpi_coll_scatter_fun) void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)); +/** \ingroup MPI barrier + * \brief The list of all available barrier collectives + */ +XBT_PUBLIC_DATA(s_mpi_coll_description_t) mpi_coll_barrier_description[]; +XBT_PUBLIC_DATA(int (*mpi_coll_barrier_fun) + (MPI_Comm comm)); + + XBT_PUBLIC(void) coll_help(const char *category, s_mpi_coll_description_t * table); XBT_PUBLIC(int) find_coll_description(s_mpi_coll_description_t * table, diff --git a/src/simgrid/sg_config.c b/src/simgrid/sg_config.c index c29f5e8841..3022b610d4 100644 --- a/src/simgrid/sg_config.c +++ b/src/simgrid/sg_config.c @@ -282,6 +282,9 @@ static void _sg_cfg_cb__coll_reduce_scatter(const char *name, int pos){ static void _sg_cfg_cb__coll_scatter(const char *name, int pos){ _sg_cfg_cb__coll("scatter", mpi_coll_scatter_description, name, pos); } +static void _sg_cfg_cb__coll_barrier(const char *name, int pos){ + _sg_cfg_cb__coll("barrier", mpi_coll_barrier_description, name, pos); +} #endif /* callback of
the inclusion path */ @@ -778,6 +781,11 @@ void sg_config_init(int *argc, char **argv) xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_allgather, NULL); + xbt_cfg_register(&_sg_cfg_set, "smpi/barrier", + "Which collective to use for barrier", + xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_barrier, + NULL); + xbt_cfg_register(&_sg_cfg_set, "smpi/reduce_scatter", "Which collective to use for reduce_scatter", xbt_cfgelm_string, NULL, 1, 1, &_sg_cfg_cb__coll_reduce_scatter, diff --git a/src/smpi/colls/barrier-ompi.c b/src/smpi/colls/barrier-ompi.c new file mode 100644 index 0000000000..ad5b748042 --- /dev/null +++ b/src/smpi/colls/barrier-ompi.c @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "colls_private.h" +#include "coll_tuned_topo.h" + + +#define MCA_COLL_BASE_TAG_BARRIER 100 +/* + * Barrier is meant to be a synchronous operation, as some BTLs can mark + * a request done before it's passed to the NIC and progress might not be made + * elsewhere we cannot allow a process to exit the barrier until its last + * [round of] sends are completed. 
+ * + * It is last round of sends rather than 'last' individual send as each pair of + * peers can use different channels/devices/btls and the receiver of one of + * these sends might be forced to wait as the sender + * leaves the collective and does not make progress until the next mpi call + * + */ + +/* + * Simple double ring version of barrier + * + * synchronous guarantee made by last ring of sends are synchronous + * + */ +int smpi_coll_tuned_barrier_ompi_doublering(MPI_Comm comm + ) +{ + int rank, size; + int left, right; + + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + + XBT_DEBUG("ompi_coll_tuned_barrier_ompi_doublering rank %d", rank); + + left = ((rank-1)%size); + right = ((rank+1)%size); + + if (rank > 0) { /* receive message from the left */ + smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } + + /* Send message to the right */ + smpi_mpi_send((void*)NULL, 0, MPI_BYTE, right, + MCA_COLL_BASE_TAG_BARRIER, + comm); + + /* root needs to receive from the last node */ + if (rank == 0) { + smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } + + /* Allow nodes to exit */ + if (rank > 0) { /* post Receive from left */ + smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } + + /* send message to the right one */ + smpi_mpi_send((void*)NULL, 0, MPI_BYTE, right, + MCA_COLL_BASE_TAG_BARRIER, + comm); + + /* rank 0 post receive from the last node */ + if (rank == 0) { + smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, left, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } + + return MPI_SUCCESS; + +} + + +/* + * To make synchronous, uses sync sends and sync sendrecvs + */ + +int smpi_coll_tuned_barrier_ompi_recursivedoubling(MPI_Comm comm + ) +{ + int rank, size, adjsize; + int mask, remote; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + XBT_DEBUG( + 
"ompi_coll_tuned_barrier_ompi_recursivedoubling rank %d", + rank); + + /* do nearest power of 2 less than size calc */ + for( adjsize = 1; adjsize <= size; adjsize <<= 1 ); + adjsize >>= 1; + + /* if size is not exact power of two, perform an extra step */ + if (adjsize != size) { + if (rank >= adjsize) { + /* send message to lower ranked node */ + remote = rank - adjsize; + smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + comm, MPI_STATUS_IGNORE); + + } else if (rank < (size - adjsize)) { + + /* receive message from high level rank */ + smpi_mpi_recv((void*)NULL, 0, MPI_BYTE, rank+adjsize, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + + } + } + + /* exchange messages */ + if ( rank < adjsize ) { + mask = 0x1; + while ( mask < adjsize ) { + remote = rank ^ mask; + mask <<= 1; + if (remote >= adjsize) continue; + + /* post receive from the remote node */ + smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + comm, MPI_STATUS_IGNORE); + } + } + + /* non-power of 2 case */ + if (adjsize != size) { + if (rank < (size - adjsize)) { + /* send enter message to higher ranked node */ + remote = rank + adjsize; + smpi_mpi_send((void*)NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + comm); + + } + } + + return MPI_SUCCESS; + +} + + +/* + * To make synchronous, uses sync sends and sync sendrecvs + */ + +int smpi_coll_tuned_barrier_ompi_bruck(MPI_Comm comm + ) +{ + int rank, size; + int distance, to, from; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + XBT_DEBUG( + "ompi_coll_tuned_barrier_ompi_bruck rank %d", rank); + + /* exchange data with rank-2^k and rank+2^k */ + for (distance = 1; distance < size; distance <<= 1) { + from = (rank + size - distance) % size; + to = (rank + distance) % size; + + /* send message to lower ranked node */ + smpi_mpi_sendrecv(NULL, 0, 
MPI_BYTE, to, + MCA_COLL_BASE_TAG_BARRIER, + NULL, 0, MPI_BYTE, from, + MCA_COLL_BASE_TAG_BARRIER, + comm, MPI_STATUS_IGNORE); + } + + return MPI_SUCCESS; + +} + + +/* + * To make synchronous, uses sync sends and sync sendrecvs + */ +/* special case for two processes */ +int smpi_coll_tuned_barrier_ompi_two_procs(MPI_Comm comm + ) +{ + int remote; + + remote = smpi_comm_rank(comm); + XBT_DEBUG( + "ompi_coll_tuned_barrier_ompi_two_procs rank %d", remote); + remote = (remote + 1) & 0x1; + + smpi_mpi_sendrecv(NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + NULL, 0, MPI_BYTE, remote, + MCA_COLL_BASE_TAG_BARRIER, + comm, MPI_STATUS_IGNORE); + return (MPI_SUCCESS); +} + + +/* + * Linear functions are copied from the BASIC coll module + * they do not segment the message and are simple implementations + * but for some small number of nodes and/or small data sizes they + * are just as fast as tuned/tree based segmenting operations + * and as such may be selected by the decision functions + * These are copied into this module due to the way we select modules + * in V1. i.e. in V2 we will handle this differently and so will not + * have to duplicate code. + * GEF Oct05 after asking Jeff. + */ + +/* copied function (with appropriate renaming) starts here */ + +int smpi_coll_tuned_barrier_ompi_basic_linear(MPI_Comm comm) +{ + int i; + int size = smpi_comm_size(comm); + int rank = smpi_comm_rank(comm); + + /* All non-root send & receive zero-length message. */ + + if (rank > 0) { + smpi_mpi_send (NULL, 0, MPI_BYTE, 0, + MCA_COLL_BASE_TAG_BARRIER, + comm); + + smpi_mpi_recv (NULL, 0, MPI_BYTE, 0, + MCA_COLL_BASE_TAG_BARRIER, + comm, MPI_STATUS_IGNORE); + } + + /* The root collects and broadcasts the messages. 
*/ + + else { + MPI_Request* requests; + + requests = (MPI_Request*)malloc( size * sizeof(MPI_Request) ); + for (i = 1; i < size; ++i) { + requests[i] = smpi_mpi_irecv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, + MCA_COLL_BASE_TAG_BARRIER, comm + ); + } + smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); + + for (i = 1; i < size; ++i) { + requests[i] = smpi_mpi_isend(NULL, 0, MPI_BYTE, i, + MCA_COLL_BASE_TAG_BARRIER, + comm + ); + } + smpi_mpi_waitall( size-1, requests+1, MPI_STATUSES_IGNORE ); + free( requests ); + } + + /* All done */ + + return MPI_SUCCESS; + +} +/* copied function (with appropriate renaming) ends here */ + +/* + * Another recursive doubling type algorithm, but in this case + * we go up the tree and back down the tree. + */ +int smpi_coll_tuned_barrier_ompi_tree(MPI_Comm comm) +{ + int rank, size, depth; + int jump, partner; + + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + XBT_DEBUG( + "ompi_coll_tuned_barrier_ompi_tree %d", + rank); + + /* Find the nearest power of 2 of the communicator size. 
*/ + for(depth = 1; depth < size; depth <<= 1 ); + + for (jump=1; jump<depth; jump<<=1) { + partner = rank ^ jump; + if (!(partner & (jump-1)) && partner < size) { + if (partner > rank) { + smpi_mpi_recv (NULL, 0, MPI_BYTE, partner, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } else if (partner < rank) { + smpi_mpi_send (NULL, 0, MPI_BYTE, partner, + MCA_COLL_BASE_TAG_BARRIER, + comm); + } + } + } + + depth>>=1; + for (jump = depth; jump>0; jump>>=1) { + partner = rank ^ jump; + if (!(partner & (jump-1)) && partner < size) { + if (partner > rank) { + smpi_mpi_send (NULL, 0, MPI_BYTE, partner, + MCA_COLL_BASE_TAG_BARRIER, + comm); + } else if (partner < rank) { + smpi_mpi_recv (NULL, 0, MPI_BYTE, partner, + MCA_COLL_BASE_TAG_BARRIER, comm, + MPI_STATUS_IGNORE); + } + } + } + + return MPI_SUCCESS; +} diff --git a/src/smpi/colls/colls.h b/src/smpi/colls/colls.h index 12627c06c8..09e47ce818 100644 --- a/src/smpi/colls/colls.h +++ b/src/smpi/colls/colls.h @@ -229,7 +229,7 @@ COLL_REDUCE_SCATTERS(COLL_PROTO, COLL_NOsep) /************* - * REDUCE_SCATTER * + * SCATTER * *************/ #define COLL_SCATTER_SIG scatter, int, \ (void *sendbuf, int sendcount, MPI_Datatype sendtype,\ @@ -243,4 +243,22 @@ COLL_APPLY(action, COLL_SCATTER_SIG, ompi_binomial) COLL_SCATTERS(COLL_PROTO, COLL_NOsep) +/************* + * BARRIER * + *************/ +#define COLL_BARRIER_SIG barrier, int, \ + (MPI_Comm comm) + +#define COLL_BARRIERS(action, COLL_sep) \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_basic_linear) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_two_procs) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_tree) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_bruck) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_recursivedoubling) COLL_sep \ +COLL_APPLY(action, COLL_BARRIER_SIG, ompi_doublering) + +COLL_BARRIERS(COLL_PROTO, COLL_NOsep) + + #endif diff --git a/src/smpi/colls/smpi_openmpi_selector.c b/src/smpi/colls/smpi_openmpi_selector.c index f5a23c4d92..819242fb32 --- 
a/src/smpi/colls/smpi_openmpi_selector.c +++ b/src/smpi/colls/smpi_openmpi_selector.c @@ -99,27 +99,27 @@ int smpi_coll_tuned_alltoallv_ompi(void *sbuf, int *scounts, int *sdisps, comm); } -/* -void smpi_coll_tuned_barrier_ompi(MPI_Comm comm) + +int smpi_coll_tuned_barrier_ompi(MPI_Comm comm) { int communicator_size = smpi_comm_size(comm); if( 2 == communicator_size ) - return smpi_coll_tuned_barrier_intra_two_procs(comm, module); - * Basic optimisation. If we have a power of 2 number of nodes - * the use the recursive doubling algorithm, otherwise - * bruck is the one we want. + return smpi_coll_tuned_barrier_ompi_two_procs(comm); +/* * Basic optimisation. If we have a power of 2 number of nodes*/ +/* * the use the recursive doubling algorithm, otherwise*/ +/* * bruck is the one we want.*/ { - bool has_one = false; + int has_one = 0; for( ; communicator_size > 0; communicator_size >>= 1 ) { if( communicator_size & 0x1 ) { if( has_one ) - return smpi_coll_tuned_barrier_intra_bruck(comm, module); - has_one = true; + return smpi_coll_tuned_barrier_ompi_bruck(comm); + has_one = 1; } } } - return smpi_coll_tuned_barrier_intra_recursivedoubling(comm, module); -}*/ + return smpi_coll_tuned_barrier_ompi_recursivedoubling(comm); +} int smpi_coll_tuned_bcast_ompi(void *buff, int count, MPI_Datatype datatype, int root, diff --git a/src/smpi/smpi_coll.c b/src/smpi/smpi_coll.c index 50ee98edc4..172ddb167c 100644 --- a/src/smpi/smpi_coll.c +++ b/src/smpi/smpi_coll.c @@ -63,6 +63,13 @@ COLL_SCATTERS(COLL_DESCRIPTION, COLL_COMMA), {NULL, NULL, NULL} /* this array must be NULL terminated */ }; +s_mpi_coll_description_t mpi_coll_barrier_description[] = { + {"default", + "barrier default collective", + smpi_mpi_barrier}, +COLL_BARRIERS(COLL_DESCRIPTION, COLL_COMMA), + {NULL, NULL, NULL} /* this array must be NULL terminated */ +}; s_mpi_coll_description_t mpi_coll_alltoall_description[] = { {"default", "Ompi alltoall default collective", @@ -161,6 +168,7 @@ int 
(*mpi_coll_bcast_fun)(void *buf, int count, MPI_Datatype datatype, int root, int (*mpi_coll_reduce_fun)(void *buf, void *rbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); int (*mpi_coll_reduce_scatter_fun)(void *sbuf, void *rbuf, int *rcounts,MPI_Datatype dtype,MPI_Op op,MPI_Comm comm); int (*mpi_coll_scatter_fun)(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,int root, MPI_Comm comm); +int (*mpi_coll_barrier_fun)(MPI_Comm comm); struct s_proc_tree { int PROCTREE_A; int numChildren; diff --git a/src/smpi/smpi_global.c b/src/smpi/smpi_global.c index d980dfc6b5..c62c1036b9 100644 --- a/src/smpi/smpi_global.c +++ b/src/smpi/smpi_global.c @@ -420,6 +420,12 @@ int smpi_main(int (*realmain) (int argc, char *argv[]),int argc, char *argv[]) void *recvbuf, int recvcount, MPI_Datatype recvtype,\ int root, MPI_Comm comm)) mpi_coll_scatter_description[scatter_id].coll; + + int barrier_id = find_coll_description(mpi_coll_barrier_description, + sg_cfg_get_string("smpi/barrier")); + mpi_coll_barrier_fun = (int (*)(MPI_Comm comm)) + mpi_coll_barrier_description[barrier_id].coll; + smpi_global_init(); /* Clean IO before the run */ diff --git a/src/smpi/smpi_pmpi.c b/src/smpi/smpi_pmpi.c index 32fb308b0b..8e890ec452 100644 --- a/src/smpi/smpi_pmpi.c +++ b/src/smpi/smpi_pmpi.c @@ -1579,7 +1579,7 @@ int PMPI_Barrier(MPI_Comm comm) if (comm == MPI_COMM_NULL) { retval = MPI_ERR_COMM; } else { - smpi_mpi_barrier(comm); + mpi_coll_barrier_fun(comm); retval = MPI_SUCCESS; } #ifdef HAVE_TRACING