Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add butterfly algorithm from openmpi
authorAugustin Degomme <adegomme@users.noreply.github.com>
Wed, 23 Feb 2022 18:05:09 +0000 (19:05 +0100)
committerAugustin Degomme <adegomme@users.noreply.github.com>
Wed, 23 Feb 2022 18:07:12 +0000 (19:07 +0100)
described in [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.

docs/source/app_smpi.rst
src/smpi/colls/reduce_scatter/reduce_scatter-ompi.cpp
src/smpi/colls/smpi_coll.cpp
src/smpi/colls/smpi_openmpi_selector.cpp
src/smpi/include/smpi_coll.hpp
teshsuite/smpi/CMakeLists.txt

index 01bc627..79265c9 100644 (file)
@@ -177,7 +177,7 @@ of the targeted MPI implementations.
 You can switch the automatic selector through the
 ``smpi/coll-selector`` configuration item. Possible values:
 
- - **ompi:** default selection logic of OpenMPI (version 3.1.2)
+ - **ompi:** default selection logic of OpenMPI (version 4.1.2)
  - **mpich**: default selection logic of MPICH (version 3.3b)
  - **mvapich2**: selection logic of MVAPICH2 (version 1.9) tuned
    on the Stampede cluster
@@ -359,6 +359,7 @@ MPI_Reduce_scatter
 ``automatic (experimental)``: use an automatic self-benchmarking algorithm. |br|
 ``ompi_basic_recursivehalving``: recursive halving version from OpenMPI. |br|
 ``ompi_ring``: ring version from OpenMPI. |br|
+``ompi_butterfly``: butterfly version from OpenMPI. |br|
 ``mpich_pair``: pairwise exchange version from MPICH. |br|
 ``mpich_rdb``: recursive doubling version from MPICH. |br|
 ``mpich_noncomm``: only works for power of 2 procs, recursive doubling for noncommutative ops. |br|
index 3a02915..8e9efeb 100644 (file)
@@ -526,6 +526,282 @@ int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts,
       smpi_free_tmp_buffer(inbuf_free[1]);
     return ret;
 }
+
+static int ompi_sum_counts(const int *counts, int *displs, int nprocs_rem, int lo, int hi)
+{
+    /* Adjust lo and hi for taking into account blocks of excluded processes */
+    lo = (lo < nprocs_rem) ? lo * 2 : lo + nprocs_rem;
+    hi = (hi < nprocs_rem) ? hi * 2 + 1 : hi + nprocs_rem;
+    return displs[hi] + counts[hi] - displs[lo];
 }
+
+/*
+ * ompi_mirror_perm: Returns mirror permutation of nbits low-order bits
+ *                   of x [*].
+ * [*] Warren Jr., Henry S. Hacker's Delight (2ed). 2013.
+ *     Chapter 7. Rearranging Bits and Bytes.
+ */
+static unsigned int ompi_mirror_perm(unsigned int x, int nbits)
+{
+    x = (((x & 0xaaaaaaaa) >> 1) | ((x & 0x55555555) << 1));
+    x = (((x & 0xcccccccc) >> 2) | ((x & 0x33333333) << 2));
+    x = (((x & 0xf0f0f0f0) >> 4) | ((x & 0x0f0f0f0f) << 4));
+    x = (((x & 0xff00ff00) >> 8) | ((x & 0x00ff00ff) << 8));
+    x = ((x >> 16) | (x << 16));
+    return x >> (sizeof(x) * 8 - nbits);
 }
+/*
+ * ompi_coll_base_reduce_scatter_intra_butterfly
+ *
+ * Function:  Butterfly algorithm for reduce_scatter
+ * Accepts:   Same as MPI_Reduce_scatter
+ * Returns:   MPI_SUCCESS or error code
+ *
+ * Description:  Implements butterfly algorithm for MPI_Reduce_scatter [*].
+ *               The algorithm can be used both by commutative and non-commutative
+ *               operations, for power-of-two and non-power-of-two number of processes.
+ *
+ * [*] J.L. Traff. An improved Algorithm for (non-commutative) Reduce-scatter
+ *     with an Application // Proc. of EuroPVM/MPI, 2005. -- pp. 129-137.
+ *
+ * Time complexity: O(m\lambda + log(p)\alpha + m\beta + m\gamma),
+ *   where m = sum of rcounts[], p = comm_size
+ * Memory requirements (per process): 2 * m * typesize + comm_size
+ *
+ * Example: comm_size=6, nprocs_pof2=4, nprocs_rem=2, rcounts[]=1, sbuf=[0,1,...,5]
+ * Step 1. Reduce the number of processes to 4
+ * rank 0: [0|1|2|3|4|5]: send to 1: vrank -1
+ * rank 1: [0|1|2|3|4|5]: recv from 0, op: vrank 0: [0|2|4|6|8|10]
+ * rank 2: [0|1|2|3|4|5]: send to 3: vrank -1
+ * rank 3: [0|1|2|3|4|5]: recv from 2, op: vrank 1: [0|2|4|6|8|10]
+ * rank 4: [0|1|2|3|4|5]: vrank 2: [0|1|2|3|4|5]
+ * rank 5: [0|1|2|3|4|5]: vrank 3: [0|1|2|3|4|5]
+ *
+ * Step 2. Butterfly. Buffer of 6 elements is divided into 4 blocks.
+ * Round 1 (mask=1, nblocks=2)
+ * 0: vrank -1
+ * 1: vrank  0 [0 2|4 6|8|10]: exch with 1: send [2,3], recv [0,1]: [0 4|8 12|*|*]
+ * 2: vrank -1
+ * 3: vrank  1 [0 2|4 6|8|10]: exch with 0: send [0,1], recv [2,3]: [**|**|16|20]
+ * 4: vrank  2 [0 1|2 3|4|5] : exch with 3: send [2,3], recv [0,1]: [0 2|4 6|*|*]
+ * 5: vrank  3 [0 1|2 3|4|5] : exch with 2: send [0,1], recv [2,3]: [**|**|8|10]
+ *
+ * Round 2 (mask=2, nblocks=1)
+ * 0: vrank -1
+ * 1: vrank  0 [0 4|8 12|*|*]: exch with 2: send [1], recv [0]: [0 6|**|*|*]
+ * 2: vrank -1
+ * 3: vrank  1 [**|**|16|20] : exch with 3: send [3], recv [2]: [**|**|24|*]
+ * 4: vrank  2 [0 2|4 6|*|*] : exch with 0: send [0], recv [1]: [**|12 18|*|*]
+ * 5: vrank  3 [**|**|8|10]  : exch with 1: send [2], recv [3]: [**|**|*|30]
+ *
+ * Step 3. Exchange with remote process according to a mirror permutation:
+ *         mperm(0)=0, mperm(1)=2, mperm(2)=1, mperm(3)=3
+ * 0: vrank -1: recv "0" from process 0
+ * 1: vrank  0 [0 6|**|*|*]: send "0" to 0, copy "6" to rbuf (mperm(0)=0)
+ * 2: vrank -1: recv result "12" from process 4
+ * 3: vrank  1 [**|**|24|*]
+ * 4: vrank  2 [**|12 18|*|*]: send "12" to 2, send "18" to 3, recv "24" from 3
+ * 5: vrank  3 [**|**|*|30]: copy "30" to rbuf (mperm(3)=3)
+ */
+int reduce_scatter__ompi_butterfly(
+    const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,
+    MPI_Op op, MPI_Comm comm)
+{
+    char *tmpbuf[2] = {NULL, NULL}, *psend, *precv;
+    int *displs = NULL, index;
+    ptrdiff_t span, gap, totalcount, extent;
+    int err = MPI_SUCCESS;
+    int comm_size = comm->size();
+    int rank = comm->rank();
+    int vrank = -1;
+    int nprocs_rem = 0;
+
+    XBT_DEBUG("coll:base:reduce_scatter_intra_butterfly: rank %d/%d",
+                 rank, comm_size);
+    if (comm_size < 2)
+        return MPI_SUCCESS;
 
+    displs = (int*)malloc(sizeof(*displs) * comm_size);
+    if (NULL == displs) {
+        err = MPI_ERR_OTHER;
+        goto cleanup_and_return;
+    }
+    displs[0] = 0;
+    for (int i = 1; i < comm_size; i++) {
+        displs[i] = displs[i - 1] + rcounts[i - 1];
+    }
+    totalcount = displs[comm_size - 1] + rcounts[comm_size - 1];
+    dtype->extent(&gap, &extent);
+    span = extent * totalcount;
+    tmpbuf[0] = (char*)malloc(span);
+    tmpbuf[1] = (char*)malloc(span);
+    if (NULL == tmpbuf[0] || NULL == tmpbuf[1]) {
+        err = MPI_ERR_OTHER;
+        goto cleanup_and_return;
+    }
+    psend = tmpbuf[0] - gap;
+    precv = tmpbuf[1] - gap;
+
+    if (sbuf != MPI_IN_PLACE) {
+       err = Datatype::copy(sbuf, totalcount, dtype, psend, totalcount, dtype);
+       if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+    } else {
+       err = Datatype::copy(rbuf, totalcount, dtype, psend, totalcount, dtype);
+        if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+    }
+
+    /*
+     * Step 1. Reduce the number of processes to the nearest lower power of two
+     * p' = 2^{\floor{\log_2 p}} by removing r = p - p' processes.
+     * In the first 2r processes (ranks 0 to 2r - 1), all the even ranks send
+     * the input vector to their neighbor (rank + 1) and all the odd ranks recv
+     * the input vector and perform local reduction.
+     * The odd ranks (0 to 2r - 1) contain the reduction with the input
+     * vector on their neighbors (the even ranks). The first r odd
+     * processes and the p - 2r last processes are renumbered from
+     * 0 to 2^{\floor{\log_2 p}} - 1. Even ranks do not participate in the
+     * rest of the algorithm.
+     */
+
+    /* Find nearest power-of-two less than or equal to comm_size */
+    int nprocs_pof2, size;
+    for( nprocs_pof2 = 1, size = comm_size; size > 0; size >>= 1, nprocs_pof2 <<= 1 );
+    nprocs_pof2 >>= 1;
+
+    nprocs_rem = comm_size - nprocs_pof2;
+    int log2_size;
+    for (log2_size = 0, size = 1; size < nprocs_pof2; ++log2_size, size <<= 1);
+
+    if (rank < 2 * nprocs_rem) {
+        if ((rank % 2) == 0) {
+            /* Even process */
+            Request::send(psend, totalcount, dtype, rank + 1,
+                                    COLL_TAG_REDUCE_SCATTER, comm);
+            /* This process does not participate in the rest of the algorithm */
+            vrank = -1;
+        } else {
+            /* Odd process */
+            Request::recv(precv, totalcount, dtype, rank - 1,
+                                    COLL_TAG_REDUCE_SCATTER, comm, MPI_STATUS_IGNORE);
+           op->apply(precv, psend, (int*)&totalcount, dtype);
+            /* Adjust rank to be the bottom "remain" ranks */
+            vrank = rank / 2;
+        }
+    } else {
+        /* Adjust rank to show that the bottom "even remain" ranks dropped out */
+        vrank = rank - nprocs_rem;
+    }
+
+    if (vrank != -1) {
+        /*
+         * Now, psend vector of size totalcount is divided into nprocs_pof2 blocks:
+         * block 0:   rcounts[0] and rcounts[1] -- for process 0 and 1
+         * block 1:   rcounts[2] and rcounts[3] -- for process 2 and 3
+         * ...
+         * block r-1: rcounts[2*(r-1)] and rcounts[2*(r-1)+1]
+         * block r:   rcounts[r+r]
+         * block r+1: rcounts[r+r+1]
+         * ...
+         * block nprocs_pof2 - 1: rcounts[r+nprocs_pof2-1]
+         */
+        int nblocks = nprocs_pof2, send_index = 0, recv_index = 0;
+        for (int mask = 1; mask < nprocs_pof2; mask <<= 1) {
+            int vpeer = vrank ^ mask;
+            int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+
+            nblocks /= 2;
+            if ((vrank & mask) == 0) {
+                /* Send the upper half of reduction buffer, recv the lower half */
+                send_index += nblocks;
+            } else {
+                /* Send the upper half of reduction buffer, recv the lower half */
+                recv_index += nblocks;
+            }
+
+            /* Send blocks: [send_index, send_index + nblocks - 1] */
+            int send_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
+                                             send_index, send_index + nblocks - 1);
+            index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
+            ptrdiff_t sdispl = displs[index];
+
+            /* Recv blocks: [recv_index, recv_index + nblocks - 1] */
+            int recv_count = ompi_sum_counts(rcounts, displs, nprocs_rem,
+                                             recv_index, recv_index + nblocks - 1);
+            index = (recv_index < nprocs_rem) ? 2 * recv_index : nprocs_rem + recv_index;
+            ptrdiff_t rdispl = displs[index];
+
+            Request::sendrecv(psend + (ptrdiff_t)sdispl * extent, send_count,
+                                          dtype, peer, COLL_TAG_REDUCE_SCATTER,
+                                          precv + (ptrdiff_t)rdispl * extent, recv_count,
+                                          dtype, peer, COLL_TAG_REDUCE_SCATTER,
+                                          comm, MPI_STATUS_IGNORE);
+
+            if (vrank < vpeer) {
+                /* precv = psend <op> precv */
+                op->apply(psend + (ptrdiff_t)rdispl * extent,
+                               precv + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
+                char *p = psend;
+                psend = precv;
+                precv = p;
+            } else {
+                /* psend = precv <op> psend */
+                op->apply(precv + (ptrdiff_t)rdispl * extent,
+                               psend + (ptrdiff_t)rdispl * extent, &recv_count, dtype);
+            }
+            send_index = recv_index;
+        }
+        /*
+         * psend points to the result block [send_index]
+         * Exchange results with remote process according to a mirror permutation.
+         */
+        int vpeer = ompi_mirror_perm(vrank, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        index = (send_index < nprocs_rem) ? 2 * send_index : nprocs_rem + send_index;
+
+        if (vpeer < nprocs_rem) {
+            /*
+             * Process has two blocks: for excluded process and own.
+             * Send the first block to excluded process.
+             */
+            Request::send(psend + (ptrdiff_t)displs[index] * extent,
+                                    rcounts[index], dtype, peer - 1,
+                                    COLL_TAG_REDUCE_SCATTER,
+                                    comm);
+        }
+
+        /* If process has two blocks, then send the second block (own block) */
+        if (vpeer < nprocs_rem)
+            index++;
+        if (vpeer != vrank) {
+            Request::sendrecv(psend + (ptrdiff_t)displs[index] * extent,
+                                          rcounts[index], dtype, peer,
+                                          COLL_TAG_REDUCE_SCATTER,
+                                          rbuf, rcounts[rank], dtype, peer,
+                                          COLL_TAG_REDUCE_SCATTER,
+                                          comm, MPI_STATUS_IGNORE);
+        } else {
+            err = Datatype::copy(psend + (ptrdiff_t)displs[rank] * extent, rcounts[rank], dtype,
+                                 rbuf, rcounts[rank], dtype);
+            if (MPI_SUCCESS != err) { goto cleanup_and_return; }
+        }
+
+    } else {
+        /* Excluded process: receive result */
+        int vpeer = ompi_mirror_perm((rank + 1) / 2, log2_size);
+        int peer = (vpeer < nprocs_rem) ? vpeer * 2 + 1 : vpeer + nprocs_rem;
+        Request::recv(rbuf, rcounts[rank], dtype, peer,
+                                COLL_TAG_REDUCE_SCATTER, comm,
+                                MPI_STATUS_IGNORE);
+    }
+
+cleanup_and_return:
+    if (displs)
+        free(displs);
+    if (tmpbuf[0])
+        free(tmpbuf[0]);
+    if (tmpbuf[1])
+        free(tmpbuf[1]);
+    return err;
+}
+}
+}
index 2f362ec..ea44ce1 100644 (file)
@@ -103,6 +103,7 @@ std::map<std::string, std::vector<s_mpi_coll_description_t>, std::less<>> smpi_c
        {"ompi_basic_recursivehalving", "reduce_scatter ompi_basic_recursivehalving collective",
         (void*)reduce_scatter__ompi_basic_recursivehalving},
        {"ompi_ring", "reduce_scatter ompi_ring collective", (void*)reduce_scatter__ompi_ring},
+       {"ompi_butterfly", "reduce_scatter ompi_butterfly collective", (void*)reduce_scatter__ompi_butterfly},
        {"mpich", "reduce_scatter mpich collective", (void*)reduce_scatter__mpich},
        {"mpich_pair", "reduce_scatter mpich_pair collective", (void*)reduce_scatter__mpich_pair},
        {"mpich_rdb", "reduce_scatter mpich_rdb collective", (void*)reduce_scatter__mpich_rdb},
index d1607bd..5a76200 100644 (file)
@@ -16,7 +16,6 @@ add algos:
 allreduce nonoverlapping, basic linear
 alltoall linear_sync
 bcast chain
-reduce_scatter butterfly
 scatter linear_nb
 */
 
@@ -780,7 +779,7 @@ int reduce_scatter__ompi(const void *sbuf, void *rbuf,
         &reduce_scatter__default,
         &reduce_scatter__ompi_basic_recursivehalving,
         &reduce_scatter__ompi_ring,
-        &reduce_scatter__ompi_ring,
+        &reduce_scatter__ompi_butterfly,
     };
     /** Algorithms:
      *  {1, "non-overlapping"},
index edb12df..e9dc7e9 100644 (file)
@@ -300,8 +300,9 @@ int reduce__automatic(const void *buf, void *rbuf, int count, MPI_Datatype datat
 int reduce_scatter__default(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__ompi(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__ompi_basic_recursivehalving(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
-int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__mpich(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
+int reduce_scatter__ompi_ring(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
+int reduce_scatter__ompi_butterfly(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__mpich_pair(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__mpich_rdb(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
 int reduce_scatter__mpich_noncomm(const void *sbuf, void *rbuf, const int *rcounts, MPI_Datatype dtype,MPI_Op  op,MPI_Comm  comm);
index 3327d0b..cea94e0 100644 (file)
@@ -126,7 +126,7 @@ if(enable_smpi)
     ADD_TESH(tesh-smpi-coll-reduce-${REDUCE} --cfg smpi/reduce:${REDUCE} --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/smpi/coll-reduce --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi/coll-reduce ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/coll-reduce/coll-reduce.tesh)
   endforeach()
 
-  foreach (REDUCE_SCATTER ompi mpich ompi_basic_recursivehalving ompi_ring mpich_noncomm mpich_pair mvapich2 mpich_rdb impi)
+  foreach (REDUCE_SCATTER ompi mpich ompi_basic_recursivehalving ompi_ring ompi_butterfly mpich_noncomm mpich_pair mvapich2 mpich_rdb impi)
     ADD_TESH(tesh-smpi-coll-reduce-scatter-${REDUCE_SCATTER} --cfg smpi/reduce_scatter:${REDUCE_SCATTER} --setenv platfdir=${CMAKE_HOME_DIRECTORY}/examples/platforms --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/smpi/coll-reduce-scatter --cd ${CMAKE_BINARY_DIR}/teshsuite/smpi/coll-reduce-scatter ${CMAKE_HOME_DIRECTORY}/teshsuite/smpi/coll-reduce-scatter/coll-reduce-scatter.tesh)
   endforeach()