From 32b45cab9d4731c6cc648f79f1bebce2d1c7ca9a Mon Sep 17 00:00:00 2001 From: degomme Date: Tue, 11 Jun 2013 01:59:42 +0200 Subject: [PATCH] add new algos for reduce from ompi --- buildtools/Cmake/AddTests.cmake | 2 +- buildtools/Cmake/DefinePackages.cmake | 2 + src/smpi/colls/bcast-ompi-pipeline.c | 182 +----- src/smpi/colls/bcast-ompi-split-bintree.c | 165 +----- src/smpi/colls/coll_tuned_topo.c | 629 ++++++++++++++++++++ src/smpi/colls/coll_tuned_topo.h | 70 +++ src/smpi/colls/colls.h | 8 +- src/smpi/colls/reduce-ompi.c | 683 ++++++++++++++++++++++ src/smpi/colls/smpi_openmpi_selector.c | 50 +- 9 files changed, 1422 insertions(+), 369 deletions(-) create mode 100644 src/smpi/colls/coll_tuned_topo.c create mode 100644 src/smpi/colls/coll_tuned_topo.h create mode 100644 src/smpi/colls/reduce-ompi.c diff --git a/buildtools/Cmake/AddTests.cmake b/buildtools/Cmake/AddTests.cmake index 3f31945f9a..139057c397 100644 --- a/buildtools/Cmake/AddTests.cmake +++ b/buildtools/Cmake/AddTests.cmake @@ -400,7 +400,7 @@ if(NOT enable_memcheck) scatter_rdb_allgather SMP_binary SMP_binomial SMP_linear ompi ompi_split_bintree ompi_pipeline) ADD_TEST(smpi-bcast-coll-${BCAST_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/bcast:${BCAST_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/bcast_coll.tesh) ENDFOREACH() - FOREACH (REDUCE_COLL default arrival_pattern_aware binomial flat_tree NTSL scatter_gather ompi) + FOREACH (REDUCE_COLL default arrival_pattern_aware binomial flat_tree NTSL scatter_gather ompi ompi_chain ompi_binary ompi_basic_linear ompi_binomial ompi_in_order_binary) ADD_TEST(smpi-reduce-coll-${REDUCE_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/reduce:${REDUCE_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/reduce_coll.tesh) ENDFOREACH() diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake index 2aaa4d8a06..0f0b372bd3 100644 --- 
a/buildtools/Cmake/DefinePackages.cmake +++ b/buildtools/Cmake/DefinePackages.cmake @@ -183,6 +183,7 @@ set(SMPI_SRC src/smpi/colls/bcast-SMP-binary.c src/smpi/colls/bcast-SMP-binomial.c src/smpi/colls/bcast-SMP-linear.c + src/smpi/colls/coll_tuned_topo.c src/smpi/colls/bcast-ompi-split-bintree.c src/smpi/colls/bcast-ompi-pipeline.c src/smpi/colls/reduce-arrival-pattern-aware.c @@ -190,6 +191,7 @@ set(SMPI_SRC src/smpi/colls/reduce-flat-tree.c src/smpi/colls/reduce-NTSL.c src/smpi/colls/reduce-scatter-gather.c + src/smpi/colls/reduce-ompi.c ) if(SMPI_F2C) diff --git a/src/smpi/colls/bcast-ompi-pipeline.c b/src/smpi/colls/bcast-ompi-pipeline.c index 2371d10e99..63c206784a 100644 --- a/src/smpi/colls/bcast-ompi-pipeline.c +++ b/src/smpi/colls/bcast-ompi-pipeline.c @@ -1,184 +1,10 @@ #include "colls_private.h" - + #include "coll_tuned_topo.h" #define MAXTREEFANOUT 32 -#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT) \ - if( ((SEGSIZE) >= (TYPELNG)) && \ - ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) { \ - size_t residual; \ - (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG)); \ - residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG); \ - if( residual > ((TYPELNG) >> 1) ) \ - (SEGCOUNT)++; \ - } \ - - typedef struct ompi_coll_tree_t { - int32_t tree_root; - int32_t tree_fanout; - int32_t tree_bmtree; - int32_t tree_prev; - int32_t tree_next[MAXTREEFANOUT]; - int32_t tree_nextsize; - } ompi_coll_tree_t; - - ompi_coll_tree_t* - ompi_coll_tuned_topo_build_chain( int fanout, - MPI_Comm com, - int root ); - -ompi_coll_tree_t* -ompi_coll_tuned_topo_build_chain( int fanout, - MPI_Comm comm, - int root ) -{ - int rank, size; - int srank; /* shifted rank */ - int i,maxchainlen; - int mark,head,len; - ompi_coll_tree_t *chain; - - XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root); - - /* - * Get size and rank of the process in this communicator - */ - size = smpi_comm_size(comm); - rank = smpi_comm_rank(comm); - - if( fanout < 1 ) { - 
XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, forcing to 1 (pipeline)!"); - fanout = 1; - } - if (fanout>MAXTREEFANOUT) { - XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT); - fanout = MAXTREEFANOUT; - } - - /* - * Allocate space for topology arrays if needed - */ - chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) ); - if (!chain) { - XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory"); - fflush(stdout); - return NULL; - } - chain->tree_root = MPI_UNDEFINED; - chain->tree_nextsize = -1; - for(i=0;itree_next[i] = -1; - - /* - * Set root & numchain - */ - chain->tree_root = root; - if( (size - 1) < fanout ) { - chain->tree_nextsize = size-1; - fanout = size-1; - } else { - chain->tree_nextsize = fanout; - } - - /* - * Shift ranks - */ - srank = rank - root; - if (srank < 0) srank += size; - - /* - * Special case - fanout == 1 - */ - if( fanout == 1 ) { - if( srank == 0 ) chain->tree_prev = -1; - else chain->tree_prev = (srank-1+root)%size; - - if( (srank + 1) >= size) { - chain->tree_next[0] = -1; - chain->tree_nextsize = 0; - } else { - chain->tree_next[0] = (srank+1+root)%size; - chain->tree_nextsize = 1; - } - return chain; - } - - /* Let's handle the case where there is just one node in the communicator */ - if( size == 1 ) { - chain->tree_next[0] = -1; - chain->tree_nextsize = 0; - chain->tree_prev = -1; - return chain; - } - /* - * Calculate maximum chain length - */ - maxchainlen = (size-1) / fanout; - if( (size-1) % fanout != 0 ) { - maxchainlen++; - mark = (size-1)%fanout; - } else { - mark = fanout+1; - } - - /* - * Find your own place in the list of shifted ranks - */ - if( srank != 0 ) { - int column; - if( srank-1 < (mark * maxchainlen) ) { - column = (srank-1)/maxchainlen; - head = 1+column*maxchainlen; - len = maxchainlen; - } else { - column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1); - head = 
mark*maxchainlen+1+(column-mark)*(maxchainlen-1); - len = maxchainlen-1; - } - - if( srank == head ) { - chain->tree_prev = 0; /*root*/ - } else { - chain->tree_prev = srank-1; /* rank -1 */ - } - if( srank == (head + len - 1) ) { - chain->tree_next[0] = -1; - chain->tree_nextsize = 0; - } else { - if( (srank + 1) < size ) { - chain->tree_next[0] = srank+1; - chain->tree_nextsize = 1; - } else { - chain->tree_next[0] = -1; - chain->tree_nextsize = 0; - } - } - } - - /* - * Unshift values - */ - if( rank == root ) { - chain->tree_prev = -1; - chain->tree_next[0] = (root+1)%size; - for( i = 1; i < fanout; i++ ) { - chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen; - if( i > mark ) { - chain->tree_next[i]--; - } - chain->tree_next[i] %= size; - } - chain->tree_nextsize = fanout; - } else { - chain->tree_prev = (chain->tree_prev+root)%size; - if( chain->tree_next[0] != -1 ) { - chain->tree_next[0] = (chain->tree_next[0]+root)%size; - } - } - - return chain; -} -smpi_coll_tuned_bcast_ompi_pipeline( void* buffer, +int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer, int original_count, MPI_Datatype datatype, int root, @@ -186,14 +12,14 @@ smpi_coll_tuned_bcast_ompi_pipeline( void* buffer, { int count_by_segment = original_count; size_t type_size; - int segsize; + int segsize =1024 << 7; //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module; //mca_coll_tuned_comm_t *data = tuned_module->tuned_data; // return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module, // count_by_segment, data->cached_pipeline ); ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root ); - int err = 0, line, i; + int i; int rank, size; int segindex; int num_segments; /* Number of segments */ diff --git a/src/smpi/colls/bcast-ompi-split-bintree.c b/src/smpi/colls/bcast-ompi-split-bintree.c index a68ebd2bca..f1201d1c4b 100644 --- a/src/smpi/colls/bcast-ompi-split-bintree.c +++ b/src/smpi/colls/bcast-ompi-split-bintree.c 
@@ -53,171 +53,8 @@ */ #include "colls_private.h" + #include "coll_tuned_topo.h" #define MAXTREEFANOUT 32 - typedef struct ompi_coll_tree_t { - int32_t tree_root; - int32_t tree_fanout; - int32_t tree_bmtree; - int32_t tree_prev; - int32_t tree_next[MAXTREEFANOUT]; - int32_t tree_nextsize; - } ompi_coll_tree_t; - - ompi_coll_tree_t* - ompi_coll_tuned_topo_build_tree( int fanout, - MPI_Comm com, - int root ); - - -/* - * Some static helpers. - */ -static int pown( int fanout, int num ) -{ - int j, p = 1; - if( num < 0 ) return 0; - if (1==num) return fanout; - if (2==fanout) { - return p<MAXTREEFANOUT) { - XBT_DEBUG("coll:tuned:topo_build_tree invalid fanout %d bigger than max %d", fanout, MAXTREEFANOUT); - return NULL; - } - - /* - * Get size and rank of the process in this communicator - */ - size = smpi_comm_size(comm); - rank = smpi_comm_rank(comm); - - tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); - if (!tree) { - XBT_DEBUG("coll:tuned:topo_build_tree PANIC::out of memory"); - return NULL; - } - - tree->tree_root = MPI_UNDEFINED; - tree->tree_nextsize = MPI_UNDEFINED; - - /* - * Set root - */ - tree->tree_root = root; - - /* - * Initialize tree - */ - tree->tree_fanout = fanout; - tree->tree_bmtree = 0; - tree->tree_root = root; - tree->tree_prev = -1; - tree->tree_nextsize = 0; - for( i = 0; i < fanout; i++ ) { - tree->tree_next[i] = -1; - } - - /* return if we have less than 2 processes */ - if( size < 2 ) { - return tree; - } - - /* - * Shift all ranks by root, so that the algorithm can be - * designed as if root would be always 0 - * shiftedrank should be used in calculating distances - * and position in tree - */ - shiftedrank = rank - root; - if( shiftedrank < 0 ) { - shiftedrank += size; - } - - /* calculate my level */ - level = calculate_level( fanout, shiftedrank ); - delta = pown( fanout, level ); - - /* find my children */ - for( i = 0; i < fanout; i++ ) { - schild = shiftedrank + delta * (i+1); - if( schild < size ) { - 
tree->tree_next[i] = (schild+root)%size; - tree->tree_nextsize = tree->tree_nextsize + 1; - } else { - break; - } - } - - /* find my parent */ - slimit = calculate_num_nodes_up_to_level( fanout, level ); - sparent = shiftedrank; - if( sparent < fanout ) { - sparent = 0; - } else { - while( sparent >= slimit ) { - sparent -= delta/fanout; - } - } - tree->tree_prev = (sparent+root)%size; - - return tree; -} - int smpi_coll_tuned_bcast_ompi_split_bintree ( void* buffer, diff --git a/src/smpi/colls/coll_tuned_topo.c b/src/smpi/colls/coll_tuned_topo.c new file mode 100644 index 0000000000..ce7cec739e --- /dev/null +++ b/src/smpi/colls/coll_tuned_topo.c @@ -0,0 +1,629 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "colls_private.h" +#include "coll_tuned_topo.h" +/* + * Some static helpers. 
+ */ +static int pown( int fanout, int num ) +{ + int j, p = 1; + if( num < 0 ) return 0; + if (1==num) return fanout; + if (2==fanout) { + return p<MAXTREEFANOUT) { + XBT_DEBUG("coll:tuned:topo_build_tree invalid fanout %d bigger than max %d", fanout, MAXTREEFANOUT); + return NULL; + } + + /* + * Get size and rank of the process in this communicator + */ + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); + if (!tree) { + XBT_DEBUG("coll:tuned:topo_build_tree PANIC::out of memory"); + return NULL; + } + + tree->tree_root = MPI_UNDEFINED; + tree->tree_nextsize = MPI_UNDEFINED; + + /* + * Set root + */ + tree->tree_root = root; + + /* + * Initialize tree + */ + tree->tree_fanout = fanout; + tree->tree_bmtree = 0; + tree->tree_root = root; + tree->tree_prev = -1; + tree->tree_nextsize = 0; + for( i = 0; i < fanout; i++ ) { + tree->tree_next[i] = -1; + } + + /* return if we have less than 2 processes */ + if( size < 2 ) { + return tree; + } + + /* + * Shift all ranks by root, so that the algorithm can be + * designed as if root would be always 0 + * shiftedrank should be used in calculating distances + * and position in tree + */ + shiftedrank = rank - root; + if( shiftedrank < 0 ) { + shiftedrank += size; + } + + /* calculate my level */ + level = calculate_level( fanout, shiftedrank ); + delta = pown( fanout, level ); + + /* find my children */ + for( i = 0; i < fanout; i++ ) { + schild = shiftedrank + delta * (i+1); + if( schild < size ) { + tree->tree_next[i] = (schild+root)%size; + tree->tree_nextsize = tree->tree_nextsize + 1; + } else { + break; + } + } + + /* find my parent */ + slimit = calculate_num_nodes_up_to_level( fanout, level ); + sparent = shiftedrank; + if( sparent < fanout ) { + sparent = 0; + } else { + while( sparent >= slimit ) { + sparent -= delta/fanout; + } + } + tree->tree_prev = (sparent+root)%size; + + return tree; +} + +/* + * Constructs in-order binary tree which 
can be used for non-commutative reduce + * operations. + * Root of this tree is always rank (size-1) and fanout is 2. + * Here are some of the examples of this tree: + * size == 2 size == 3 size == 4 size == 9 + * 1 2 3 8 + * / / \ / \ / \ + * 0 1 0 2 1 7 3 + * / / \ / \ + * 0 6 5 2 1 + * / / + * 4 0 + */ +ompi_coll_tree_t* +ompi_coll_tuned_topo_build_in_order_bintree( MPI_Comm comm ) +{ + int rank, size; + int myrank, rightsize, delta; + int parent, lchild, rchild; + ompi_coll_tree_t* tree; + + /* + * Get size and rank of the process in this communicator + */ + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); + if (!tree) { + XBT_DEBUG( + "coll:tuned:topo_build_tree PANIC::out of memory"); + return NULL; + } + + tree->tree_root = MPI_UNDEFINED; + tree->tree_nextsize = MPI_UNDEFINED; + + /* + * Initialize tree + */ + tree->tree_fanout = 2; + tree->tree_bmtree = 0; + tree->tree_root = size - 1; + tree->tree_prev = -1; + tree->tree_nextsize = 0; + tree->tree_next[0] = -1; + tree->tree_next[1] = -1; + XBT_DEBUG( + "coll:tuned:topo_build_in_order_tree Building fo %d rt %d", + tree->tree_fanout, tree->tree_root); + + /* + * Build the tree + */ + myrank = rank; + parent = size - 1; + delta = 0; + + while ( 1 ) { + /* Compute the size of the right subtree */ + rightsize = size >> 1; + + /* Determine the left and right child of this parent */ + lchild = -1; + rchild = -1; + if (size - 1 > 0) { + lchild = parent - 1; + if (lchild > 0) { + rchild = rightsize - 1; + } + } + + /* The following cases are possible: myrank can be + - a parent, + - belong to the left subtree, or + - belong to the right subtee + Each of the cases need to be handled differently. + */ + + if (myrank == parent) { + /* I am the parent: + - compute real ranks of my children, and exit the loop. 
*/ + if (lchild >= 0) tree->tree_next[0] = lchild + delta; + if (rchild >= 0) tree->tree_next[1] = rchild + delta; + break; + } + if (myrank > rchild) { + /* I belong to the left subtree: + - If I am the left child, compute real rank of my parent + - Iterate down through tree: + compute new size, shift ranks down, and update delta. + */ + if (myrank == lchild) { + tree->tree_prev = parent + delta; + } + size = size - rightsize - 1; + delta = delta + rightsize; + myrank = myrank - rightsize; + parent = size - 1; + + } else { + /* I belong to the right subtree: + - If I am the right child, compute real rank of my parent + - Iterate down through tree: + compute new size and parent, + but the delta and rank do not need to change. + */ + if (myrank == rchild) { + tree->tree_prev = parent + delta; + } + size = rightsize; + parent = rchild; + } + } + + if (tree->tree_next[0] >= 0) { tree->tree_nextsize = 1; } + if (tree->tree_next[1] >= 0) { tree->tree_nextsize += 1; } + + return tree; +} + +int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree ) +{ + ompi_coll_tree_t *ptr; + + if ((!tree)||(!*tree)) { + return MPI_SUCCESS; + } + + ptr = *tree; + + free (ptr); + *tree = NULL; /* mark tree as gone */ + + return MPI_SUCCESS; +} + +/* + * + * Here are some of the examples of this tree: + * size == 2 size = 4 size = 8 + * 0 0 0 + * / | \ / | \ + * 1 2 1 4 2 1 + * | | |\ + * 3 6 5 3 + * | + * 7 + */ +ompi_coll_tree_t* +ompi_coll_tuned_topo_build_bmtree( MPI_Comm comm, + int root ) +{ + int childs = 0; + int rank; + int size; + int mask = 1; + int index; + int remote; + ompi_coll_tree_t *bmtree; + int i; + + XBT_DEBUG("coll:tuned:topo:build_bmtree rt %d", root); + + /* + * Get size and rank of the process in this communicator + */ + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + index = rank -root; + + bmtree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); + if (!bmtree) { + XBT_DEBUG("coll:tuned:topo:build_bmtree PANIC out of memory"); + return 
NULL; + } + + bmtree->tree_bmtree = 1; + + bmtree->tree_root = MPI_UNDEFINED; + bmtree->tree_nextsize = MPI_UNDEFINED; + for(i=0;itree_next[i] = -1; + } + + if( index < 0 ) index += size; + + while( mask <= index ) mask <<= 1; + + /* Now I can compute my father rank */ + if( root == rank ) { + bmtree->tree_prev = root; + } else { + remote = (index ^ (mask >> 1)) + root; + if( remote >= size ) remote -= size; + bmtree->tree_prev = remote; + } + /* And now let's fill my childs */ + while( mask < size ) { + remote = (index ^ mask); + if( remote >= size ) break; + remote += root; + if( remote >= size ) remote -= size; + if (childs==MAXTREEFANOUT) { + XBT_DEBUG("coll:tuned:topo:build_bmtree max fanout incorrect %d needed %d", MAXTREEFANOUT, childs); + return NULL; + } + bmtree->tree_next[childs] = remote; + mask <<= 1; + childs++; + } + bmtree->tree_nextsize = childs; + bmtree->tree_root = root; + return bmtree; +} + +/* + * Constructs in-order binomial tree which can be used for gather/scatter + * operations. 
+ * + * Here are some of the examples of this tree: + * size == 2 size = 4 size = 8 + * 0 0 0 + * / / | / | \ + * 1 1 2 1 2 4 + * | | | \ + * 3 3 5 6 + * | + * 7 + */ +ompi_coll_tree_t* +ompi_coll_tuned_topo_build_in_order_bmtree( MPI_Comm comm, + int root ) +{ + int childs = 0; + int rank, vrank; + int size; + int mask = 1; + int remote; + ompi_coll_tree_t *bmtree; + int i; + + XBT_DEBUG("coll:tuned:topo:build_in_order_bmtree rt %d", root); + + /* + * Get size and rank of the process in this communicator + */ + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + vrank = (rank - root + size) % size; + + bmtree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t)); + if (!bmtree) { + XBT_DEBUG("coll:tuned:topo:build_bmtree PANIC out of memory"); + return NULL; + } + + bmtree->tree_bmtree = 1; + bmtree->tree_root = MPI_UNDEFINED; + bmtree->tree_nextsize = MPI_UNDEFINED; + for(i=0;itree_next[i] = -1; + } + + if (root == rank) { + bmtree->tree_prev = root; + } + + while (mask < size) { + remote = vrank ^ mask; + if (remote < vrank) { + bmtree->tree_prev = (remote + root) % size; + break; + } else if (remote < size) { + bmtree->tree_next[childs] = (remote + root) % size; + childs++; + if (childs==MAXTREEFANOUT) { + XBT_DEBUG( + "coll:tuned:topo:build_bmtree max fanout incorrect %d needed %d", + MAXTREEFANOUT, childs); + return NULL; + } + } + mask <<= 1; + } + bmtree->tree_nextsize = childs; + bmtree->tree_root = root; + + return bmtree; +} + + +ompi_coll_tree_t* +ompi_coll_tuned_topo_build_chain( int fanout, + MPI_Comm comm, + int root ) +{ + int rank, size; + int srank; /* shifted rank */ + int i,maxchainlen; + int mark,head,len; + ompi_coll_tree_t *chain; + + XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root); + + /* + * Get size and rank of the process in this communicator + */ + size = smpi_comm_size(comm); + rank = smpi_comm_rank(comm); + + if( fanout < 1 ) { + XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, 
forcing to 1 (pipeline)!"); + fanout = 1; + } + if (fanout>MAXTREEFANOUT) { + XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT); + fanout = MAXTREEFANOUT; + } + + /* + * Allocate space for topology arrays if needed + */ + chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) ); + if (!chain) { + XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory"); + fflush(stdout); + return NULL; + } + chain->tree_root = MPI_UNDEFINED; + chain->tree_nextsize = -1; + for(i=0;itree_next[i] = -1; + + /* + * Set root & numchain + */ + chain->tree_root = root; + if( (size - 1) < fanout ) { + chain->tree_nextsize = size-1; + fanout = size-1; + } else { + chain->tree_nextsize = fanout; + } + + /* + * Shift ranks + */ + srank = rank - root; + if (srank < 0) srank += size; + + /* + * Special case - fanout == 1 + */ + if( fanout == 1 ) { + if( srank == 0 ) chain->tree_prev = -1; + else chain->tree_prev = (srank-1+root)%size; + + if( (srank + 1) >= size) { + chain->tree_next[0] = -1; + chain->tree_nextsize = 0; + } else { + chain->tree_next[0] = (srank+1+root)%size; + chain->tree_nextsize = 1; + } + return chain; + } + + /* Let's handle the case where there is just one node in the communicator */ + if( size == 1 ) { + chain->tree_next[0] = -1; + chain->tree_nextsize = 0; + chain->tree_prev = -1; + return chain; + } + /* + * Calculate maximum chain length + */ + maxchainlen = (size-1) / fanout; + if( (size-1) % fanout != 0 ) { + maxchainlen++; + mark = (size-1)%fanout; + } else { + mark = fanout+1; + } + + /* + * Find your own place in the list of shifted ranks + */ + if( srank != 0 ) { + int column; + if( srank-1 < (mark * maxchainlen) ) { + column = (srank-1)/maxchainlen; + head = 1+column*maxchainlen; + len = maxchainlen; + } else { + column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1); + head = mark*maxchainlen+1+(column-mark)*(maxchainlen-1); + len = maxchainlen-1; + } + + if( srank == head ) 
{ + chain->tree_prev = 0; /*root*/ + } else { + chain->tree_prev = srank-1; /* rank -1 */ + } + if( srank == (head + len - 1) ) { + chain->tree_next[0] = -1; + chain->tree_nextsize = 0; + } else { + if( (srank + 1) < size ) { + chain->tree_next[0] = srank+1; + chain->tree_nextsize = 1; + } else { + chain->tree_next[0] = -1; + chain->tree_nextsize = 0; + } + } + } + + /* + * Unshift values + */ + if( rank == root ) { + chain->tree_prev = -1; + chain->tree_next[0] = (root+1)%size; + for( i = 1; i < fanout; i++ ) { + chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen; + if( i > mark ) { + chain->tree_next[i]--; + } + chain->tree_next[i] %= size; + } + chain->tree_nextsize = fanout; + } else { + chain->tree_prev = (chain->tree_prev+root)%size; + if( chain->tree_next[0] != -1 ) { + chain->tree_next[0] = (chain->tree_next[0]+root)%size; + } + } + + return chain; +} + +int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank) +{ + int i; + + XBT_DEBUG("coll:tuned:topo:topo_dump_tree %1d tree root %d" + " fanout %d BM %1d nextsize %d prev %d", + rank, tree->tree_root, tree->tree_bmtree, tree->tree_fanout, + tree->tree_nextsize, tree->tree_prev); + if( tree->tree_nextsize ) { + for( i = 0; i < tree->tree_nextsize; i++ ) + XBT_DEBUG("[%1d] %d", i, tree->tree_next[i]); + } + return (0); +} diff --git a/src/smpi/colls/coll_tuned_topo.h b/src/smpi/colls/coll_tuned_topo.h new file mode 100644 index 0000000000..73b0361356 --- /dev/null +++ b/src/smpi/colls/coll_tuned_topo.h @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2005 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. 
+ * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED +#define MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED + +#include "colls_private.h" + +#define MAXTREEFANOUT 32 + +#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT) \ + if( ((SEGSIZE) >= (TYPELNG)) && \ + ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) { \ + size_t residual; \ + (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG)); \ + residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG); \ + if( residual > ((TYPELNG) >> 1) ) \ + (SEGCOUNT)++; \ + } \ + + + typedef struct ompi_coll_tree_t { + int32_t tree_root; + int32_t tree_fanout; + int32_t tree_bmtree; + int32_t tree_prev; + int32_t tree_next[MAXTREEFANOUT]; + int32_t tree_nextsize; + } ompi_coll_tree_t; + + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_tree( int fanout, + MPI_Comm com, + int root ); + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_in_order_bintree( MPI_Comm comm ); + + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_bmtree( MPI_Comm comm, + int root ); + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_in_order_bmtree( MPI_Comm comm, + int root ); + ompi_coll_tree_t* + ompi_coll_tuned_topo_build_chain( int fanout, + MPI_Comm com, + int root ); + + int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree ); + + /* debugging stuff, will be removed later */ + int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank); + +#endif /* MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED */ + diff --git a/src/smpi/colls/colls.h b/src/smpi/colls/colls.h index 24cd91cac8..9af9ad3751 100644 --- a/src/smpi/colls/colls.h +++ b/src/smpi/colls/colls.h @@ -185,7 +185,13 @@ COLL_APPLY(action, COLL_REDUCE_SIG, binomial) COLL_sep \ COLL_APPLY(action, COLL_REDUCE_SIG, flat_tree) COLL_sep \ COLL_APPLY(action, COLL_REDUCE_SIG, NTSL) COLL_sep \ COLL_APPLY(action, COLL_REDUCE_SIG, scatter_gather) COLL_sep 
\ -COLL_APPLY(action, COLL_REDUCE_SIG, ompi) +COLL_APPLY(action, COLL_REDUCE_SIG, ompi) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_chain) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_pipeline) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_basic_linear) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_in_order_binary) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_binary) COLL_sep \ +COLL_APPLY(action, COLL_REDUCE_SIG, ompi_binomial) COLL_REDUCES(COLL_PROTO, COLL_NOsep) diff --git a/src/smpi/colls/reduce-ompi.c b/src/smpi/colls/reduce-ompi.c new file mode 100644 index 0000000000..6dc846cd59 --- /dev/null +++ b/src/smpi/colls/reduce-ompi.c @@ -0,0 +1,683 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2009 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "colls_private.h" +#include "coll_tuned_topo.h" +#define MCA_COLL_BASE_TAG_REDUCE 555 + + + +int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int original_count, + MPI_Datatype datatype, MPI_Op op, + int root, MPI_Comm comm, + ompi_coll_tree_t* tree, int count_by_segment, + int max_outstanding_reqs ); +/** + * This is a generic implementation of the reduce protocol. It used the tree + * provided as an argument and execute all operations using a segment of + * count times a datatype. 
+ * For the last communication it will update the count in order to limit + * the number of datatype to the original count (original_count) + * + * Note that for non-commutative operations we cannot save memory copy + * for the first block: thus we must copy sendbuf to accumbuf on intermediate + * to keep the optimized loop happy. + */ +int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int original_count, + MPI_Datatype datatype, MPI_Op op, + int root, MPI_Comm comm, + ompi_coll_tree_t* tree, int count_by_segment, + int max_outstanding_reqs ) +{ + char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL}; + char *accumbuf = NULL, *accumbuf_free = NULL; + char *local_op_buffer = NULL, *sendtmpbuf = NULL; + ptrdiff_t extent, lower_bound, segment_increment; + size_t typelng; + MPI_Request reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL}; + int num_segments, line, ret, segindex, i, rank; + int recvcount, prevcount, inbi; + + /** + * Determine number of segments and number of elements + * sent per operation + */ + smpi_datatype_extent( datatype, &lower_bound, &extent); + typelng = smpi_datatype_size( datatype ); + num_segments = (original_count + count_by_segment - 1) / count_by_segment; + segment_increment = count_by_segment * extent; + + sendtmpbuf = (char*) sendbuf; + if( sendbuf == MPI_IN_PLACE ) { + sendtmpbuf = (char *)recvbuf; + } + + XBT_DEBUG( "coll:tuned:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d", original_count, (unsigned long)(num_segments * segment_increment), (unsigned long)segment_increment, max_outstanding_reqs); + + rank = smpi_comm_rank(comm); + + /* non-leaf nodes - wait for children to send me data & forward up + (if needed) */ + if( tree->tree_nextsize > 0 ) { + ptrdiff_t true_extent, real_segment_size; + true_extent=smpi_datatype_get_extent( datatype); + + /* handle non existant recv buffer (i.e. 
its NULL) and + protect the recv buffer on non-root nodes */ + accumbuf = (char*)recvbuf; + if( (NULL == accumbuf) || (root != rank) ) { + /* Allocate temporary accumulator buffer. */ + accumbuf_free = (char*)malloc(true_extent + + (original_count - 1) * extent); + if (accumbuf_free == NULL) { + line = __LINE__; ret = -1; goto error_hndl; + } + accumbuf = accumbuf_free - lower_bound; + } + + /* If this is a non-commutative operation we must copy + sendbuf to the accumbuf, in order to simplfy the loops */ + if (!smpi_op_is_commute(op)) { + smpi_datatype_copy( + (char*)accumbuf, original_count, datatype, + (char*)sendtmpbuf, original_count, datatype); + } + /* Allocate two buffers for incoming segments */ + real_segment_size = true_extent + (count_by_segment - 1) * extent; + inbuf_free[0] = (char*) malloc(real_segment_size); + if( inbuf_free[0] == NULL ) { + line = __LINE__; ret = -1; goto error_hndl; + } + inbuf[0] = inbuf_free[0] - lower_bound; + /* if there is chance to overlap communication - + allocate second buffer */ + if( (num_segments > 1) || (tree->tree_nextsize > 1) ) { + inbuf_free[1] = (char*) malloc(real_segment_size); + if( inbuf_free[1] == NULL ) { + line = __LINE__; ret = -1; goto error_hndl; + } + inbuf[1] = inbuf_free[1] - lower_bound; + } + + /* reset input buffer index and receive count */ + inbi = 0; + recvcount = 0; + /* for each segment */ + for( segindex = 0; segindex <= num_segments; segindex++ ) { + prevcount = recvcount; + /* recvcount - number of elements in current segment */ + recvcount = count_by_segment; + if( segindex == (num_segments-1) ) + recvcount = original_count - count_by_segment * segindex; + + /* for each child */ + for( i = 0; i < tree->tree_nextsize; i++ ) { + /** + * We try to overlap communication: + * either with next segment or with the next child + */ + /* post irecv for current segindex on current child */ + if( segindex < num_segments ) { + void* local_recvbuf = inbuf[inbi]; + if( 0 == i ) { + /* for the first step 
(1st child per segment) and + * commutative operations we might be able to irecv + * directly into the accumulate buffer so that we can + * reduce(op) this with our sendbuf in one step as + * ompi_op_reduce only has two buffer pointers, + * this avoids an extra memory copy. + * + * BUT if the operation is non-commutative or + * we are root and are USING MPI_IN_PLACE this is wrong! + */ + if( (smpi_op_is_commute(op)) && + !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { + local_recvbuf = accumbuf + segindex * segment_increment; + } + } + + reqs[inbi]=smpi_mpi_irecv(local_recvbuf, recvcount, datatype, + tree->tree_next[i], + MCA_COLL_BASE_TAG_REDUCE, comm + ); + } + /* wait for previous req to complete, if any. + if there are no requests reqs[inbi ^1] will be + MPI_REQUEST_NULL. */ + /* wait on data from last child for previous segment */ + smpi_mpi_waitall( 1, &reqs[inbi ^ 1], + MPI_STATUSES_IGNORE ); + local_op_buffer = inbuf[inbi ^ 1]; + if( i > 0 ) { + /* our first operation is to combine our own [sendbuf] data + * with the data we recvd from down stream (but only + * the operation is commutative and if we are not root and + * not using MPI_IN_PLACE) + */ + if( 1 == i ) { + if( (smpi_op_is_commute(op)) && + !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { + local_op_buffer = sendtmpbuf + segindex * segment_increment; + } + } + /* apply operation */ + smpi_op_apply(op, local_op_buffer, + accumbuf + segindex * segment_increment, + &recvcount, &datatype ); + } else if ( segindex > 0 ) { + void* accumulator = accumbuf + (segindex-1) * segment_increment; + if( tree->tree_nextsize <= 1 ) { + if( (smpi_op_is_commute(op)) && + !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) { + local_op_buffer = sendtmpbuf + (segindex-1) * segment_increment; + } + } + smpi_op_apply(op, local_op_buffer, accumulator, &prevcount, + &datatype ); + + /* all reduced on available data this step (i) complete, + * pass to the next process unless you are the 
root. + */ + if (rank != tree->tree_root) { + /* send combined/accumulated data to parent */ + smpi_mpi_send( accumulator, prevcount, + datatype, tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + comm); + } + + /* we stop when segindex = number of segments + (i.e. we do num_segment+1 steps for pipelining */ + if (segindex == num_segments) break; + } + + /* update input buffer index */ + inbi = inbi ^ 1; + } /* end of for each child */ + } /* end of for each segment */ + + /* clean up */ + if( inbuf_free[0] != NULL) free(inbuf_free[0]); + if( inbuf_free[1] != NULL) free(inbuf_free[1]); + if( accumbuf_free != NULL ) free(accumbuf_free); + } + + /* leaf nodes + Depending on the value of max_outstanding_reqs and + the number of segments we have two options: + - send all segments using blocking send to the parent, or + - avoid overflooding the parent nodes by limiting the number of + outstanding requests to max_oustanding_reqs. + TODO/POSSIBLE IMPROVEMENT: If there is a way to determine the eager size + for the current communication, synchronization should be used only + when the message/segment size is smaller than the eager size. + */ + else { + + /* If the number of segments is less than a maximum number of oustanding + requests or there is no limit on the maximum number of outstanding + requests, we send data to the parent using blocking send */ + if ((0 == max_outstanding_reqs) || + (num_segments <= max_outstanding_reqs)) { + + segindex = 0; + while ( original_count > 0) { + if (original_count < count_by_segment) { + count_by_segment = original_count; + } + smpi_mpi_send((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + comm) ; + segindex++; + original_count -= count_by_segment; + } + } + + /* Otherwise, introduce flow control: + - post max_outstanding_reqs non-blocking synchronous send, + - for remaining segments + - wait for a ssend to complete, and post the next one. 
+ - wait for all outstanding sends to complete. + */ + else { + + int creq = 0; + MPI_Request* sreq = NULL; + + sreq = (MPI_Request*) calloc( max_outstanding_reqs, + sizeof(MPI_Request ) ); + if (NULL == sreq) { line = __LINE__; ret = -1; goto error_hndl; } + + /* post first group of requests */ + for (segindex = 0; segindex < max_outstanding_reqs; segindex++) { + sreq[segindex]=smpi_mpi_isend((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + comm); + original_count -= count_by_segment; + } + + creq = 0; + while ( original_count > 0 ) { + /* wait on a posted request to complete */ + smpi_mpi_wait(&sreq[creq], MPI_STATUS_IGNORE); + sreq[creq] = MPI_REQUEST_NULL; + + if( original_count < count_by_segment ) { + count_by_segment = original_count; + } + sreq[creq]=smpi_mpi_isend((char*)sendbuf + + segindex * segment_increment, + count_by_segment, datatype, + tree->tree_prev, + MCA_COLL_BASE_TAG_REDUCE, + comm ); + creq = (creq + 1) % max_outstanding_reqs; + segindex++; + original_count -= count_by_segment; + } + + /* Wait on the remaining request to complete */ + smpi_mpi_waitall( max_outstanding_reqs, sreq, + MPI_STATUSES_IGNORE ); + + /* free requests */ + free(sreq); + } + } + return MPI_SUCCESS; + + error_hndl: /* error handler */ + XBT_DEBUG("ERROR_HNDL: node %d file %s line %d error %d\n", + rank, __FILE__, line, ret ); + if( inbuf_free[0] != NULL ) free(inbuf_free[0]); + if( inbuf_free[1] != NULL ) free(inbuf_free[1]); + if( accumbuf_free != NULL ) free(accumbuf); + return ret; +} + +/* Attention: this version of the reduce operations does not + work for: + - non-commutative operations + - segment sizes which are not multiplies of the extent of the datatype + meaning that at least one datatype must fit in the segment ! 
+*/
+
+/*
+ * reduce_intra_chain
+ *
+ * Reduce over a chain topology whose fanout is half the communicator
+ * size, with a fixed 64 KiB segment size.  The element count per segment
+ * is derived by COLL_TUNED_COMPUTED_SEGCOUNT and the actual work is
+ * delegated to smpi_coll_tuned_ompi_reduce_generic.
+ *
+ * Accepts: same as MPI_Reduce()
+ * Returns: whatever the generic reduce returns
+ */
+int smpi_coll_tuned_reduce_ompi_chain( void *sendbuf, void *recvbuf, int count,
+                                        MPI_Datatype datatype,
+                                        MPI_Op op, int root,
+                                        MPI_Comm comm
+                                        )
+{
+    const int chain_fanout = smpi_comm_size(comm) / 2;
+    uint32_t seg_bytes = 64*1024;
+    int elems_per_seg = count;
+    size_t elem_size;
+
+    XBT_DEBUG("coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", smpi_comm_rank(comm), chain_fanout, seg_bytes);
+
+    /* Translate the segment size in bytes into a number of elements. */
+    elem_size = smpi_datatype_size( datatype );
+    COLL_TUNED_COMPUTED_SEGCOUNT( seg_bytes, elem_size, elems_per_seg );
+
+    /* NOTE(review): the trailing 0 is presumably max_outstanding_reqs
+       (no flow control) -- confirm against the generic's signature. */
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_chain(chain_fanout, comm, root),
+                                                elems_per_seg, 0 );
+}
+
+
+/*
+ * reduce_intra_pipeline
+ *
+ * Reduce over a single chain (fanout 1).  The segment size is chosen
+ * among 1 KiB, 32 KiB and 64 KiB by comparing the communicator size
+ * against two linear functions of the message size in bytes.
+ *
+ * Accepts: same as MPI_Reduce()
+ * Returns: whatever the generic reduce returns
+ */
+int smpi_coll_tuned_reduce_ompi_pipeline( void *sendbuf, void *recvbuf,
+                                           int count, MPI_Datatype datatype,
+                                           MPI_Op op, int root,
+                                           MPI_Comm comm )
+{
+    /* Decision thresholds: slopes are expressed in [1/B]. */
+    const double slope_1k = 0.0410 / 1024.0;
+    const double offset_1k = 9.7128;
+    const double slope_32k = 0.0033 / 1024.0;
+    const double offset_32k = 1.6761;
+
+    int elems_per_seg = count;
+    size_t elem_size = smpi_datatype_size( datatype );
+    int nprocs = smpi_comm_size(comm);
+    size_t total_bytes = elem_size * count;
+
+    /* Pipeline_64K unless one of the thresholds below selects less. */
+    uint32_t seg_bytes = 64*1024;
+
+    if (nprocs > (slope_1k * total_bytes + offset_1k)) {
+        /* Pipeline_1K */
+        seg_bytes = 1024;
+    } else if (nprocs > (slope_32k * total_bytes + offset_32k)) {
+        /* Pipeline_32K */
+        seg_bytes = 32*1024;
+    }
+
+    XBT_DEBUG("coll:tuned:reduce_intra_pipeline rank %d ss %5d",
+              smpi_comm_rank(comm), seg_bytes);
+
+    /* Translate the segment size in bytes into a number of elements. */
+    COLL_TUNED_COMPUTED_SEGCOUNT( seg_bytes, elem_size, elems_per_seg );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_chain( 1, comm, root),
+                                                elems_per_seg, 0);
+}
+
+/*
+ * reduce_intra_binary
+ *
+ * Reduce over a tree of fanout 2 with a fixed 32 KiB segment size
+ * (Binary_32K).
+ *
+ * Accepts: same as MPI_Reduce()
+ * Returns: whatever the generic reduce returns
+ */
+int smpi_coll_tuned_reduce_ompi_binary( void *sendbuf, void *recvbuf,
+                                         int count, MPI_Datatype datatype,
+                                         MPI_Op op, int root,
+                                         MPI_Comm comm)
+{
+    uint32_t seg_bytes = 32*1024;
+    int elems_per_seg = count;
+    size_t elem_size = smpi_datatype_size( datatype );
+
+    XBT_DEBUG("coll:tuned:reduce_intra_binary rank %d ss %5d",
+              smpi_comm_rank(comm), seg_bytes);
+
+    /* Translate the segment size in bytes into a number of elements. */
+    COLL_TUNED_COMPUTED_SEGCOUNT( seg_bytes, elem_size, elems_per_seg );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_tree(2, comm, root),
+                                                elems_per_seg, 0);
+}
+
+/*
+ * reduce_intra_binomial
+ *
+ * Reduce over an in-order binomial tree.  The message is left unsegmented
+ * (segment size 0) unless it is not "small" and the communicator size
+ * exceeds a linear function of the message size, in which case 1 KiB
+ * segments are used (Binomial_1K).
+ *
+ * Accepts: same as MPI_Reduce()
+ * Returns: whatever the generic reduce returns
+ */
+int smpi_coll_tuned_reduce_ompi_binomial( void *sendbuf, void *recvbuf,
+                                           int count, MPI_Datatype datatype,
+                                           MPI_Op op, int root,
+                                           MPI_Comm comm)
+{
+    /* Decision threshold: the slope is expressed in [1/B]. */
+    const double slope_1k = 0.6016 / 1024.0;
+    const double offset_1k = 1.3496;
+
+    uint32_t seg_bytes = 0;          /* Binomial_0K by default */
+    int elems_per_seg = count;
+    size_t elem_size = smpi_datatype_size( datatype );
+    int nprocs = smpi_comm_size(comm);
+    size_t total_bytes = elem_size * count;
+
+    const int small_message = ((nprocs < 8) && (total_bytes < 20480)) ||
+                              (total_bytes < 2048) || (count <= 1);
+
+    if (!small_message && (nprocs > (slope_1k * total_bytes + offset_1k))) {
+        /* Binomial_1K */
+        seg_bytes = 1024;
+    }
+
+    XBT_DEBUG("coll:tuned:reduce_intra_binomial rank %d ss %5d",
+              smpi_comm_rank(comm), seg_bytes);
+
+    /* Translate the segment size in bytes into a number of elements. */
+    COLL_TUNED_COMPUTED_SEGCOUNT( seg_bytes, elem_size, elems_per_seg );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype,
+                                                op, root, comm,
+                                                ompi_coll_tuned_topo_build_in_order_bmtree(comm, root),
+                                                elems_per_seg, 0);
+}
+
+/*
+ * reduce_intra_in_order_binary
+ *
+ * Function: Logarithmic reduce operation for non-commutative
operations.
+ * Accepts: same as MPI_Reduce()
+ * Returns: MPI_SUCCESS or error code
+ *
+ * The reduction itself always runs with rank (size - 1) as root of the
+ * in-order binary tree.  When the caller's root is a different rank, the
+ * result is forwarded from rank (size - 1) to the caller's root with one
+ * extra point-to-point message after the tree reduction.
+ */
+int smpi_coll_tuned_reduce_ompi_in_order_binary( void *sendbuf, void *recvbuf,
+                                                  int count,
+                                                  MPI_Datatype datatype,
+                                                  MPI_Op op, int root,
+                                                  MPI_Comm comm)
+{
+    uint32_t segsize=0;
+    int ret;
+    int rank, size, io_root;
+    int segcount = count;
+    void *use_this_sendbuf = NULL, *use_this_recvbuf = NULL;
+    char *tmpbuf = NULL;   /* scratch buffer owned by this call, if any */
+    size_t typelng;
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+    XBT_DEBUG("coll:tuned:reduce_intra_in_order_binary rank %d ss %5d",
+              rank, segsize);
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    typelng=smpi_datatype_size( datatype);
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    /* An in-order binary tree must use root (size-1) to preserve the order of
+       operations.  Thus, if root is not rank (size - 1), then we must handle
+       1. MPI_IN_PLACE option on real root, and
+       2. we must allocate temporary recvbuf on rank (size - 1).
+       Note that generic function must be careful not to switch order of
+       operations for non-commutative ops.
+    */
+    io_root = size - 1;
+    use_this_sendbuf = sendbuf;
+    use_this_recvbuf = recvbuf;
+    if (io_root != root) {
+        ptrdiff_t text, ext;
+
+        ext=smpi_datatype_get_extent(datatype);
+        text=smpi_datatype_get_extent(datatype);
+
+        if ((root == rank) && (MPI_IN_PLACE == sendbuf)) {
+            /* The real root contributes the content of recvbuf: stage it in
+               a scratch buffer that is then used as the send buffer. */
+            tmpbuf = (char *) malloc(text + (count - 1) * ext);
+            if (NULL == tmpbuf) {
+                return MPI_ERR_INTERN;
+            }
+            /* NOTE(review): smpi_datatype_copy is declared as
+               (sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype),
+               i.e. source first -- confirm against the SMPI private header.
+               The previous call had the two buffers swapped, which would
+               overwrite recvbuf with the uninitialised scratch buffer. */
+            smpi_datatype_copy((char*)recvbuf, count, datatype,
+                               (char*)tmpbuf, count, datatype);
+            use_this_sendbuf = tmpbuf;
+        } else if (io_root == rank) {
+            /* Rank (size - 1) needs somewhere to accumulate the result
+               before forwarding it to the real root. */
+            tmpbuf = (char *) malloc(text + (count - 1) * ext);
+            if (NULL == tmpbuf) {
+                return MPI_ERR_INTERN;
+            }
+            use_this_recvbuf = tmpbuf;
+        }
+    }
+
+    /* Use generic reduce with in-order binary tree topology and io_root */
+    ret = smpi_coll_tuned_ompi_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype,
+                                               op, io_root, comm,
+                                               ompi_coll_tuned_topo_build_in_order_bintree(comm),
+                                               segcount, 0 );
+    if (MPI_SUCCESS != ret) {
+        /* Do not leak the scratch buffer on the error path. */
+        free(tmpbuf);
+        return ret;
+    }
+
+    /* Forward the result to the real root when it is not io_root. */
+    if (io_root != root) {
+        if (root == rank) {
+            /* Receive result from rank io_root to recvbuf */
+            smpi_mpi_recv(recvbuf, count, datatype, io_root,
+                          MCA_COLL_BASE_TAG_REDUCE, comm,
+                          MPI_STATUS_IGNORE);
+        } else if (io_root == rank) {
+            /* Send result from use_this_recvbuf to root */
+            smpi_mpi_send(use_this_recvbuf, count, datatype, root,
+                          MCA_COLL_BASE_TAG_REDUCE,
+                          comm);
+        }
+    }
+
+    /* Clean up: tmpbuf is NULL on every rank that did not allocate one. */
+    free(tmpbuf);
+
+    return MPI_SUCCESS;
+}
+
+/*
+ * Linear functions are copied from the BASIC coll module
+ * they do not segment the message and are simple implementations
+ * but for some small number of nodes and/or small data sizes they
+ * are just as fast as tuned/tree based segmenting operations
+ * and as such may be selected by the decision functions
+ * These are copied into this module due to the way we select modules
+ * in V1. i.e.
in V2 we will handle this differently and so will not
+ * have to duplicate code.
+ * GEF Oct05 after asking Jeff.
+ */
+
+/* copied function (with appropriate renaming) starts here */
+
+/*
+ * reduce_lin_intra
+ *
+ * Function: - reduction using O(N) algorithm
+ * Accepts: - same as MPI_Reduce()
+ * Returns: - MPI_SUCCESS, or -1 when a temporary buffer cannot be
+ *            allocated on the root
+ *
+ * Every non-root rank sends its whole buffer to the root.  The root
+ * starts from the contribution of rank (size - 1) and folds in the
+ * contributions of ranks size - 2 down to 0, in that order.
+ */
+
+int
+smpi_coll_tuned_reduce_ompi_basic_linear(void *sbuf, void *rbuf, int count,
+                                          MPI_Datatype dtype,
+                                          MPI_Op op,
+                                          int root,
+                                          MPI_Comm comm)
+{
+    int i, rank, size;
+    ptrdiff_t true_extent, lb, extent;
+    char *free_buffer = NULL;
+    char *pml_buffer = NULL;
+    char *inplace_temp = NULL;
+    char *inbuf;
+
+    /* Initialize */
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+
+    XBT_DEBUG("coll:tuned:reduce_intra_basic_linear rank %d", rank);
+
+    /* If not root, send data to the root. */
+
+    if (rank != root) {
+        smpi_mpi_send(sbuf, count, dtype, root,
+                      MCA_COLL_BASE_TAG_REDUCE,
+                      comm);
+        /* The send completed: this is the normal exit for non-root ranks,
+           so report success rather than -1. */
+        return MPI_SUCCESS;
+    }
+
+    /* see discussion in ompi_coll_basic_reduce_lin_intra about
+       extent and true extent */
+    /* for reducing buffer allocation lengths.... */
+
+    smpi_datatype_extent(dtype, &lb, &extent);
+    true_extent = smpi_datatype_get_extent(dtype);
+
+    if (MPI_IN_PLACE == sbuf) {
+        /* The root's contribution lives in rbuf; accumulate into a scratch
+           buffer and copy the result back to the user buffer at the end. */
+        sbuf = rbuf;
+        inplace_temp = (char*)malloc(true_extent + (count - 1) * extent);
+        if (NULL == inplace_temp) {
+            return -1;
+        }
+        rbuf = inplace_temp - lb;
+    }
+
+    if (size > 1) {
+        /* Landing buffer for the contributions received from other ranks. */
+        free_buffer = (char*)malloc(true_extent + (count - 1) * extent);
+        if (NULL == free_buffer) {
+            free(inplace_temp);
+            return -1;
+        }
+        pml_buffer = free_buffer - lb;
+    }
+
+    /* Initialize the receive buffer. */
+
+    if (rank == (size - 1)) {
+        /* NOTE(review): smpi_datatype_copy is declared as
+           (sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype),
+           i.e. source first -- confirm against the SMPI private header.
+           The previous call had the two buffers swapped. */
+        smpi_datatype_copy((char*)sbuf, count, dtype,
+                           (char*)rbuf, count, dtype);
+    } else {
+        smpi_mpi_recv(rbuf, count, dtype, size - 1,
+                      MCA_COLL_BASE_TAG_REDUCE, comm,
+                      MPI_STATUS_IGNORE);
+    }
+
+    /* Loop receiving and calling reduction function (C or Fortran). */
+
+    for (i = size - 2; i >= 0; --i) {
+        if (rank == i) {
+            inbuf = (char*)sbuf;
+        } else {
+            smpi_mpi_recv(pml_buffer, count, dtype, i,
+                          MCA_COLL_BASE_TAG_REDUCE, comm,
+                          MPI_STATUS_IGNORE);
+            inbuf = pml_buffer;
+        }
+
+        /* Perform the reduction */
+        smpi_op_apply(op, inbuf, rbuf, &count, &dtype);
+    }
+
+    if (NULL != inplace_temp) {
+        /* MPI_IN_PLACE: sbuf now aliases the user's receive buffer, so copy
+           the accumulated result from the scratch buffer back into it
+           (source first, see the note above). */
+        smpi_datatype_copy(inplace_temp, count, dtype,
+                           (char*)sbuf, count, dtype);
+        free(inplace_temp);
+    }
+    free(free_buffer);
+
+    /* All done */
+    return MPI_SUCCESS;
+}
+
+/* copied function (with appropriate renaming) ends here */
+
+
diff --git a/src/smpi/colls/smpi_openmpi_selector.c b/src/smpi/colls/smpi_openmpi_selector.c index 5d7d5afa15..2c4fc1011a 100644 --- a/src/smpi/colls/smpi_openmpi_selector.c +++ b/src/smpi/colls/smpi_openmpi_selector.c @@ -232,12 +232,12 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf, int communicator_size=0; //int segsize = 0; size_t message_size, dsize; - //const double a1 = 0.6016 / 1024.0; /* [1/B] */ - //const double b1 = 1.3496; - //const double a2 = 0.0410 / 1024.0; /* [1/B] */ - //const double b2 = 9.7128; - //const double a3 = 0.0422 / 1024.0; /* [1/B] */ - //const double b3 = 1.1614; + const double a1 = 0.6016 / 1024.0; /* [1/B] */ + const double b1 = 1.3496; + const double a2 = 0.0410 / 1024.0; /* [1/B] */ + const double b2 = 9.7128; + const double a3 = 0.0422 / 1024.0; /* [1/B] */ + const double b3 = 1.1614; //const double a4 = 0.0033 / 1024.0; /* [1/B] */ //const double b4 = 1.6761; @@ -253,47 +253,47 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf, * If the operation is non commutative we currently have choice of linear * or in-order binary tree algorithm.
*/ -/* if( !ompi_op_is_commute(op) ) { + if( !smpi_op_is_commute(op) ) { if ((communicator_size < 12) && (message_size < 2048)) { - return smpi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm, module); + return smpi_coll_tuned_reduce_ompi_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm/*, module*/); } - return smpi_coll_tuned_reduce_intra_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm, module, - 0, max_requests); - }*/ + return smpi_coll_tuned_reduce_ompi_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, + 0, max_requests*/); + } if ((communicator_size < 8) && (message_size < 512)){ /* Linear_0K */ - return smpi_coll_tuned_reduce_flat_tree (sendbuf, recvbuf, count, datatype, op, root, comm); + return smpi_coll_tuned_reduce_ompi_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); } else if (((communicator_size < 8) && (message_size < 20480)) || (message_size < 2048) || (count <= 1)) { /* Binomial_0K */ //segsize = 0; - return smpi_coll_tuned_reduce_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module, + return smpi_coll_tuned_reduce_ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module, segsize, max_requests*/); - } /*else if (communicator_size > (a1 * message_size + b1)) { + } else if (communicator_size > (a1 * message_size + b1)) { // Binomial_1K - segsize = 1024; - return smpi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, module, - segsize, max_requests); + //segsize = 1024; + return smpi_coll_tuned_reduce_ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module, + segsize, max_requests*/); } else if (communicator_size > (a2 * message_size + b2)) { // Pipeline_1K - segsize = 1024; - return smpi_coll_tuned_reduce_NTSL (sendbuf, recvbuf, count, datatype, op, root, comm, module, - segsize, max_requests); + //segsize = 1024; + return 
smpi_coll_tuned_reduce_ompi_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, + segsize, max_requests*/); } else if (communicator_size > (a3 * message_size + b3)) { // Binary_32K - segsize = 32*1024; - return smpi_coll_tuned_reduce_intra_binary( sendbuf, recvbuf, count, datatype, op, root, - comm, module, segsize, max_requests); + //segsize = 32*1024; + return smpi_coll_tuned_reduce_ompi_binary( sendbuf, recvbuf, count, datatype, op, root, + comm/*, module, segsize, max_requests*/); } - if (communicator_size > (a4 * message_size + b4)) { + /*if (communicator_size > (a4 * message_size + b4)) { // Pipeline_32K segsize = 32*1024; } else { // Pipeline_64K segsize = 64*1024; }*/ - return smpi_coll_tuned_reduce_NTSL (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, + return smpi_coll_tuned_reduce_ompi_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, segsize, max_requests*/); #if 0 -- 2.20.1