From: degomme
Date: Mon, 10 Jun 2013 20:41:53 +0000 (+0200)
Subject: add another ompi bcast algo
X-Git-Tag: v3_9_90~294
X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/9631558b72f9b30b510f23a391d77c61c0541bd5?ds=sidebyside

add another ompi bcast algo
---

diff --git a/buildtools/Cmake/AddTests.cmake b/buildtools/Cmake/AddTests.cmake
index 7bd0ec8782..3f31945f9a 100644
--- a/buildtools/Cmake/AddTests.cmake
+++ b/buildtools/Cmake/AddTests.cmake
@@ -397,7 +397,7 @@ if(NOT enable_memcheck)
   ENDFOREACH()
   FOREACH (BCAST_COLL default arrival_nb arrival_pattern_aware arrival_pattern_aware_wait arrival_scatter binomial_tree flattree flattree_pipeline NTSB NTSL NTSL_Isend scatter_LR_allgather
-                      scatter_rdb_allgather SMP_binary SMP_binomial SMP_linear ompi ompi_split_bintree)
+                      scatter_rdb_allgather SMP_binary SMP_binomial SMP_linear ompi ompi_split_bintree ompi_pipeline)
   ADD_TEST(smpi-bcast-coll-${BCAST_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/bcast:${BCAST_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/bcast_coll.tesh)
   ENDFOREACH()
   FOREACH (REDUCE_COLL default arrival_pattern_aware binomial flat_tree NTSL scatter_gather ompi)
diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake
index f9587289a9..2aaa4d8a06 100644
--- a/buildtools/Cmake/DefinePackages.cmake
+++ b/buildtools/Cmake/DefinePackages.cmake
@@ -184,6 +184,7 @@ set(SMPI_SRC
   src/smpi/colls/bcast-SMP-binomial.c
   src/smpi/colls/bcast-SMP-linear.c
   src/smpi/colls/bcast-ompi-split-bintree.c
+  src/smpi/colls/bcast-ompi-pipeline.c
   src/smpi/colls/reduce-arrival-pattern-aware.c
   src/smpi/colls/reduce-binomial.c
   src/smpi/colls/reduce-flat-tree.c
diff --git a/src/smpi/colls/bcast-ompi-pipeline.c b/src/smpi/colls/bcast-ompi-pipeline.c
new file mode 100644
index 0000000000..2371d10e99
--- /dev/null
+++ b/src/smpi/colls/bcast-ompi-pipeline.c
@@ -0,0 +1,378 @@
+#include "colls_private.h"
+
+#define MAXTREEFANOUT 32
+
+#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
+    if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
+        ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
+        size_t residual;                                                \
+        (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
+        residual   = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                \
+        if( residual > ((TYPELNG) >> 1) )                               \
+            (SEGCOUNT)++;                                               \
+    }                                                                   \
+
+typedef struct ompi_coll_tree_t {
+    int32_t tree_root;
+    int32_t tree_fanout;
+    int32_t tree_bmtree;
+    int32_t tree_prev;
+    int32_t tree_next[MAXTREEFANOUT];
+    int32_t tree_nextsize;
+} ompi_coll_tree_t;
+
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_chain( int fanout,
+                                  MPI_Comm com,
+                                  int root );
+
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_chain( int fanout,
+                                  MPI_Comm comm,
+                                  int root )
+{
+    int rank, size;
+    int srank; /* shifted rank */
+    int i, maxchainlen;
+    int mark, head, len;
+    ompi_coll_tree_t *chain;
+
+    XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root);
+
+    /*
+     * Get size and rank of the process in this communicator
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    if( fanout < 1 ) {
+        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, forcing to 1 (pipeline)!");
+        fanout = 1;
+    }
+    if (fanout > MAXTREEFANOUT) {
+        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT);
+        fanout = MAXTREEFANOUT;
+    }
+
+    /*
+     * Allocate space for topology arrays if needed
+     */
+    chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) );
+    if (!chain) {
+        XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory");
+        fflush(stdout);
+        return NULL;
+    }
+    chain->tree_root     = MPI_UNDEFINED;
+    chain->tree_nextsize = -1;
+    for( i = 0; i < MAXTREEFANOUT; i++ ) chain->tree_next[i] = -1;
+
+    /*
+     * Set root & numchain
+     */
+    chain->tree_root = root;
+    if( (size - 1) < fanout ) {
+        chain->tree_nextsize = size-1;
+        fanout = size-1;
+    } else {
+        chain->tree_nextsize = fanout;
+    }
+
+    /*
+     * Shift ranks
+     */
+    srank = rank - root;
+    if (srank < 0) srank += size;
+
+    /*
+     * Special case - fanout == 1
+     */
+    if( fanout == 1 ) {
+        if( srank == 0 ) chain->tree_prev = -1;
+        else chain->tree_prev = (srank-1+root)%size;
+
+        if( (srank + 1) >= size) {
+            chain->tree_next[0] = -1;
+            chain->tree_nextsize = 0;
+        } else {
+            chain->tree_next[0] = (srank+1+root)%size;
+            chain->tree_nextsize = 1;
+        }
+        return chain;
+    }
+
+    /* Let's handle the case where there is just one node in the communicator */
+    if( size == 1 ) {
+        chain->tree_next[0] = -1;
+        chain->tree_nextsize = 0;
+        chain->tree_prev = -1;
+        return chain;
+    }
+    /*
+     * Calculate maximum chain length
+     */
+    maxchainlen = (size-1) / fanout;
+    if( (size-1) % fanout != 0 ) {
+        maxchainlen++;
+        mark = (size-1)%fanout;
+    } else {
+        mark = fanout+1;
+    }
+
+    /*
+     * Find your own place in the list of shifted ranks
+     */
+    if( srank != 0 ) {
+        int column;
+        if( srank-1 < (mark * maxchainlen) ) {
+            column = (srank-1)/maxchainlen;
+            head = 1+column*maxchainlen;
+            len = maxchainlen;
+        } else {
+            column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1);
+            head = mark*maxchainlen+1+(column-mark)*(maxchainlen-1);
+            len = maxchainlen-1;
+        }
+
+        if( srank == head ) {
+            chain->tree_prev = 0; /*root*/
+        } else {
+            chain->tree_prev = srank-1; /* rank -1 */
+        }
+        if( srank == (head + len - 1) ) {
+            chain->tree_next[0] = -1;
+            chain->tree_nextsize = 0;
+        } else {
+            if( (srank + 1) < size ) {
+                chain->tree_next[0] = srank+1;
+                chain->tree_nextsize = 1;
+            } else {
+                chain->tree_next[0] = -1;
+                chain->tree_nextsize = 0;
+            }
+        }
+    }
+
+    /*
+     * Unshift values
+     */
+    if( rank == root ) {
+        chain->tree_prev = -1;
+        chain->tree_next[0] = (root+1)%size;
+        for( i = 1; i < fanout; i++ ) {
+            chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen;
+            if( i > mark ) {
+                chain->tree_next[i]--;
+            }
+            chain->tree_next[i] %= size;
+        }
+        chain->tree_nextsize = fanout;
+    } else {
+        chain->tree_prev = (chain->tree_prev+root)%size;
+        if( chain->tree_next[0] != -1 ) {
+            chain->tree_next[0] = (chain->tree_next[0]+root)%size;
+        }
+    }
+
+    return chain;
+}
+
+int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
+                                         int original_count,
+                                         MPI_Datatype datatype,
+                                         int root,
+                                         MPI_Comm comm)
+{
+    int count_by_segment = original_count;
+    size_t type_size;
+    int segsize = 0;   /* 0: whole message stays in one segment if no rule below matches */
+    //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
+    //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
+
+//    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
+//                                                count_by_segment, data->cached_pipeline );
+    ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
+    int err = 0, line, i;
+    int rank, size;
+    int segindex;
+    int num_segments;  /* Number of segments */
+    int sendcount;     /* number of elements sent in this segment */
+    size_t realsegsize;
+    char *tmpbuf;
+    ptrdiff_t extent;
+    MPI_Request recv_reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+    MPI_Request *send_reqs = NULL;
+    int req_index;
+
+    /**
+     * Determine number of elements sent per operation.
+     */
+    type_size = smpi_datatype_size(datatype);
+
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+    xbt_assert( size > 1 );
+
+    const double a_p16  = 3.2118e-6; /* [1 / byte] */
+    const double b_p16  = 8.7936;
+    const double a_p64  = 2.3679e-6; /* [1 / byte] */
+    const double b_p64  = 1.1787;
+    const double a_p128 = 1.6134e-6; /* [1 / byte] */
+    const double b_p128 = 2.1102;
+    size_t message_size;
+
+    /* else we need data size for decision function */
+    message_size = type_size * (unsigned long)original_count;   /* needed for decision */
+
+    if (size < (a_p128 * message_size + b_p128)) {
+        // Pipeline with 128KB segments
+        segsize = 1024 << 7;
+    } else if (size < (a_p64 * message_size + b_p64)) {
+        // Pipeline with 64KB segments
+        segsize = 1024 << 6;
+    } else if (size < (a_p16 * message_size + b_p16)) {
+        // Pipeline with 16KB segments
+        segsize = 1024 << 4;
+    }
+
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, type_size, count_by_segment );
+
+    XBT_DEBUG("coll:tuned:bcast_intra_pipeline rank %d ss %5d type_size %lu count_by_segment %d",
+              smpi_comm_rank(comm), segsize, (unsigned long)type_size, count_by_segment);
+
+    extent = smpi_datatype_get_extent (datatype);
+    num_segments = (original_count + count_by_segment - 1) / count_by_segment;
+    realsegsize = count_by_segment * extent;
+
+    /* Set the buffer pointers */
+    tmpbuf = (char *) buffer;
+
+    if( tree->tree_nextsize != 0 ) {
+        send_reqs = xbt_new(MPI_Request, tree->tree_nextsize);
+    }
+
+    /* Root code */
+    if( rank == root ) {
+        /*
+           For each segment:
+           - send segment to all children.
+             The last segment may have less elements than other segments.
+        */
+        sendcount = count_by_segment;
+        for( segindex = 0; segindex < num_segments; segindex++ ) {
+            if( segindex == (num_segments - 1) ) {
+                sendcount = original_count - segindex * count_by_segment;
+            }
+            for( i = 0; i < tree->tree_nextsize; i++ ) {
+                send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
+                                              tree->tree_next[i],
+                                              777, comm);
+            }
+
+            /* complete the sends before starting the next sends */
+            smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
+                              MPI_STATUSES_IGNORE );
+
+            /* update tmp buffer */
+            tmpbuf += realsegsize;
+        }
+    }
+
+    /* Intermediate nodes code */
+    else if( tree->tree_nextsize > 0 ) {
+        /*
+           Create the pipeline.
+           1) Post the first receive
+           2) For segments 1 .. num_segments
+              - post new receive
+              - wait on the previous receive to complete
+              - send this data to children
+           3) Wait on the last segment
+           4) Compute number of elements in last segment.
+           5) Send the last segment to children
+         */
+        req_index = 0;
+        recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
+                                              tree->tree_prev, 777,
+                                              comm);
+
+        for( segindex = 1; segindex < num_segments; segindex++ ) {
+
+            req_index = req_index ^ 0x1;
+
+            /* post new irecv */
+            recv_reqs[req_index] = smpi_mpi_irecv( tmpbuf + realsegsize, count_by_segment,
+                                                   datatype, tree->tree_prev,
+                                                   777, comm);
+
+            /* wait for and forward the previous segment to children */
+            smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
+                           MPI_STATUS_IGNORE );
+
+            for( i = 0; i < tree->tree_nextsize; i++ ) {
+                send_reqs[i] = smpi_mpi_isend(tmpbuf, count_by_segment, datatype,
+                                              tree->tree_next[i],
+                                              777, comm );
+            }
+
+            /* complete the sends before starting the next iteration */
+            smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
+                              MPI_STATUSES_IGNORE );
+
+            /* Update the receive buffer */
+            tmpbuf += realsegsize;
+        }
+
+        /* Process the last segment */
+        smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
+        sendcount = original_count - (num_segments - 1) * count_by_segment;
+        for( i = 0; i < tree->tree_nextsize; i++ ) {
+            send_reqs[i] = smpi_mpi_isend(tmpbuf, sendcount, datatype,
+                                          tree->tree_next[i],
+                                          777, comm);
+        }
+
+        smpi_mpi_waitall( tree->tree_nextsize, send_reqs,
+                          MPI_STATUSES_IGNORE );
+    }
+
+    /* Leaf nodes */
+    else {
+        /*
+           Receive all segments from parent in a loop:
+           1) post irecv for the first segment
+           2) for segments 1 .. num_segments
+              - post irecv for the next segment
+              - wait on the previous segment to arrive
+           3) wait for the last segment
+        */
+        req_index = 0;
+        recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
+                                              tree->tree_prev, 777,
+                                              comm);
+
+        for( segindex = 1; segindex < num_segments; segindex++ ) {
+            req_index = req_index ^ 0x1;
+            tmpbuf += realsegsize;
+            /* post receive for the next segment */
+            recv_reqs[req_index] = smpi_mpi_irecv(tmpbuf, count_by_segment, datatype,
+                                                  tree->tree_prev, 777,
+                                                  comm);
+            /* wait on the previous segment */
+            smpi_mpi_wait( &recv_reqs[req_index ^ 0x1],
+                           MPI_STATUS_IGNORE );
+        }
+
+        smpi_mpi_wait( &recv_reqs[req_index], MPI_STATUS_IGNORE );
+    }
+
+    if( NULL != send_reqs ) free(send_reqs);
+
+    return (MPI_SUCCESS);
+}
diff --git a/src/smpi/colls/colls.h b/src/smpi/colls/colls.h
index ddbf183a8c..24cd91cac8 100644
--- a/src/smpi/colls/colls.h
+++ b/src/smpi/colls/colls.h
@@ -166,7 +166,8 @@ COLL_APPLY(action, COLL_BCAST_SIG, SMP_binary) COLL_sep \
 COLL_APPLY(action, COLL_BCAST_SIG, SMP_binomial) COLL_sep \
 COLL_APPLY(action, COLL_BCAST_SIG, SMP_linear) COLL_sep \
 COLL_APPLY(action, COLL_BCAST_SIG, ompi) COLL_sep \
-COLL_APPLY(action, COLL_BCAST_SIG, ompi_split_bintree)
+COLL_APPLY(action, COLL_BCAST_SIG, ompi_split_bintree) COLL_sep \
+COLL_APPLY(action, COLL_BCAST_SIG, ompi_pipeline)
 
 COLL_BCASTS(COLL_PROTO, COLL_NOsep)
 
diff --git a/src/smpi/colls/smpi_openmpi_selector.c b/src/smpi/colls/smpi_openmpi_selector.c
index 13e5a41ebb..5d7d5afa15 100644
--- a/src/smpi/colls/smpi_openmpi_selector.c
+++ b/src/smpi/colls/smpi_openmpi_selector.c
@@ -131,12 +131,12 @@ int smpi_coll_tuned_bcast_ompi(void *buff, int count,
        messages up to 36MB and communicator sizes up to 64 nodes */
     const size_t small_message_size = 2048;
     const size_t intermediate_message_size = 370728;
-    //const double a_p16  = 3.2118e-6; /* [1 / byte] */
-    //const double b_p16  = 8.7936;
-    //const double a_p64  = 2.3679e-6; /* [1 / byte] */
-    //const double b_p64  = 1.1787;
-    //const double a_p128 = 1.6134e-6; /* [1 / byte] */
-    //const double b_p128 = 2.1102;
+    const double a_p16  = 3.2118e-6; /* [1 / byte] */
+    const double b_p16  = 8.7936;
+    const double a_p64  = 2.3679e-6; /* [1 / byte] */
+    const double b_p64  = 1.1787;
+    const double a_p128 = 1.6134e-6; /* [1 / byte] */
+    const double b_p128 = 2.1102;
     int communicator_size;
     //int segsize = 0;
@@ -160,36 +160,35 @@ int smpi_coll_tuned_bcast_ompi(void *buff, int count,
             return smpi_coll_tuned_bcast_ompi_split_bintree(buff, count, datatype,
                                                             root, comm);
 
-        } /*
-        Handle large message sizes
+        }
+        //Handle large message sizes
         else if (communicator_size < (a_p128 * message_size + b_p128)) {
-             Pipeline with 128KB segments
-            segsize = 1024  << 7;
-            return smpi_coll_tuned_bcast_flattree_pipeline (buff, count, datatype,
-                                                         root, comm, module,
-                                                         segsize);
+            //Pipeline with 128KB segments
+            //segsize = 1024  << 7;
+            return smpi_coll_tuned_bcast_ompi_pipeline (buff, count, datatype,
+                                                         root, comm);
+
-        }*/ else if (communicator_size < 13) {
+        } else if (communicator_size < 13) {
             // Split Binary with 8KB segments
             return smpi_coll_tuned_bcast_ompi_split_bintree(buff, count, datatype,
                                                             root, comm);
 
-        } /*else if (communicator_size < (a_p64 * message_size + b_p64)) {
+        } else if (communicator_size < (a_p64 * message_size + b_p64)) {
             // Pipeline with 64KB segments
-            segsize = 1024 << 6;
-            return smpi_coll_tuned_bcast_intra_pipeline (buff, count, datatype,
-                                                         root, comm, module,
-                                                         segsize);
+            //segsize = 1024 << 6;
+            return smpi_coll_tuned_bcast_ompi_pipeline (buff, count, datatype,
+                                                         root, comm);
+
         } else if (communicator_size < (a_p16 * message_size + b_p16)) {
-             Pipeline with 16KB segments
+            //Pipeline with 16KB segments
             //segsize = 1024 << 4;
-            return smpi_coll_tuned_bcast_flattree_pipeline (buff, count, datatype,
-                                                         root, comm, module,
-                                                         segsize);
-
-        }*/
+            return smpi_coll_tuned_bcast_ompi_pipeline (buff, count, datatype,
+                                                         root, comm);
+
+        }
         /* Pipeline with 8KB segments */
         //segsize = 1024 << 3;
         return smpi_coll_tuned_bcast_flattree_pipeline (buff, count, datatype,
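
A note on the segment arithmetic used by bcast-ompi-pipeline.c above: the following standalone C sketch is not part of the commit, and the 100000-element MPI_DOUBLE message with 128KB segments is a made-up example, but it reproduces the COLL_TUNED_COMPUTED_SEGCOUNT logic so the resulting segment layout can be checked by hand.

/* Standalone sketch (not from the commit): mirrors the segment math of
 * bcast-ompi-pipeline.c for one illustrative case. */
#include <stdio.h>

int main(void)
{
    size_t type_size      = 8;         /* e.g. MPI_DOUBLE */
    int    original_count = 100000;    /* elements to broadcast (example value) */
    int    segsize        = 1024 << 7; /* 128KB segments, as in the 128KB branch */

    /* Same logic as the COLL_TUNED_COMPUTED_SEGCOUNT macro */
    int count_by_segment = original_count;
    if ((size_t)segsize >= type_size &&
        (size_t)segsize < type_size * (size_t)count_by_segment) {
        count_by_segment = (int)(segsize / type_size);
        size_t residual  = segsize - (size_t)count_by_segment * type_size;
        if (residual > (type_size >> 1))
            count_by_segment++;
    }

    /* Same layout computation as in smpi_coll_tuned_bcast_ompi_pipeline */
    int num_segments = (original_count + count_by_segment - 1) / count_by_segment;
    int last_count   = original_count - (num_segments - 1) * count_by_segment;

    printf("count_by_segment=%d num_segments=%d last_segment=%d elements\n",
           count_by_segment, num_segments, last_count);
    /* prints: count_by_segment=16384 num_segments=7 last_segment=1696 elements */
    return 0;
}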