AND (Algorithmique Numérique Distribuée)

Public GIT Repository
add new algos for reduce from ompi
author    degomme <degomme@debian.localdomain>
          Mon, 10 Jun 2013 23:59:42 +0000 (01:59 +0200)
committer degomme <degomme@debian.localdomain>
          Mon, 10 Jun 2013 23:59:42 +0000 (01:59 +0200)
buildtools/Cmake/AddTests.cmake
buildtools/Cmake/DefinePackages.cmake
src/smpi/colls/bcast-ompi-pipeline.c
src/smpi/colls/bcast-ompi-split-bintree.c
src/smpi/colls/coll_tuned_topo.c [new file with mode: 0644]
src/smpi/colls/coll_tuned_topo.h [new file with mode: 0644]
src/smpi/colls/colls.h
src/smpi/colls/reduce-ompi.c [new file with mode: 0644]
src/smpi/colls/smpi_openmpi_selector.c

diff --git a/buildtools/Cmake/AddTests.cmake b/buildtools/Cmake/AddTests.cmake
index 3f31945..139057c 100644
@@ -400,7 +400,7 @@ if(NOT enable_memcheck)
                        scatter_rdb_allgather SMP_binary SMP_binomial SMP_linear ompi ompi_split_bintree ompi_pipeline)
                ADD_TEST(smpi-bcast-coll-${BCAST_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/bcast:${BCAST_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/bcast_coll.tesh)
     ENDFOREACH()
-    FOREACH (REDUCE_COLL default arrival_pattern_aware binomial flat_tree NTSL scatter_gather ompi)
+    FOREACH (REDUCE_COLL default arrival_pattern_aware binomial flat_tree NTSL scatter_gather ompi ompi_chain ompi_binary ompi_basic_linear ompi_binomial ompi_in_order_binary)
         ADD_TEST(smpi-reduce-coll-${REDUCE_COLL} ${CMAKE_BINARY_DIR}/bin/tesh ${TESH_OPTION} --cfg smpi/reduce:${REDUCE_COLL} --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/reduce_coll.tesh)
     ENDFOREACH()
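Each of these generated tests runs an ordinary MPI_Reduce program under SMPI; as the test line shows, the algorithm is picked at run time through --cfg smpi/reduce:<name>, so the program itself is algorithm-agnostic. A minimal sketch of such a program (illustrative only, not the actual reduce_coll test source):

    #include <mpi.h>
    #include <stdio.h>

    /* Minimal reduce test: the collective algorithm (e.g. ompi_binomial)
       is selected by SMPI at run time, not by anything in this source. */
    int main(int argc, char *argv[])
    {
        int rank, val, sum = 0;
        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        val = rank + 1;
        MPI_Reduce(&val, &sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
        if (rank == 0) printf("sum = %d\n", sum);
        MPI_Finalize();
        return 0;
    }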
 
diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake
index 2aaa4d8..0f0b372 100644
@@ -183,6 +183,7 @@ set(SMPI_SRC
   src/smpi/colls/bcast-SMP-binary.c
   src/smpi/colls/bcast-SMP-binomial.c
   src/smpi/colls/bcast-SMP-linear.c
+  src/smpi/colls/coll_tuned_topo.c
   src/smpi/colls/bcast-ompi-split-bintree.c
   src/smpi/colls/bcast-ompi-pipeline.c
   src/smpi/colls/reduce-arrival-pattern-aware.c
@@ -190,6 +191,7 @@ set(SMPI_SRC
   src/smpi/colls/reduce-flat-tree.c
   src/smpi/colls/reduce-NTSL.c
   src/smpi/colls/reduce-scatter-gather.c
+  src/smpi/colls/reduce-ompi.c
   )
 
 if(SMPI_F2C)
diff --git a/src/smpi/colls/bcast-ompi-pipeline.c b/src/smpi/colls/bcast-ompi-pipeline.c
index 2371d10..63c2067 100644
  #include "colls_private.h"
-
+ #include "coll_tuned_topo.h"
 
 #define MAXTREEFANOUT 32
 
-#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
-    if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
-        ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
-        size_t residual;                                                \
-        (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
-        residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                  \
-        if( residual > ((TYPELNG) >> 1) )                               \
-            (SEGCOUNT)++;                                               \
-    }                                                                   \
-
- typedef struct ompi_coll_tree_t {
-        int32_t tree_root;
-        int32_t tree_fanout;
-        int32_t tree_bmtree;
-        int32_t tree_prev;
-        int32_t tree_next[MAXTREEFANOUT];
-        int32_t tree_nextsize;
-    } ompi_coll_tree_t;
-
-    ompi_coll_tree_t*
-    ompi_coll_tuned_topo_build_chain( int fanout,
-                                     MPI_Comm com,
-                                     int root );
-
-ompi_coll_tree_t*
-ompi_coll_tuned_topo_build_chain( int fanout,
-                                  MPI_Comm comm,
-                                  int root )
-{
-    int rank, size;
-    int srank; /* shifted rank */
-    int i,maxchainlen;
-    int mark,head,len;
-    ompi_coll_tree_t *chain;
-
-    XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root);
-
-    /* 
-     * Get size and rank of the process in this communicator 
-     */
-    size = smpi_comm_size(comm);
-    rank = smpi_comm_rank(comm);
-
-    if( fanout < 1 ) {
-        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, forcing to 1 (pipeline)!");
-        fanout = 1;
-    }
-    if (fanout>MAXTREEFANOUT) {
-        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT);
-        fanout = MAXTREEFANOUT;
-    }
-
-    /*
-     * Allocate space for topology arrays if needed 
-     */
-    chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) );
-    if (!chain) {
-        XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory");
-        fflush(stdout);
-        return NULL;
-    }
-    chain->tree_root     = MPI_UNDEFINED;
-    chain->tree_nextsize = -1;
-    for(i=0;i<fanout;i++) chain->tree_next[i] = -1;
-
-    /* 
-     * Set root & numchain
-     */
-    chain->tree_root = root;
-    if( (size - 1) < fanout ) { 
-        chain->tree_nextsize = size-1;
-        fanout = size-1;
-    } else {
-        chain->tree_nextsize = fanout;
-    }
-    
-    /*
-     * Shift ranks
-     */
-    srank = rank - root;
-    if (srank < 0) srank += size;
-
-    /*
-     * Special case - fanout == 1
-     */
-    if( fanout == 1 ) {
-        if( srank == 0 ) chain->tree_prev = -1;
-        else chain->tree_prev = (srank-1+root)%size;
-
-        if( (srank + 1) >= size) {
-            chain->tree_next[0] = -1;
-            chain->tree_nextsize = 0;
-        } else {
-            chain->tree_next[0] = (srank+1+root)%size;
-            chain->tree_nextsize = 1;
-        }
-        return chain;
-    }
-
-    /* Let's handle the case where there is just one node in the communicator */
-    if( size == 1 ) {
-        chain->tree_next[0] = -1;
-        chain->tree_nextsize = 0;
-        chain->tree_prev = -1;
-        return chain;
-    }
-    /*
-     * Calculate maximum chain length
-     */
-    maxchainlen = (size-1) / fanout;
-    if( (size-1) % fanout != 0 ) {
-        maxchainlen++;
-        mark = (size-1)%fanout;
-    } else {
-        mark = fanout+1;
-    }
-
-    /*
-     * Find your own place in the list of shifted ranks
-     */
-    if( srank != 0 ) {
-        int column;
-        if( srank-1 < (mark * maxchainlen) ) {
-            column = (srank-1)/maxchainlen;
-            head = 1+column*maxchainlen;
-            len = maxchainlen;
-        } else {
-            column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1);
-            head = mark*maxchainlen+1+(column-mark)*(maxchainlen-1);
-            len = maxchainlen-1;
-        }
-
-        if( srank == head ) {
-            chain->tree_prev = 0; /*root*/
-        } else {
-            chain->tree_prev = srank-1; /* rank -1 */
-        }
-        if( srank == (head + len - 1) ) {
-            chain->tree_next[0] = -1;
-            chain->tree_nextsize = 0;
-        } else {
-            if( (srank + 1) < size ) {
-                chain->tree_next[0] = srank+1;
-                chain->tree_nextsize = 1;
-            } else {
-                chain->tree_next[0] = -1;
-                chain->tree_nextsize = 0;    
-            }
-        }
-    }
-    
-    /*
-     * Unshift values 
-     */
-    if( rank == root ) {
-        chain->tree_prev = -1;
-        chain->tree_next[0] = (root+1)%size;
-        for( i = 1; i < fanout; i++ ) {
-            chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen;
-            if( i > mark ) {
-                chain->tree_next[i]--;
-            }
-            chain->tree_next[i] %= size;
-        }
-        chain->tree_nextsize = fanout;
-    } else {
-        chain->tree_prev = (chain->tree_prev+root)%size;
-        if( chain->tree_next[0] != -1 ) {
-            chain->tree_next[0] = (chain->tree_next[0]+root)%size;
-        }
-    }
-
-    return chain;
-}
 
-smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
+int smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
                                       int original_count, 
                                       MPI_Datatype datatype, 
                                       int root,
@@ -186,14 +12,14 @@ smpi_coll_tuned_bcast_ompi_pipeline( void* buffer,
 {
     int count_by_segment = original_count;
     size_t type_size;
-    int segsize;
+    int segsize = 1024 << 7; /* 128 KiB segments */
     //mca_coll_tuned_module_t *tuned_module = (mca_coll_tuned_module_t*) module;
     //mca_coll_tuned_comm_t *data = tuned_module->tuned_data;
     
 //    return ompi_coll_tuned_bcast_intra_generic( buffer, count, datatype, root, comm, module,
 //                                                count_by_segment, data->cached_pipeline );
     ompi_coll_tree_t * tree = ompi_coll_tuned_topo_build_chain( 1, comm, root );
-    int err = 0, line, i;
+    int i;
     int rank, size;
     int segindex;
     int num_segments; /* Number of segments */
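The pipeline variant now fixes segsize at 1024 << 7, i.e. 128 KiB. To make that concrete (the code that actually derives the per-segment element count from segsize lies outside this hunk):

    /* Illustration only: elements per 128 KiB segment, assuming an
       8-byte datatype such as MPI_DOUBLE. */
    size_t type_size = 8;
    int count_by_segment = (1024 << 7) / type_size;   /* 131072 / 8 = 16384 */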
diff --git a/src/smpi/colls/bcast-ompi-split-bintree.c b/src/smpi/colls/bcast-ompi-split-bintree.c
index a68ebd2..f1201d1 100644
  */
  
   #include "colls_private.h"
+  #include "coll_tuned_topo.h"
   #define MAXTREEFANOUT 32
- typedef struct ompi_coll_tree_t {
-        int32_t tree_root;
-        int32_t tree_fanout;
-        int32_t tree_bmtree;
-        int32_t tree_prev;
-        int32_t tree_next[MAXTREEFANOUT];
-        int32_t tree_nextsize;
-    } ompi_coll_tree_t;
-
-    ompi_coll_tree_t*
-    ompi_coll_tuned_topo_build_tree( int fanout,
-                                     MPI_Comm com,
-                                     int root );
-                                     
-
-/*
- * Some static helpers.
- */
-static int pown( int fanout, int num )
-{
-    int j, p = 1;
-    if( num < 0 ) return 0;
-    if (1==num) return fanout;
-    if (2==fanout) {
-        return p<<num;
-    }
-    else {
-        for( j = 0; j < num; j++ ) { p*= fanout; }
-    }
-    return p;
-}
-
-static int calculate_level( int fanout, int rank )
-{
-    int level, num;
-    if( rank < 0 ) return -1;
-    for( level = 0, num = 0; num <= rank; level++ ) {
-        num += pown(fanout, level);
-    }
-    return level-1;
-}
-
-static int calculate_num_nodes_up_to_level( int fanout, int level )
-{
-    /* just use geometric progression formula for sum:
-       a^0+a^1+...a^(n-1) = (a^n-1)/(a-1) */
-    return ((pown(fanout,level) - 1)/(fanout - 1));
-}
-
-/*
- * And now the building functions.
- *
- * An example for fanout = 2, comm_size = 7
- *
- *              0           <-- delta = 1 (fanout^0)
- *            /   \
- *           1     2        <-- delta = 2 (fanout^1)
- *          / \   / \
- *         3   5 4   6      <-- delta = 4 (fanout^2)
- */
-
-ompi_coll_tree_t*
-ompi_coll_tuned_topo_build_tree( int fanout,
-                                 MPI_Comm comm,
-                                 int root )
-{
-    int rank, size;
-    int schild, sparent;
-    int level; /* location of my rank in the tree structure of size */
-    int delta; /* number of nodes on my level */
-    int slimit; /* total number of nodes on levels above me */ 
-    int shiftedrank;
-    int i;
-    ompi_coll_tree_t* tree;
-
-    XBT_DEBUG( "coll:tuned:topo_build_tree Building fo %d rt %d", fanout, root);
-
-    if (fanout<1) {
-        XBT_DEBUG( "coll:tuned:topo_build_tree invalid fanout %d", fanout);
-        return NULL;
-    }
-    if (fanout>MAXTREEFANOUT) {
-        XBT_DEBUG("coll:tuned:topo_build_tree invalid fanout %d bigger than max %d", fanout, MAXTREEFANOUT);
-        return NULL;
-    }
-
-    /* 
-     * Get size and rank of the process in this communicator 
-     */
-    size = smpi_comm_size(comm);
-    rank = smpi_comm_rank(comm);
-
-    tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
-    if (!tree) {
-        XBT_DEBUG("coll:tuned:topo_build_tree PANIC::out of memory");
-        return NULL;
-    }
-
-    tree->tree_root     = MPI_UNDEFINED;
-    tree->tree_nextsize = MPI_UNDEFINED;
-
-    /*
-     * Set root
-     */
-    tree->tree_root = root;
-  
-    /* 
-     * Initialize tree
-     */
-    tree->tree_fanout   = fanout;
-    tree->tree_bmtree   = 0;
-    tree->tree_root     = root;
-    tree->tree_prev     = -1;
-    tree->tree_nextsize = 0;
-    for( i = 0; i < fanout; i++ ) {
-        tree->tree_next[i] = -1;
-    }
-
-    /* return if we have less than 2 processes */
-    if( size < 2 ) {
-        return tree;
-    }
-  
-    /*
-     * Shift all ranks by root, so that the algorithm can be 
-     * designed as if root would be always 0
-     * shiftedrank should be used in calculating distances 
-     * and position in tree
-     */
-    shiftedrank = rank - root;
-    if( shiftedrank < 0 ) {
-        shiftedrank += size;
-    }
-
-    /* calculate my level */
-    level = calculate_level( fanout, shiftedrank );
-    delta = pown( fanout, level );
-
-    /* find my children */
-    for( i = 0; i < fanout; i++ ) {
-        schild = shiftedrank + delta * (i+1);
-        if( schild < size ) {
-            tree->tree_next[i] = (schild+root)%size;
-            tree->tree_nextsize = tree->tree_nextsize + 1;
-        } else {
-            break;
-        }
-    }
-    
-    /* find my parent */
-    slimit = calculate_num_nodes_up_to_level( fanout, level );
-    sparent = shiftedrank;
-    if( sparent < fanout ) {
-        sparent = 0;
-    } else {
-        while( sparent >= slimit ) {
-            sparent -= delta/fanout;
-        }
-    }
-    tree->tree_prev = (sparent+root)%size;
-  
-    return tree;
-}
-
  
 int
 smpi_coll_tuned_bcast_ompi_split_bintree ( void* buffer,
diff --git a/src/smpi/colls/coll_tuned_topo.c b/src/smpi/colls/coll_tuned_topo.c
new file mode 100644
index 0000000..ce7cec7
--- /dev/null
@@ -0,0 +1,629 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ * 
+ * Additional copyrights may follow
+ * 
+ * $HEADER$
+ */
+
+#include "colls_private.h"
+#include "coll_tuned_topo.h"
+/*
+ * Some static helpers.
+ */
+static int pown( int fanout, int num )
+{
+    int j, p = 1;
+    if( num < 0 ) return 0;
+    if (1==num) return fanout;
+    if (2==fanout) {
+        return p<<num;
+    }
+    else {
+        for( j = 0; j < num; j++ ) { p*= fanout; }
+    }
+    return p;
+}
+
+static int calculate_level( int fanout, int rank )
+{
+    int level, num;
+    if( rank < 0 ) return -1;
+    for( level = 0, num = 0; num <= rank; level++ ) {
+        num += pown(fanout, level);
+    }
+    return level-1;
+}
+
+static int calculate_num_nodes_up_to_level( int fanout, int level )
+{
+    /* just use geometric progression formula for sum:
+       a^0+a^1+...a^(n-1) = (a^n-1)/(a-1) */
+    return ((pown(fanout,level) - 1)/(fanout - 1));
+}
+
+/*
+ * And now the building functions.
+ *
+ * An example for fanout = 2, comm_size = 7
+ *
+ *              0           <-- delta = 1 (fanout^0)
+ *            /   \
+ *           1     2        <-- delta = 2 (fanout^1)
+ *          / \   / \
+ *         3   5 4   6      <-- delta = 4 (fanout^2)
+ */
+
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_tree( int fanout,
+                                 MPI_Comm comm,
+                                 int root )
+{
+    int rank, size;
+    int schild, sparent;
+    int level; /* location of my rank in the tree structure of size */
+    int delta; /* number of nodes on my level */
+    int slimit; /* total number of nodes on levels above me */ 
+    int shiftedrank;
+    int i;
+    ompi_coll_tree_t* tree;
+
+    XBT_DEBUG("coll:tuned:topo_build_tree Building fo %d rt %d", fanout, root);
+
+    if (fanout<1) {
+        XBT_DEBUG("coll:tuned:topo_build_tree invalid fanout %d", fanout);
+        return NULL;
+    }
+    if (fanout>MAXTREEFANOUT) {
+        XBT_DEBUG("coll:tuned:topo_build_tree invalid fanout %d bigger than max %d", fanout, MAXTREEFANOUT);
+        return NULL;
+    }
+
+    /* 
+     * Get size and rank of the process in this communicator 
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
+    if (!tree) {
+        XBT_DEBUG("coll:tuned:topo_build_tree PANIC::out of memory");
+        return NULL;
+    }
+
+    tree->tree_root     = MPI_UNDEFINED;
+    tree->tree_nextsize = MPI_UNDEFINED;
+
+    /*
+     * Set root
+     */
+    tree->tree_root = root;
+  
+    /* 
+     * Initialize tree
+     */
+    tree->tree_fanout   = fanout;
+    tree->tree_bmtree   = 0;
+    tree->tree_root     = root;
+    tree->tree_prev     = -1;
+    tree->tree_nextsize = 0;
+    for( i = 0; i < fanout; i++ ) {
+        tree->tree_next[i] = -1;
+    }
+
+    /* return if we have less than 2 processes */
+    if( size < 2 ) {
+        return tree;
+    }
+  
+    /*
+     * Shift all ranks by root, so that the algorithm can be 
+     * designed as if root would be always 0
+     * shiftedrank should be used in calculating distances 
+     * and position in tree
+     */
+    shiftedrank = rank - root;
+    if( shiftedrank < 0 ) {
+        shiftedrank += size;
+    }
+
+    /* calculate my level */
+    level = calculate_level( fanout, shiftedrank );
+    delta = pown( fanout, level );
+
+    /* find my children */
+    for( i = 0; i < fanout; i++ ) {
+        schild = shiftedrank + delta * (i+1);
+        if( schild < size ) {
+            tree->tree_next[i] = (schild+root)%size;
+            tree->tree_nextsize = tree->tree_nextsize + 1;
+        } else {
+            break;
+        }
+    }
+    
+    /* find my parent */
+    slimit = calculate_num_nodes_up_to_level( fanout, level );
+    sparent = shiftedrank;
+    if( sparent < fanout ) {
+        sparent = 0;
+    } else {
+        while( sparent >= slimit ) {
+            sparent -= delta/fanout;
+        }
+    }
+    tree->tree_prev = (sparent+root)%size;
+  
+    return tree;
+}
+
+/*
+ * Constructs in-order binary tree which can be used for non-commutative reduce 
+ * operations.
+ * Root of this tree is always rank (size-1) and fanout is 2.
+ * Here are some of the examples of this tree:
+ * size == 2     size == 3     size == 4                size == 9
+ *      1             2             3                        8
+ *     /             / \          /   \                    /   \
+ *    0             1  0         2     1                  7     3
+ *                                    /                 /  \   / \
+ *                                   0                 6    5 2   1
+ *                                                         /     /
+ *                                                        4     0
+ */
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_in_order_bintree( MPI_Comm comm )
+{
+    int rank, size;
+    int myrank, rightsize, delta;
+    int parent, lchild, rchild;
+    ompi_coll_tree_t* tree;
+
+    /* 
+     * Get size and rank of the process in this communicator 
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    tree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
+    if (!tree) {
+        XBT_DEBUG(
+                     "coll:tuned:topo_build_tree PANIC::out of memory");
+        return NULL;
+    }
+
+    tree->tree_root     = MPI_UNDEFINED;
+    tree->tree_nextsize = MPI_UNDEFINED;
+
+    /* 
+     * Initialize tree
+     */
+    tree->tree_fanout   = 2;
+    tree->tree_bmtree   = 0;
+    tree->tree_root     = size - 1;
+    tree->tree_prev     = -1;
+    tree->tree_nextsize = 0;
+    tree->tree_next[0]  = -1;
+    tree->tree_next[1]  = -1;
+    XBT_DEBUG(
+                 "coll:tuned:topo_build_in_order_tree Building fo %d rt %d", 
+                 tree->tree_fanout, tree->tree_root);
+
+    /* 
+     * Build the tree
+     */
+    myrank = rank;
+    parent = size - 1;
+    delta = 0;
+
+    while ( 1 ) {
+        /* Compute the size of the right subtree */
+        rightsize = size >> 1;
+
+        /* Determine the left and right child of this parent  */
+        lchild = -1;
+        rchild = -1;
+        if (size - 1 > 0) {
+            lchild = parent - 1;
+            if (lchild > 0) { 
+                rchild = rightsize - 1;
+            }
+        }
+       
+        /* The following cases are possible: myrank can be 
+           - a parent,
+           - belong to the left subtree, or
+           - belong to the right subtree
+           Each of the cases need to be handled differently.
+        */
+          
+        if (myrank == parent) {
+            /* I am the parent:
+               - compute real ranks of my children, and exit the loop. */
+            if (lchild >= 0) tree->tree_next[0] = lchild + delta;
+            if (rchild >= 0) tree->tree_next[1] = rchild + delta;
+            break;
+        }
+        if (myrank > rchild) {
+            /* I belong to the left subtree:
+               - If I am the left child, compute real rank of my parent
+               - Iterate down through tree: 
+               compute new size, shift ranks down, and update delta.
+            */
+            if (myrank == lchild) {
+                tree->tree_prev = parent + delta;
+            }
+            size = size - rightsize - 1;
+            delta = delta + rightsize;
+            myrank = myrank - rightsize;
+            parent = size - 1;
+
+        } else {
+            /* I belong to the right subtree:
+               - If I am the right child, compute real rank of my parent
+               - Iterate down through tree:  
+               compute new size and parent, 
+               but the delta and rank do not need to change.
+            */
+            if (myrank == rchild) {
+                tree->tree_prev = parent + delta;
+            }
+            size = rightsize;
+            parent = rchild;
+        }
+    }
+    
+    if (tree->tree_next[0] >= 0) { tree->tree_nextsize = 1; }
+    if (tree->tree_next[1] >= 0) { tree->tree_nextsize += 1; }
+
+    return tree;
+}
+
+int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree )
+{
+    ompi_coll_tree_t *ptr;
+
+    if ((!tree)||(!*tree)) {
+        return MPI_SUCCESS;
+    }
+
+    ptr = *tree;
+
+    free (ptr);
+    *tree = NULL;   /* mark tree as gone */
+
+    return MPI_SUCCESS;
+}
+
+/*
+ * 
+ * Here are some of the examples of this tree:
+ * size == 2                   size = 4                 size = 8
+ *      0                           0                        0
+ *     /                            | \                    / | \
+ *    1                             2  1                  4  2  1
+ *                                     |                     |  |\
+ *                                     3                     6  5 3
+ *                                                                |
+ *                                                                7
+ */
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_bmtree( MPI_Comm comm,
+                                   int root )
+{
+    int childs = 0;
+    int rank;
+    int size;
+    int mask = 1;
+    int index;
+    int remote;
+    ompi_coll_tree_t *bmtree;
+    int i;
+
+    XBT_DEBUG("coll:tuned:topo:build_bmtree rt %d", root);
+
+    /* 
+     * Get size and rank of the process in this communicator 
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    index = rank -root;
+
+    bmtree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
+    if (!bmtree) {
+        XBT_DEBUG("coll:tuned:topo:build_bmtree PANIC out of memory");
+        return NULL;
+    }
+
+    bmtree->tree_bmtree   = 1;
+
+    bmtree->tree_root     = MPI_UNDEFINED;
+    bmtree->tree_nextsize = MPI_UNDEFINED;
+    for(i=0;i<MAXTREEFANOUT;i++) {
+        bmtree->tree_next[i] = -1;
+    }
+
+    if( index < 0 ) index += size;
+
+    while( mask <= index ) mask <<= 1;
+
+    /* Now I can compute my father rank */
+    if( root == rank ) {
+        bmtree->tree_prev = root;
+    } else {
+        remote = (index ^ (mask >> 1)) + root;
+        if( remote >= size ) remote -= size;
+        bmtree->tree_prev = remote;
+    }
+    /* And now let's fill in my children */
+    while( mask < size ) {
+        remote = (index ^ mask);
+        if( remote >= size ) break;
+        remote += root;
+        if( remote >= size ) remote -= size;
+        if (childs==MAXTREEFANOUT) {
+            XBT_DEBUG("coll:tuned:topo:build_bmtree max fanout incorrect %d needed %d", MAXTREEFANOUT, childs);
+            return NULL;
+        }
+        bmtree->tree_next[childs] = remote;
+        mask <<= 1;
+        childs++;
+    }
+    bmtree->tree_nextsize = childs;
+    bmtree->tree_root     = root;
+    return bmtree;
+}
+
+/*
+ * Constructs in-order binomial tree which can be used for gather/scatter
+ * operations.
+ * 
+ * Here are some of the examples of this tree:
+ * size == 2                   size = 4                 size = 8
+ *      0                           0                        0
+ *     /                          / |                      / | \
+ *    1                          1  2                     1  2  4
+ *                                  |                        |  | \
+ *                                  3                        3  5  6
+ *                                                                 |
+ *                                                                 7
+ */
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_in_order_bmtree( MPI_Comm comm,
+                                           int root )
+{
+    int childs = 0;
+    int rank, vrank;
+    int size;
+    int mask = 1;
+    int remote;
+    ompi_coll_tree_t *bmtree;
+    int i;
+
+    XBT_DEBUG("coll:tuned:topo:build_in_order_bmtree rt %d", root);
+
+    /* 
+     * Get size and rank of the process in this communicator 
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    vrank = (rank - root + size) % size;
+
+    bmtree = (ompi_coll_tree_t*)malloc(sizeof(ompi_coll_tree_t));
+    if (!bmtree) {
+        XBT_DEBUG("coll:tuned:topo:build_bmtree PANIC out of memory");
+        return NULL;
+    }
+
+    bmtree->tree_bmtree   = 1;
+    bmtree->tree_root     = MPI_UNDEFINED;
+    bmtree->tree_nextsize = MPI_UNDEFINED;
+    for(i=0;i<MAXTREEFANOUT;i++) {
+        bmtree->tree_next[i] = -1;
+    }
+
+    if (root == rank) {
+       bmtree->tree_prev = root;
+    }
+
+    while (mask < size) {
+       remote = vrank ^ mask;
+       if (remote < vrank) {
+           bmtree->tree_prev = (remote + root) % size;
+           break;
+       } else if (remote < size) {
+           bmtree->tree_next[childs] = (remote + root) % size;
+           childs++;
+           if (childs==MAXTREEFANOUT) {
+               XBT_DEBUG(
+                            "coll:tuned:topo:build_bmtree max fanout incorrect %d needed %d",
+                            MAXTREEFANOUT, childs);
+               return NULL;
+           }
+       }
+       mask <<= 1;
+    }
+    bmtree->tree_nextsize = childs;
+    bmtree->tree_root     = root;
+
+    return bmtree;
+}
+
+
+ompi_coll_tree_t*
+ompi_coll_tuned_topo_build_chain( int fanout,
+                                  MPI_Comm comm,
+                                  int root )
+{
+    int rank, size;
+    int srank; /* shifted rank */
+    int i,maxchainlen;
+    int mark,head,len;
+    ompi_coll_tree_t *chain;
+
+    XBT_DEBUG("coll:tuned:topo:build_chain fo %d rt %d", fanout, root);
+
+    /* 
+     * Get size and rank of the process in this communicator 
+     */
+    size = smpi_comm_size(comm);
+    rank = smpi_comm_rank(comm);
+
+    if( fanout < 1 ) {
+        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout of ZERO, forcing to 1 (pipeline)!");
+        fanout = 1;
+    }
+    if (fanout>MAXTREEFANOUT) {
+        XBT_DEBUG("coll:tuned:topo:build_chain WARNING invalid fanout %d bigger than max %d, forcing to max!", fanout, MAXTREEFANOUT);
+        fanout = MAXTREEFANOUT;
+    }
+
+    /*
+     * Allocate space for topology arrays if needed 
+     */
+    chain = (ompi_coll_tree_t*)malloc( sizeof(ompi_coll_tree_t) );
+    if (!chain) {
+        XBT_DEBUG("coll:tuned:topo:build_chain PANIC out of memory");
+        fflush(stdout);
+        return NULL;
+    }
+    chain->tree_root     = MPI_UNDEFINED;
+    chain->tree_nextsize = -1;
+    for(i=0;i<fanout;i++) chain->tree_next[i] = -1;
+
+    /* 
+     * Set root & numchain
+     */
+    chain->tree_root = root;
+    if( (size - 1) < fanout ) { 
+        chain->tree_nextsize = size-1;
+        fanout = size-1;
+    } else {
+        chain->tree_nextsize = fanout;
+    }
+    
+    /*
+     * Shift ranks
+     */
+    srank = rank - root;
+    if (srank < 0) srank += size;
+
+    /*
+     * Special case - fanout == 1
+     */
+    if( fanout == 1 ) {
+        if( srank == 0 ) chain->tree_prev = -1;
+        else chain->tree_prev = (srank-1+root)%size;
+
+        if( (srank + 1) >= size) {
+            chain->tree_next[0] = -1;
+            chain->tree_nextsize = 0;
+        } else {
+            chain->tree_next[0] = (srank+1+root)%size;
+            chain->tree_nextsize = 1;
+        }
+        return chain;
+    }
+
+    /* Let's handle the case where there is just one node in the communicator */
+    if( size == 1 ) {
+        chain->tree_next[0] = -1;
+        chain->tree_nextsize = 0;
+        chain->tree_prev = -1;
+        return chain;
+    }
+    /*
+     * Calculate maximum chain length
+     */
+    maxchainlen = (size-1) / fanout;
+    if( (size-1) % fanout != 0 ) {
+        maxchainlen++;
+        mark = (size-1)%fanout;
+    } else {
+        mark = fanout+1;
+    }
+
+    /*
+     * Find your own place in the list of shifted ranks
+     */
+    if( srank != 0 ) {
+        int column;
+        if( srank-1 < (mark * maxchainlen) ) {
+            column = (srank-1)/maxchainlen;
+            head = 1+column*maxchainlen;
+            len = maxchainlen;
+        } else {
+            column = mark + (srank-1-mark*maxchainlen)/(maxchainlen-1);
+            head = mark*maxchainlen+1+(column-mark)*(maxchainlen-1);
+            len = maxchainlen-1;
+        }
+
+        if( srank == head ) {
+            chain->tree_prev = 0; /*root*/
+        } else {
+            chain->tree_prev = srank-1; /* rank -1 */
+        }
+        if( srank == (head + len - 1) ) {
+            chain->tree_next[0] = -1;
+            chain->tree_nextsize = 0;
+        } else {
+            if( (srank + 1) < size ) {
+                chain->tree_next[0] = srank+1;
+                chain->tree_nextsize = 1;
+            } else {
+                chain->tree_next[0] = -1;
+                chain->tree_nextsize = 0;    
+            }
+        }
+    }
+    
+    /*
+     * Unshift values 
+     */
+    if( rank == root ) {
+        chain->tree_prev = -1;
+        chain->tree_next[0] = (root+1)%size;
+        for( i = 1; i < fanout; i++ ) {
+            chain->tree_next[i] = chain->tree_next[i-1] + maxchainlen;
+            if( i > mark ) {
+                chain->tree_next[i]--;
+            }
+            chain->tree_next[i] %= size;
+        }
+        chain->tree_nextsize = fanout;
+    } else {
+        chain->tree_prev = (chain->tree_prev+root)%size;
+        if( chain->tree_next[0] != -1 ) {
+            chain->tree_next[0] = (chain->tree_next[0]+root)%size;
+        }
+    }
+
+    return chain;
+}
+
+int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank)
+{
+    int i;
+
+    XBT_DEBUG("coll:tuned:topo:topo_dump_tree %1d tree root %d"
+                 " fanout %d BM %1d nextsize %d prev %d",
+                 rank, tree->tree_root, tree->tree_fanout, tree->tree_bmtree,
+                 tree->tree_nextsize, tree->tree_prev);
+    if( tree->tree_nextsize ) {
+        for( i = 0; i < tree->tree_nextsize; i++ )
+            XBT_DEBUG("[%1d] %d", i, tree->tree_next[i]);
+    }
+    return (0);
+}
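These builders all return a heap-allocated ompi_coll_tree_t that the caller owns. A hypothetical usage sketch (not part of this commit) that builds, dumps, and releases a few of the topologies:

    /* Hypothetical caller: build, inspect and destroy some topologies. */
    void topo_demo(MPI_Comm comm, int root)
    {
        ompi_coll_tree_t *t;

        t = ompi_coll_tuned_topo_build_tree(2, comm, root);      /* binary tree   */
        if (t) ompi_coll_tuned_topo_dump_tree(t, smpi_comm_rank(comm));
        ompi_coll_tuned_topo_destroy_tree(&t);                   /* t is NULL now */

        t = ompi_coll_tuned_topo_build_in_order_bintree(comm);   /* root = size-1 */
        ompi_coll_tuned_topo_destroy_tree(&t);

        t = ompi_coll_tuned_topo_build_chain(1, comm, root);     /* pure pipeline */
        ompi_coll_tuned_topo_destroy_tree(&t);
    }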
diff --git a/src/smpi/colls/coll_tuned_topo.h b/src/smpi/colls/coll_tuned_topo.h
new file mode 100644
index 0000000..73b0361
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ * 
+ * Additional copyrights may follow
+ * 
+ * $HEADER$
+ */
+
+#ifndef MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED
+#define MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED
+
+#include "colls_private.h"
+
+#define MAXTREEFANOUT 32
+
+#define COLL_TUNED_COMPUTED_SEGCOUNT(SEGSIZE, TYPELNG, SEGCOUNT)        \
+    if( ((SEGSIZE) >= (TYPELNG)) &&                                     \
+        ((SEGSIZE) < ((TYPELNG) * (SEGCOUNT))) ) {                      \
+        size_t residual;                                                \
+        (SEGCOUNT) = (int)((SEGSIZE) / (TYPELNG));                      \
+        residual = (SEGSIZE) - (SEGCOUNT) * (TYPELNG);                  \
+        if( residual > ((TYPELNG) >> 1) )                               \
+            (SEGCOUNT)++;                                               \
+    }                                                                   \
+
+
+    typedef struct ompi_coll_tree_t {
+        int32_t tree_root;
+        int32_t tree_fanout;
+        int32_t tree_bmtree;
+        int32_t tree_prev;
+        int32_t tree_next[MAXTREEFANOUT];
+        int32_t tree_nextsize;
+    } ompi_coll_tree_t;
+
+    ompi_coll_tree_t*
+    ompi_coll_tuned_topo_build_tree( int fanout,
+                                     MPI_Comm com,
+                                     int root );
+    ompi_coll_tree_t*
+    ompi_coll_tuned_topo_build_in_order_bintree( MPI_Comm comm );
+
+    ompi_coll_tree_t*
+    ompi_coll_tuned_topo_build_bmtree( MPI_Comm comm,
+                                       int root );
+    ompi_coll_tree_t*
+    ompi_coll_tuned_topo_build_in_order_bmtree( MPI_Comm comm,
+                                               int root );
+    ompi_coll_tree_t*
+    ompi_coll_tuned_topo_build_chain( int fanout,
+                                      MPI_Comm com,
+                                      int root );
+
+    int ompi_coll_tuned_topo_destroy_tree( ompi_coll_tree_t** tree );
+
+    /* debugging stuff, will be removed later */
+    int ompi_coll_tuned_topo_dump_tree (ompi_coll_tree_t* tree, int rank);
+
+#endif  /* MCA_COLL_TUNED_TOPO_H_HAS_BEEN_INCLUDED */
+
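A worked example of the COLL_TUNED_COMPUTED_SEGCOUNT rounding (values chosen purely for illustration):

    int      segcount = 100000;      /* starts as the full element count */
    size_t   typelng  = 8;           /* e.g. sizeof(double)              */
    uint32_t segsize  = 32 * 1024;   /* requested segment size in bytes  */

    COLL_TUNED_COMPUTED_SEGCOUNT(segsize, typelng, segcount);
    /* 32768 >= 8 and 32768 < 8 * 100000, so:
       segcount = 32768 / 8 = 4096; residual = 0, so no rounding up.
       The reduce code below then derives
       num_segments = (100000 + 4096 - 1) / 4096 = 25. */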
diff --git a/src/smpi/colls/colls.h b/src/smpi/colls/colls.h
index 24cd91c..9af9ad3 100644
@@ -185,7 +185,13 @@ COLL_APPLY(action, COLL_REDUCE_SIG, binomial) COLL_sep \
 COLL_APPLY(action, COLL_REDUCE_SIG, flat_tree) COLL_sep \
 COLL_APPLY(action, COLL_REDUCE_SIG, NTSL) COLL_sep \
 COLL_APPLY(action, COLL_REDUCE_SIG, scatter_gather) COLL_sep \
-COLL_APPLY(action, COLL_REDUCE_SIG, ompi)
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_chain) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_pipeline) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_basic_linear) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_in_order_binary) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_binary) COLL_sep \
+COLL_APPLY(action, COLL_REDUCE_SIG, ompi_binomial)
 
 COLL_REDUCES(COLL_PROTO, COLL_NOsep)
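Each COLL_APPLY(action, COLL_REDUCE_SIG, name) entry is a generic hook: instantiated with COLL_PROTO, as on the line above, it should expand to a prototype matching the functions defined in reduce-ompi.c below. Roughly, for the binomial entry (a sketch of the expansion, not the literal macro output):

    int smpi_coll_tuned_reduce_ompi_binomial(void *sendbuf, void *recvbuf,
                                             int count, MPI_Datatype datatype,
                                             MPI_Op op, int root, MPI_Comm comm);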
 
diff --git a/src/smpi/colls/reduce-ompi.c b/src/smpi/colls/reduce-ompi.c
new file mode 100644
index 0000000..6dc846c
--- /dev/null
@@ -0,0 +1,683 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2009 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "colls_private.h"
+#include "coll_tuned_topo.h"
+#define MCA_COLL_BASE_TAG_REDUCE 555
+
+
+
+int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int original_count,
+                                    MPI_Datatype datatype, MPI_Op  op,
+                                    int root, MPI_Comm comm,
+                                    ompi_coll_tree_t* tree, int count_by_segment,
+                                    int max_outstanding_reqs );
+/**
+ * This is a generic implementation of the reduce protocol. It uses the tree
+ * provided as an argument and executes all operations on segments of
+ * count_by_segment elements of the datatype.
+ * For the last communication it updates the count in order to limit
+ * the total number of elements to the original count (original_count).
+ *
+ * Note that for non-commutative operations we cannot save the memory copy
+ * for the first block: thus we must copy sendbuf to accumbuf on intermediate
+ * nodes to keep the optimized loop happy.
+ */
+int smpi_coll_tuned_ompi_reduce_generic( void* sendbuf, void* recvbuf, int original_count,
+                                    MPI_Datatype datatype, MPI_Op  op,
+                                    int root, MPI_Comm comm,
+                                    ompi_coll_tree_t* tree, int count_by_segment,
+                                    int max_outstanding_reqs )
+{
+    char *inbuf[2] = {NULL, NULL}, *inbuf_free[2] = {NULL, NULL};
+    char *accumbuf = NULL, *accumbuf_free = NULL;
+    char *local_op_buffer = NULL, *sendtmpbuf = NULL;
+    ptrdiff_t extent, lower_bound, segment_increment;
+    size_t typelng;
+    MPI_Request  reqs[2] = {MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+    int num_segments, line, ret, segindex, i, rank;
+    int recvcount, prevcount, inbi;
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    smpi_datatype_extent( datatype, &lower_bound, &extent);
+    typelng = smpi_datatype_size( datatype );
+    num_segments = (original_count + count_by_segment - 1) / count_by_segment;
+    segment_increment = count_by_segment * extent;
+
+    sendtmpbuf = (char*) sendbuf; 
+    if( sendbuf == MPI_IN_PLACE ) { 
+        sendtmpbuf = (char *)recvbuf; 
+    }
+
+    XBT_DEBUG( "coll:tuned:reduce_generic count %d, msg size %ld, segsize %ld, max_requests %d", original_count, (unsigned long)(num_segments * segment_increment), (unsigned long)segment_increment, max_outstanding_reqs);
+
+    rank = smpi_comm_rank(comm);
+
+    /* non-leaf nodes - wait for children to send me data & forward up 
+       (if needed) */
+    if( tree->tree_nextsize > 0 ) {
+        ptrdiff_t true_extent, real_segment_size;
+        true_extent=smpi_datatype_get_extent( datatype);
+
+        /* handle a non-existent recv buffer (i.e. it is NULL) and 
+           protect the recv buffer on non-root nodes */
+        accumbuf = (char*)recvbuf;
+        if( (NULL == accumbuf) || (root != rank) ) {
+            /* Allocate temporary accumulator buffer. */
+            accumbuf_free = (char*)malloc(true_extent + 
+                                          (original_count - 1) * extent);
+            if (accumbuf_free == NULL) { 
+                line = __LINE__; ret = -1; goto error_hndl; 
+            }
+            accumbuf = accumbuf_free - lower_bound;
+        } 
+
+        /* If this is a non-commutative operation we must copy
+           sendbuf to the accumbuf, in order to simplify the loops */
+        if (!smpi_op_is_commute(op)) {
+            smpi_datatype_copy(
+                                                (char*)accumbuf, original_count, datatype,
+                                                (char*)sendtmpbuf, original_count, datatype);
+        }
+        /* Allocate two buffers for incoming segments */
+        real_segment_size = true_extent + (count_by_segment - 1) * extent;
+        inbuf_free[0] = (char*) malloc(real_segment_size);
+        if( inbuf_free[0] == NULL ) { 
+            line = __LINE__; ret = -1; goto error_hndl; 
+        }
+        inbuf[0] = inbuf_free[0] - lower_bound;
+        /* if there is a chance to overlap communication,
+           allocate a second buffer */
+        if( (num_segments > 1) || (tree->tree_nextsize > 1) ) {
+            inbuf_free[1] = (char*) malloc(real_segment_size);
+            if( inbuf_free[1] == NULL ) { 
+                line = __LINE__; ret = -1; goto error_hndl;
+            }
+            inbuf[1] = inbuf_free[1] - lower_bound;
+        } 
+
+        /* reset input buffer index and receive count */
+        inbi = 0;
+        recvcount = 0;
+        /* for each segment */
+        for( segindex = 0; segindex <= num_segments; segindex++ ) {
+            prevcount = recvcount;
+            /* recvcount - number of elements in current segment */
+            recvcount = count_by_segment;
+            if( segindex == (num_segments-1) )
+                recvcount = original_count - count_by_segment * segindex;
+
+            /* for each child */
+            for( i = 0; i < tree->tree_nextsize; i++ ) {
+                /**
+                 * We try to overlap communication:
+                 * either with next segment or with the next child
+                 */
+                /* post irecv for current segindex on current child */
+                if( segindex < num_segments ) {
+                    void* local_recvbuf = inbuf[inbi];
+                    if( 0 == i ) {
+                        /* for the first step (1st child per segment) and 
+                         * commutative operations we might be able to irecv 
+                         * directly into the accumulate buffer so that we can 
+                         * reduce(op) this with our sendbuf in one step as 
+                         * ompi_op_reduce only has two buffer pointers, 
+                         * this avoids an extra memory copy.
+                         *
+                         * BUT if the operation is non-commutative or 
+                         * we are root and are USING MPI_IN_PLACE this is wrong!
+                         */
+                        if( (smpi_op_is_commute(op)) &&
+                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
+                            local_recvbuf = accumbuf + segindex * segment_increment;
+                        }
+                    }
+
+                    reqs[inbi]=smpi_mpi_irecv(local_recvbuf, recvcount, datatype,
+                                             tree->tree_next[i], 
+                                             MCA_COLL_BASE_TAG_REDUCE, comm
+                                             );
+                }
+                /* wait for previous req to complete, if any.
+                   if there are no requests reqs[inbi ^1] will be 
+                   MPI_REQUEST_NULL. */
+                /* wait on data from last child for previous segment */
+                smpi_mpi_waitall( 1, &reqs[inbi ^ 1], 
+                                             MPI_STATUSES_IGNORE );
+                local_op_buffer = inbuf[inbi ^ 1];
+                if( i > 0 ) {
+                    /* our first operation is to combine our own [sendbuf] data 
+                     * with the data we received from downstream (but only if
+                     * the operation is commutative and we are not root and
+                     * not using MPI_IN_PLACE)
+                     */
+                    if( 1 == i ) {
+                        if( (smpi_op_is_commute(op)) && 
+                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
+                            local_op_buffer = sendtmpbuf + segindex * segment_increment;
+                        }
+                    }
+                    /* apply operation */
+                    smpi_op_apply(op, local_op_buffer, 
+                                   accumbuf + segindex * segment_increment, 
+                                   &recvcount, &datatype );
+                } else if ( segindex > 0 ) {
+                    void* accumulator = accumbuf + (segindex-1) * segment_increment;
+                    if( tree->tree_nextsize <= 1 ) {
+                        if( (smpi_op_is_commute(op)) &&
+                            !((MPI_IN_PLACE == sendbuf) && (rank == tree->tree_root)) ) {
+                            local_op_buffer = sendtmpbuf + (segindex-1) * segment_increment;
+                        }
+                    }
+                    smpi_op_apply(op, local_op_buffer, accumulator, &prevcount, 
+                                   &datatype );
+
+                    /* all reduces on available data for this step (i) are complete;
+                     * pass to the next process unless you are the root.
+                     */
+                    if (rank != tree->tree_root) {
+                        /* send combined/accumulated data to parent */
+                        smpi_mpi_send( accumulator, prevcount, 
+                                                  datatype, tree->tree_prev, 
+                                                  MCA_COLL_BASE_TAG_REDUCE,
+                                                  comm);
+                    }
+
+                    /* we stop when segindex = number of segments
+                       (i.e. we do num_segments+1 steps for pipelining) */
+                    if (segindex == num_segments) break;
+                }
+
+                /* update input buffer index */
+                inbi = inbi ^ 1;
+            } /* end of for each child */
+        } /* end of for each segment */
+
+        /* clean up */
+        if( inbuf_free[0] != NULL) free(inbuf_free[0]);
+        if( inbuf_free[1] != NULL) free(inbuf_free[1]);
+        if( accumbuf_free != NULL ) free(accumbuf_free);
+    }
+
+    /* leaf nodes 
+       Depending on the value of max_outstanding_reqs and 
+       the number of segments we have two options:
+       - send all segments using blocking send to the parent, or
+       - avoid flooding the parent nodes by limiting the number of
+       outstanding requests to max_outstanding_reqs.
+       TODO/POSSIBLE IMPROVEMENT: If there is a way to determine the eager size 
+       for the current communication, synchronization should be used only 
+       when the message/segment size is smaller than the eager size.
+    */
+    else {
+
+        /* If the number of segments is less than the maximum number of outstanding
+           requests or there is no limit on the maximum number of outstanding 
+           requests, we send data to the parent using blocking send */
+        if ((0 == max_outstanding_reqs) || 
+            (num_segments <= max_outstanding_reqs)) {
+            
+            segindex = 0;
+            while ( original_count > 0) {
+                if (original_count < count_by_segment) {
+                    count_by_segment = original_count;
+                }
+                smpi_mpi_send((char*)sendbuf + 
+                                         segindex * segment_increment,
+                                         count_by_segment, datatype,
+                                         tree->tree_prev, 
+                                         MCA_COLL_BASE_TAG_REDUCE,
+                                         comm) ;
+                segindex++;
+                original_count -= count_by_segment;
+            }
+        }
+
+        /* Otherwise, introduce flow control:
+           - post max_outstanding_reqs non-blocking synchronous sends,
+           - for remaining segments
+           - wait for a ssend to complete, and post the next one.
+           - wait for all outstanding sends to complete.
+        */
+        else {
+
+            int creq = 0;
+            MPI_Request* sreq = NULL;
+
+            sreq = (MPI_Request*) calloc( max_outstanding_reqs,
+                                              sizeof(MPI_Request ) );
+            if (NULL == sreq) { line = __LINE__; ret = -1; goto error_hndl; }
+
+            /* post first group of requests */
+            for (segindex = 0; segindex < max_outstanding_reqs; segindex++) {
+                sreq[segindex]=smpi_mpi_isend((char*)sendbuf +
+                                          segindex * segment_increment,
+                                          count_by_segment, datatype,
+                                          tree->tree_prev, 
+                                          MCA_COLL_BASE_TAG_REDUCE,
+                                          comm);
+                original_count -= count_by_segment;
+            }
+
+            creq = 0;
+            while ( original_count > 0 ) {
+                /* wait on a posted request to complete */
+                smpi_mpi_wait(&sreq[creq], MPI_STATUS_IGNORE);
+                sreq[creq] = MPI_REQUEST_NULL;
+
+                if( original_count < count_by_segment ) {
+                    count_by_segment = original_count;
+                }
+                sreq[creq]=smpi_mpi_isend((char*)sendbuf + 
+                                          segindex * segment_increment, 
+                                          count_by_segment, datatype, 
+                                          tree->tree_prev, 
+                                          MCA_COLL_BASE_TAG_REDUCE, 
+                                          comm );
+                creq = (creq + 1) % max_outstanding_reqs;
+                segindex++;
+                original_count -= count_by_segment;
+            }
+
+            /* Wait on the remaining requests to complete */
+            smpi_mpi_waitall( max_outstanding_reqs, sreq, 
+                                         MPI_STATUSES_IGNORE );
+
+            /* free requests */
+            free(sreq);
+        }
+    }
+    return MPI_SUCCESS;
+
+ error_hndl:  /* error handler */
+    XBT_DEBUG("ERROR_HNDL: node %d file %s line %d error %d\n", 
+                   rank, __FILE__, line, ret );
+    if( inbuf_free[0] != NULL ) free(inbuf_free[0]);
+    if( inbuf_free[1] != NULL ) free(inbuf_free[1]);
+    if( accumbuf_free != NULL ) free(accumbuf_free); /* free the allocation base, not the offset pointer */
+    return ret;
+}
+
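The reduce variants below are thin wrappers around this engine: each picks a tree, a segment count, and a request limit. All callers in this commit pass max_outstanding_reqs = 0, so the flow-control branch is never taken by them; a hypothetical direct use that would exercise it:

    /* Hypothetical: binomial tree, 1024-element segments, and at most
       four outstanding sends per leaf (triggers the flow-control path). */
    int my_reduce(void *sbuf, void *rbuf, int count, MPI_Datatype dtype,
                  MPI_Op op, int root, MPI_Comm comm)
    {
        return smpi_coll_tuned_ompi_reduce_generic(
                   sbuf, rbuf, count, dtype, op, root, comm,
                   ompi_coll_tuned_topo_build_in_order_bmtree(comm, root),
                   1024 /* count_by_segment */, 4 /* max_outstanding_reqs */);
    }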
+/* Attention: this version of the reduce operations does not
+   work for:
+   - non-commutative operations
+   - segment sizes which are not multiples of the extent of the datatype,
+     meaning that at least one datatype must fit in the segment!
+*/
+
+int smpi_coll_tuned_reduce_ompi_chain( void *sendbuf, void *recvbuf, int count,
+                                        MPI_Datatype datatype, 
+                                        MPI_Op  op, int root, 
+                                        MPI_Comm  comm
+                                        )
+{
+    uint32_t segsize=64*1024;
+    int segcount = count;
+    size_t typelng;
+    int fanout = smpi_comm_size(comm)/2;
+
+    XBT_DEBUG("coll:tuned:reduce_intra_chain rank %d fo %d ss %5d", smpi_comm_rank(comm), fanout, segsize);
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    typelng = smpi_datatype_size( datatype);
+    
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype, 
+                                           op, root, comm,
+                                           ompi_coll_tuned_topo_build_chain(fanout, comm, root), 
+                                           segcount, 0 );
+}
+
+
+int smpi_coll_tuned_reduce_ompi_pipeline( void *sendbuf, void *recvbuf,
+                                           int count, MPI_Datatype datatype,
+                                           MPI_Op  op, int root,
+                                           MPI_Comm  comm  )
+{
+
+    uint32_t segsize;
+    int segcount = count;
+    size_t typelng;
+//    COLL_TUNED_UPDATE_PIPELINE( comm, tuned_module, root );
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    const double a2 =  0.0410 / 1024.0; /* [1/B] */
+    const double b2 =  9.7128;
+    const double a4 =  0.0033 / 1024.0; /* [1/B] */
+    const double b4 =  1.6761;
+    typelng= smpi_datatype_size( datatype);
+    int communicator_size = smpi_comm_size(comm);
+    size_t message_size = typelng * count; 
+
+    if (communicator_size > (a2 * message_size + b2)) {
+        // Pipeline_1K 
+        segsize = 1024;
+    }else if (communicator_size > (a4 * message_size + b4)) {
+        // Pipeline_32K 
+        segsize = 32*1024;
+    } else {
+        // Pipeline_64K 
+        segsize = 64*1024;
+    }
+
+    XBT_DEBUG("coll:tuned:reduce_intra_pipeline rank %d ss %5d",
+                 smpi_comm_rank(comm), segsize);
+
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype, 
+                                           op, root, comm,
+                                           ompi_coll_tuned_topo_build_chain( 1, comm, root), 
+                                           segcount, 0);
+}
+
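Plugging numbers into the decision above makes the three regions concrete:

    /* Example: typelng = 8, count = 131072  =>  message_size = 1 MiB.
       a2 * message_size + b2 = 0.0410 * 1024 + 9.7128 ~= 51.7
       a4 * message_size + b4 = 0.0033 * 1024 + 1.6761 ~=  5.1
       So a 64-rank communicator gets 1 KiB segments, an 8-rank one
       gets 32 KiB, and a 4-rank one gets 64 KiB. */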
+int smpi_coll_tuned_reduce_ompi_binary( void *sendbuf, void *recvbuf,
+                                         int count, MPI_Datatype datatype,
+                                         MPI_Op  op, int root,
+                                         MPI_Comm  comm)
+{
+    uint32_t segsize;
+    int segcount = count;
+    size_t typelng;
+
+
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    typelng=smpi_datatype_size( datatype );
+
+    // Binary_32K
+    segsize = 32*1024;
+
+    XBT_DEBUG("coll:tuned:reduce_intra_binary rank %d ss %5d",
+                 smpi_comm_rank(comm), segsize);
+
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype, 
+                                           op, root, comm, 
+                                           ompi_coll_tuned_topo_build_tree(2, comm, root), 
+                                           segcount, 0);
+}
+
+int smpi_coll_tuned_reduce_ompi_binomial( void *sendbuf, void *recvbuf,
+                                           int count, MPI_Datatype datatype,
+                                           MPI_Op  op, int root,
+                                           MPI_Comm  comm)
+{
+
+    uint32_t segsize=0;
+    int segcount = count;
+    size_t typelng;
+
+    const double a1 =  0.6016 / 1024.0; /* [1/B] */
+    const double b1 =  1.3496;
+
+//    COLL_TUNED_UPDATE_IN_ORDER_BMTREE( comm, tuned_module, root );
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    typelng= smpi_datatype_size( datatype);
+    int communicator_size = smpi_comm_size(comm);
+    size_t message_size = typelng * count; 
+    if (((communicator_size < 8) && (message_size < 20480)) ||
+               (message_size < 2048) || (count <= 1)) {
+        /* Binomial_0K */
+        segsize = 0;
+    } else if (communicator_size > (a1 * message_size + b1)) {
+        // Binomial_1K 
+        segsize = 1024;
+    }
+
+    XBT_DEBUG("coll:tuned:reduce_intra_binomial rank %d ss %5d",
+                 smpi_comm_rank(comm), segsize);
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    return smpi_coll_tuned_ompi_reduce_generic( sendbuf, recvbuf, count, datatype, 
+                                           op, root, comm, 
+                                           ompi_coll_tuned_topo_build_in_order_bmtree(comm, root), 
+                                           segcount, 0);
+}
+
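The binomial thresholds, worked through for two message sizes:

    /* Example with 8-byte elements:
       count = 2048 (16 KiB message): a1 * msg + b1 = 0.6016 * 16 + 1.3496 ~= 11,
       so communicators larger than ~11 ranks use 1 KiB segments.
       count = 131072 (1 MiB message): the threshold is ~617 ranks, so nearly
       any realistic communicator keeps segsize = 0 (no segmentation). */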
+/*
+ * reduce_intra_in_order_binary 
+ * 
+ * Function:      Logarithmic reduce operation for non-commutative operations.
+ * Accepts:       same as MPI_Reduce()
+ * Returns:       MPI_SUCCESS or error code
+ */
+int smpi_coll_tuned_reduce_ompi_in_order_binary( void *sendbuf, void *recvbuf,
+                                                  int count, 
+                                                  MPI_Datatype datatype,
+                                                  MPI_Op  op, int root,
+                                                  MPI_Comm  comm)
+{
+    uint32_t segsize=0;
+    int ret;
+    int rank, size, io_root;
+    int segcount = count;
+    void *use_this_sendbuf = NULL, *use_this_recvbuf = NULL;
+    size_t typelng;
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+    XBT_DEBUG("coll:tuned:reduce_intra_in_order_binary rank %d ss %5d",
+                 rank, segsize);
+
+    /**
+     * Determine number of segments and number of elements
+     * sent per operation
+     */
+    typelng=smpi_datatype_size( datatype);
+    COLL_TUNED_COMPUTED_SEGCOUNT( segsize, typelng, segcount );
+
+    /* An in-order binary tree must use root (size-1) to preserve the order of
+       operations.  Thus, if root is not rank (size - 1), then we must handle
+       1. MPI_IN_PLACE option on real root, and 
+       2. we must allocate temporary recvbuf on rank (size - 1).
+       Note that generic function must be careful not to switch order of 
+       operations for non-commutative ops.
+    */
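+    /* Example: for a non-commutative operator such as matrix multiplication
+       the result must be m_0 op m_1 op ... op m_(size-1); rooting the tree
+       at rank (size - 1) lets the reduction fold the ranks in that order. */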
+    io_root = size - 1;
+    use_this_sendbuf = sendbuf;
+    use_this_recvbuf = recvbuf;
+    if (io_root != root) {
+        ptrdiff_t text, ext;
+        char *tmpbuf = NULL;
+
+        ext = smpi_datatype_get_extent(datatype);
+        /* SMPI has no true-extent query here: assume true extent == extent */
+        text = ext;
+
+        if ((root == rank) && (MPI_IN_PLACE == sendbuf)) {
+            tmpbuf = (char *) malloc(text + (count - 1) * ext);
+            if (NULL == tmpbuf) {
+                return MPI_ERR_INTERN;
+            }
+            /* copy the root's input (recvbuf) into tmpbuf;
+               smpi_datatype_copy copies from its first buffer to its second */
+            smpi_datatype_copy((char*)recvbuf, count, datatype,
+                               (char*)tmpbuf, count, datatype);
+            use_this_sendbuf = tmpbuf;
+        } else if (io_root == rank) {
+            tmpbuf = (char *) malloc(text + (count - 1) * ext);
+            if (NULL == tmpbuf) {
+                return MPI_ERR_INTERN;
+            }
+            use_this_recvbuf = tmpbuf;
+        }
+    }
+
+    /* Use generic reduce with in-order binary tree topology and io_root */
+    ret = smpi_coll_tuned_ompi_reduce_generic( use_this_sendbuf, use_this_recvbuf, count, datatype,
+                                          op, io_root, comm, 
+                                          ompi_coll_tuned_topo_build_in_order_bintree(comm), 
+                                          segcount, 0 );
+    if (MPI_SUCCESS != ret) { return ret; }
+
+    /* Clean up */
+    if (io_root != root) {
+        if (root == rank) {
+            /* Receive result from rank io_root to recvbuf */
+            smpi_mpi_recv(recvbuf, count, datatype, io_root,
+                                    MCA_COLL_BASE_TAG_REDUCE, comm,
+                                    MPI_STATUS_IGNORE);
+            if (MPI_IN_PLACE == sendbuf) {
+                free(use_this_sendbuf);
+            }
+
+        } else if (io_root == rank) {
+            /* Send result from use_this_recvbuf to root */
+            smpi_mpi_send(use_this_recvbuf, count, datatype, root,
+                                    MCA_COLL_BASE_TAG_REDUCE, 
+                                    comm);
+            free(use_this_recvbuf);
+        }
+    }
+
+    return MPI_SUCCESS;
+}
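+
+/* A minimal sketch (hypothetical driver, not part of this commit) of the
+ * MPI_IN_PLACE case handled above: the root passes its contribution in
+ * recvbuf, which forces the temporary-buffer path whenever the root is not
+ * rank (size - 1).
+ */
+static void example_in_place_reduce(MPI_Comm comm)
+{
+    int val = smpi_comm_rank(comm);
+    int result = val;  /* at the root this doubles as the input buffer */
+    if (smpi_comm_rank(comm) == 0) {
+        smpi_coll_tuned_reduce_ompi_in_order_binary(MPI_IN_PLACE, &result, 1,
+                                                    MPI_INT, MPI_SUM, 0, comm);
+    } else {
+        smpi_coll_tuned_reduce_ompi_in_order_binary(&val, &result, 1,
+                                                    MPI_INT, MPI_SUM, 0, comm);
+    }
+}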
+
+/*
+ * The linear functions are copied from the BASIC coll module. They do not
+ * segment the message and are simple implementations, but for small node
+ * counts and/or small data sizes they are just as fast as the tuned,
+ * tree-based segmenting operations, and as such may be selected by the
+ * decision functions. They are copied into this module due to the way
+ * modules are selected in V1; in V2 this will be handled differently, so
+ * the code will not have to be duplicated.
+ * GEF Oct05 after asking Jeff.
+ */
+
+/* copied function (with appropriate renaming) starts here */
+
+/*
+ *  reduce_lin_intra
+ *
+ *  Function:   - reduction using O(N) algorithm
+ *  Accepts:    - same as MPI_Reduce()
+ *  Returns:    - MPI_SUCCESS or error code
+ */
+
+int
+smpi_coll_tuned_reduce_ompi_basic_linear(void *sbuf, void *rbuf, int count,
+                                          MPI_Datatype dtype,
+                                          MPI_Op op,
+                                          int root,
+                                          MPI_Comm comm)
+{
+    int i, rank, size;
+    ptrdiff_t true_extent, lb, extent;
+    char *free_buffer = NULL;
+    char *pml_buffer = NULL;
+    char *inplace_temp = NULL;
+    char *inbuf;
+
+    /* Initialize */
+
+    rank = smpi_comm_rank(comm);
+    size = smpi_comm_size(comm);
+
+    XBT_DEBUG("coll:tuned:reduce_intra_basic_linear rank %d", rank);
+
+    /* If not root, send data to the root. */
+
+    if (rank != root) {
+        smpi_mpi_send(sbuf, count, dtype, root,
+                                MCA_COLL_BASE_TAG_REDUCE,
+                                comm);
+        return MPI_SUCCESS;
+    }
+
+    /* See the discussion in ompi_coll_basic_reduce_lin_intra about extent
+       and true extent, used here to reduce the buffer allocation lengths. */
+
+    smpi_datatype_extent(dtype, &lb, &extent);
+    true_extent = smpi_datatype_get_extent(dtype);
+
+    if (MPI_IN_PLACE == sbuf) {
+        sbuf = rbuf;
+        inplace_temp = (char*)malloc(true_extent + (count - 1) * extent);
+        if (NULL == inplace_temp) {
+            return MPI_ERR_INTERN;
+        }
+        rbuf = inplace_temp - lb;
+    }
+
+    if (size > 1) {
+        free_buffer = (char*)malloc(true_extent + (count - 1) * extent);
+        if (NULL == free_buffer) {
+            if (NULL != inplace_temp) free(inplace_temp);
+            return MPI_ERR_INTERN;
+        }
+        /* offset by the lower bound so the datatype's data lands inside
+           the allocation, as for rbuf above */
+        pml_buffer = free_buffer - lb;
+    }
+
+    /* Initialize the receive buffer. */
+
+    if (rank == (size - 1)) {
+        smpi_datatype_copy((char*)sbuf, count, dtype,
+                           (char*)rbuf, count, dtype);
+    } else {
+        smpi_mpi_recv(rbuf, count, dtype, size - 1,
+                                MCA_COLL_BASE_TAG_REDUCE, comm,
+                                MPI_STATUS_IGNORE);
+    }
+
+    /* Loop receiving and calling reduction function (C or Fortran). */
+
+    for (i = size - 2; i >= 0; --i) {
+        if (rank == i) {
+            inbuf = (char*)sbuf;
+        } else {
+            smpi_mpi_recv(pml_buffer, count, dtype, i,
+                                    MCA_COLL_BASE_TAG_REDUCE, comm,
+                                    MPI_STATUS_IGNORE);
+            inbuf = pml_buffer;
+        }
+
+        /* Perform the reduction */
+        smpi_op_apply(op, inbuf, rbuf, &count, &dtype);
+    }
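+    /* e.g. with 4 ranks this computes s0 op (s1 op (s2 op s3)): rank
+       (size-1) seeds rbuf and ranks size-2..0 are folded in descending
+       order, so non-commutative operations stay in rank order. */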
+
+    if (NULL != inplace_temp) {
+        /* restore the reduced result into the caller's buffer (the original
+           rbuf, still reachable through sbuf) */
+        smpi_datatype_copy(inplace_temp, count, dtype,
+                           (char*)sbuf, count, dtype);
+        free(inplace_temp);
+    }
+    if (NULL != free_buffer) {
+        free(free_buffer);
+    }
+
+    /* All done */
+    return MPI_SUCCESS;
+}
+
+/* copied function (with appropriate renaming) ends here */
+
+
index 5d7d5af..2c4fc10 100644 (file)
@@ -232,12 +232,12 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf,
     int communicator_size=0;
     //int segsize = 0;
     size_t message_size, dsize;
-    //const double a1 =  0.6016 / 1024.0; /* [1/B] */
-    //const double b1 =  1.3496;
-    //const double a2 =  0.0410 / 1024.0; /* [1/B] */
-    //const double b2 =  9.7128;
-    //const double a3 =  0.0422 / 1024.0; /* [1/B] */
-    //const double b3 =  1.1614;
+    const double a1 =  0.6016 / 1024.0; /* [1/B] */
+    const double b1 =  1.3496;
+    const double a2 =  0.0410 / 1024.0; /* [1/B] */
+    const double b2 =  9.7128;
+    const double a3 =  0.0422 / 1024.0; /* [1/B] */
+    const double b3 =  1.1614;
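+    /* Illustrative reading of these fits (not in the upstream comments):
+       for a 1 MiB message the cascade below selects binomial above ~617
+       ranks, the 1 KiB-segment pipeline above ~52, binary above ~45, and
+       falls through to the pipeline default otherwise. */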
     //const double a4 =  0.0033 / 1024.0; /* [1/B] */
     //const double b4 =  1.6761;
 
@@ -253,47 +253,47 @@ int smpi_coll_tuned_reduce_ompi( void *sendbuf, void *recvbuf,
      * If the operation is non commutative we currently have choice of linear 
      * or in-order binary tree algorithm.
      */
-/*    if( !ompi_op_is_commute(op) ) {
+    if( !smpi_op_is_commute(op) ) {
         if ((communicator_size < 12) && (message_size < 2048)) {
-            return smpi_coll_tuned_reduce_intra_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm, module); 
+            return smpi_coll_tuned_reduce_ompi_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm/*, module*/); 
         } 
-        return smpi_coll_tuned_reduce_intra_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm, module,
-                                                             0, max_requests); 
-    }*/
+        return smpi_coll_tuned_reduce_ompi_in_order_binary (sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
+                                                             0, max_requests*/); 
+    }
 
     if ((communicator_size < 8) && (message_size < 512)){
         /* Linear_0K */
-        return smpi_coll_tuned_reduce_flat_tree (sendbuf, recvbuf, count, datatype, op, root, comm); 
+        return smpi_coll_tuned_reduce_ompi_basic_linear (sendbuf, recvbuf, count, datatype, op, root, comm); 
     } else if (((communicator_size < 8) && (message_size < 20480)) ||
                (message_size < 2048) || (count <= 1)) {
         /* Binomial_0K */
         //segsize = 0;
-        return smpi_coll_tuned_reduce_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
+        return smpi_coll_tuned_reduce_ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
                                                      segsize, max_requests*/);
-    } /*else if (communicator_size > (a1 * message_size + b1)) {
+    } else if (communicator_size > (a1 * message_size + b1)) {
         // Binomial_1K 
-        segsize = 1024;
-        return smpi_coll_tuned_reduce_intra_binomial(sendbuf, recvbuf, count, datatype, op, root, comm, module,
-                                                     segsize, max_requests);
+        //segsize = 1024;
+        return smpi_coll_tuned_reduce_ompi_binomial(sendbuf, recvbuf, count, datatype, op, root, comm/*, module,
+                                                     segsize, max_requests*/);
     } else if (communicator_size > (a2 * message_size + b2)) {
         // Pipeline_1K 
-        segsize = 1024;
-        return smpi_coll_tuned_reduce_NTSL (sendbuf, recvbuf, count, datatype, op, root, comm, module, 
-                                                      segsize, max_requests);
+        //segsize = 1024;
+        return smpi_coll_tuned_reduce_ompi_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, 
+                                                      segsize, max_requests*/);
     } else if (communicator_size > (a3 * message_size + b3)) {
         // Binary_32K 
-        segsize = 32*1024;
-        return smpi_coll_tuned_reduce_intra_binary( sendbuf, recvbuf, count, datatype, op, root,
-                                                    comm, module, segsize, max_requests);
+        //segsize = 32*1024;
+        return smpi_coll_tuned_reduce_ompi_binary( sendbuf, recvbuf, count, datatype, op, root,
+                                                    comm/*, module, segsize, max_requests*/);
     }
-    if (communicator_size > (a4 * message_size + b4)) {
+    /*if (communicator_size > (a4 * message_size + b4)) {
         // Pipeline_32K 
         segsize = 32*1024;
     } else {
         // Pipeline_64K 
         segsize = 64*1024;
     }*/
-    return smpi_coll_tuned_reduce_NTSL (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, 
+    return smpi_coll_tuned_reduce_ompi_pipeline (sendbuf, recvbuf, count, datatype, op, root, comm/*, module, 
                                                   segsize, max_requests*/);
 
 #if 0