Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
smpi: many classes died tonight, but that will save kitten on the long term.
[simgrid.git] / src / smpi / colls / reduce_scatter / reduce_scatter-mpich.cpp
index 8061733..2f75f1f 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2013-2017. The SimGrid Team.
+/* Copyright (c) 2013-2019. The SimGrid Team.
  * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
@@ -24,13 +24,12 @@ static inline int MPIU_Mirror_permutation(unsigned int x, int bits)
 namespace simgrid{
 namespace smpi{
 
-int Coll_reduce_scatter_mpich_pair::reduce_scatter(void *sendbuf, void *recvbuf, int recvcounts[],
-                              MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
+int reduce_scatter__mpich_pair(const void *sendbuf, void *recvbuf, const int recvcounts[],
+                               MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
     int   rank, comm_size, i;
     MPI_Aint extent, true_extent, true_lb;
-    int  *disps;
-    void *tmp_recvbuf;
+    unsigned char* tmp_recvbuf;
     int mpi_errno = MPI_SUCCESS;
     int total_count, dst, src;
     int is_commutative;
@@ -44,7 +43,7 @@ int Coll_reduce_scatter_mpich_pair::reduce_scatter(void *sendbuf, void *recvbuf,
         is_commutative = 1;
     }
 
-    disps = (int*)xbt_malloc( comm_size * sizeof(int));
+    int* disps = new int[comm_size];
 
     total_count = 0;
     for (i=0; i<comm_size; i++) {
@@ -53,8 +52,8 @@ int Coll_reduce_scatter_mpich_pair::reduce_scatter(void *sendbuf, void *recvbuf,
     }
 
     if (total_count == 0) {
-        xbt_free(disps);
-        return MPI_ERR_COUNT;
+      delete[] disps;
+      return MPI_ERR_COUNT;
     }
 
         if (sendbuf != MPI_IN_PLACE) {
@@ -65,9 +64,9 @@ int Coll_reduce_scatter_mpich_pair::reduce_scatter(void *sendbuf, void *recvbuf,
         }
 
         /* allocate temporary buffer to store incoming data */
-        tmp_recvbuf = (void*)smpi_get_tmp_recvbuffer(recvcounts[rank] * std::max(true_extent, extent) + 1);
+        tmp_recvbuf = smpi_get_tmp_recvbuffer(recvcounts[rank] * std::max(true_extent, extent) + 1);
         /* adjust for potential negative lower bound in datatype */
-        tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
+        tmp_recvbuf = tmp_recvbuf - true_lb;
 
         for (i=1; i<comm_size; i++) {
             src = (rank - i + comm_size) % comm_size;
@@ -138,15 +137,15 @@ int Coll_reduce_scatter_mpich_pair::reduce_scatter(void *sendbuf, void *recvbuf,
             if (mpi_errno) return(mpi_errno);
         }
 
-        xbt_free(disps);
+        delete[] disps;
         smpi_free_tmp_buffer(tmp_recvbuf);
 
         return MPI_SUCCESS;
 }
 
 
-int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvbuf, int recvcounts[],
-                              MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
+int reduce_scatter__mpich_noncomm(const void *sendbuf, void *recvbuf, const int recvcounts[],
+                                  MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
     int mpi_errno = MPI_SUCCESS;
     int comm_size = comm->size() ;
@@ -158,9 +157,9 @@ int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvb
     int block_size, total_count, size;
     MPI_Aint true_extent, true_lb;
     int buf0_was_inout;
-    void *tmp_buf0;
-    void *tmp_buf1;
-    void *result_ptr;
+    unsigned char* tmp_buf0;
+    unsigned char* tmp_buf1;
+    unsigned char* result_ptr;
 
     datatype->extent(&true_lb, &true_extent);
 
@@ -183,21 +182,24 @@ int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvb
     block_size = recvcounts[0];
     total_count = block_size * comm_size;
 
-    tmp_buf0=( void *)smpi_get_tmp_sendbuffer( true_extent * total_count);
-    tmp_buf1=( void *)smpi_get_tmp_recvbuffer( true_extent * total_count);
-    void *tmp_buf0_save=tmp_buf0;
-    void *tmp_buf1_save=tmp_buf1;
+    tmp_buf0                     = smpi_get_tmp_sendbuffer(true_extent * total_count);
+    tmp_buf1                     = smpi_get_tmp_recvbuffer(true_extent * total_count);
+    unsigned char* tmp_buf0_save = tmp_buf0;
+    unsigned char* tmp_buf1_save = tmp_buf1;
 
     /* adjust for potential negative lower bound in datatype */
-    tmp_buf0 = (void *)((char*)tmp_buf0 - true_lb);
-    tmp_buf1 = (void *)((char*)tmp_buf1 - true_lb);
+    tmp_buf0 = tmp_buf0 - true_lb;
+    tmp_buf1 = tmp_buf1 - true_lb;
 
     /* Copy our send data to tmp_buf0.  We do this one block at a time and
        permute the blocks as we go according to the mirror permutation. */
     for (i = 0; i < comm_size; ++i) {
-        mpi_errno = Datatype::copy((char *)(sendbuf == MPI_IN_PLACE ? recvbuf : sendbuf) + (i * true_extent * block_size), block_size, datatype,
-                                   (char *)tmp_buf0 + (MPIU_Mirror_permutation(i, log2_comm_size) * true_extent * block_size), block_size, datatype);
-        if (mpi_errno) return(mpi_errno);
+      mpi_errno = Datatype::copy(
+          static_cast<const char*>(sendbuf == MPI_IN_PLACE ? recvbuf : sendbuf) + (i * true_extent * block_size), block_size,
+          datatype, tmp_buf0 + (MPIU_Mirror_permutation(i, log2_comm_size) * true_extent * block_size), block_size,
+          datatype);
+      if (mpi_errno)
+        return mpi_errno;
     }
     buf0_was_inout = 1;
 
@@ -206,8 +208,8 @@ int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvb
     size = total_count;
     for (k = 0; k < log2_comm_size; ++k) {
         /* use a double-buffering scheme to avoid local copies */
-        char *incoming_data = static_cast<char*>(buf0_was_inout ? tmp_buf1 : tmp_buf0);
-        char *outgoing_data = static_cast<char*>(buf0_was_inout ? tmp_buf0 : tmp_buf1);
+        unsigned char* incoming_data = buf0_was_inout ? tmp_buf1 : tmp_buf0;
+        unsigned char* outgoing_data = buf0_was_inout ? tmp_buf0 : tmp_buf1;
         int peer = rank ^ (0x1 << k);
         size /= 2;
 
@@ -251,7 +253,7 @@ int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvb
     xbt_assert(size == recvcounts[rank]);
 
     /* copy the reduced data to the recvbuf */
-    result_ptr = (char *)(buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
+    result_ptr = (buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
     mpi_errno = Datatype::copy(result_ptr, size, datatype,
                                recvbuf, size, datatype);
     smpi_free_tmp_buffer(tmp_buf0_save);
@@ -262,13 +264,11 @@ int Coll_reduce_scatter_mpich_noncomm::reduce_scatter(void *sendbuf, void *recvb
 
 
 
-int Coll_reduce_scatter_mpich_rdb::reduce_scatter(void *sendbuf, void *recvbuf, int recvcounts[],
+int reduce_scatter__mpich_rdb(const void *sendbuf, void *recvbuf, const int recvcounts[],
                               MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
 {
     int   rank, comm_size, i;
     MPI_Aint extent, true_extent, true_lb;
-    int  *disps;
-    void *tmp_recvbuf, *tmp_results;
     int mpi_errno = MPI_SUCCESS;
     int dis[2], blklens[2], total_count, dst;
     int mask, dst_tree_root, my_tree_root, j, k;
@@ -285,7 +285,7 @@ int Coll_reduce_scatter_mpich_rdb::reduce_scatter(void *sendbuf, void *recvbuf,
         is_commutative = 1;
     }
 
-    disps = (int*)xbt_malloc( comm_size * sizeof(int));
+    int* disps = new int[comm_size];
 
     total_count = 0;
     for (i=0; i<comm_size; i++) {
@@ -296,196 +296,182 @@ int Coll_reduce_scatter_mpich_rdb::reduce_scatter(void *sendbuf, void *recvbuf,
             /* noncommutative and (non-pof2 or block irregular), use recursive doubling. */
 
             /* need to allocate temporary buffer to receive incoming data*/
-            tmp_recvbuf= (void*)smpi_get_tmp_recvbuffer(total_count * std::max(true_extent, extent));
-            /* adjust for potential negative lower bound in datatype */
-            tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
-
-            /* need to allocate another temporary buffer to accumulate
-               results */
-            tmp_results = (void*)smpi_get_tmp_sendbuffer(total_count * std::max(true_extent, extent));
-            /* adjust for potential negative lower bound in datatype */
-            tmp_results = (void *)((char*)tmp_results - true_lb);
-
-            /* copy sendbuf into tmp_results */
-            if (sendbuf != MPI_IN_PLACE)
-                mpi_errno = Datatype::copy(sendbuf, total_count, datatype,
-                                           tmp_results, total_count, datatype);
-            else
-                mpi_errno = Datatype::copy(recvbuf, total_count, datatype,
-                                           tmp_results, total_count, datatype);
-
-            if (mpi_errno) return(mpi_errno);
-
-            mask = 0x1;
-            i = 0;
-            while (mask < comm_size) {
-                dst = rank ^ mask;
-
-                dst_tree_root = dst >> i;
-                dst_tree_root <<= i;
-
-                my_tree_root = rank >> i;
-                my_tree_root <<= i;
-
-                /* At step 1, processes exchange (n-n/p) amount of
-                   data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
-                   amount of data, and so forth. We use derived datatypes for this.
-
-                   At each step, a process does not need to send data
-                   indexed from my_tree_root to
-                   my_tree_root+mask-1. Similarly, a process won't receive
-                   data indexed from dst_tree_root to dst_tree_root+mask-1. */
-
-                /* calculate sendtype */
-                blklens[0] = blklens[1] = 0;
-                for (j=0; j<my_tree_root; j++)
-                    blklens[0] += recvcounts[j];
-                for (j=my_tree_root+mask; j<comm_size; j++)
-                    blklens[1] += recvcounts[j];
-
-                dis[0] = 0;
-                dis[1] = blklens[0];
-                for (j=my_tree_root; (j<my_tree_root+mask) && (j<comm_size); j++)
-                    dis[1] += recvcounts[j];
-
-                mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &sendtype);
-                if (mpi_errno) return(mpi_errno);
-
-                sendtype->commit();
-
-                /* calculate recvtype */
-                blklens[0] = blklens[1] = 0;
-                for (j=0; j<dst_tree_root && j<comm_size; j++)
-                    blklens[0] += recvcounts[j];
-                for (j=dst_tree_root+mask; j<comm_size; j++)
-                    blklens[1] += recvcounts[j];
-
-                dis[0] = 0;
-                dis[1] = blklens[0];
-                for (j=dst_tree_root; (j<dst_tree_root+mask) && (j<comm_size); j++)
-                    dis[1] += recvcounts[j];
-
-                mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &recvtype);
-                if (mpi_errno) return(mpi_errno);
-
-                recvtype->commit();
-
-                received = 0;
-                if (dst < comm_size) {
-                    /* tmp_results contains data to be sent in each step. Data is
-                       received in tmp_recvbuf and then accumulated into
-                       tmp_results. accumulation is done later below.   */
-
-                    Request::sendrecv(tmp_results, 1, sendtype, dst,
-                                                 COLL_TAG_SCATTER,
-                                                 tmp_recvbuf, 1, recvtype, dst,
-                                                 COLL_TAG_SCATTER, comm,
-                                                 MPI_STATUS_IGNORE);
-                    received = 1;
-                }
-
-                /* if some processes in this process's subtree in this step
-                   did not have any destination process to communicate with
-                   because of non-power-of-two, we need to send them the
-                   result. We use a logarithmic recursive-halfing algorithm
-                   for this. */
-
-                if (dst_tree_root + mask > comm_size) {
-                    nprocs_completed = comm_size - my_tree_root - mask;
-                    /* nprocs_completed is the number of processes in this
-                       subtree that have all the data. Send data to others
-                       in a tree fashion. First find root of current tree
-                       that is being divided into two. k is the number of
-                       least-significant bits in this process's rank that
-                       must be zeroed out to find the rank of the root */
-                    j = mask;
-                    k = 0;
-                    while (j) {
-                        j >>= 1;
-                        k++;
-                    }
-                    k--;
-
-                    tmp_mask = mask >> 1;
-                    while (tmp_mask) {
-                        dst = rank ^ tmp_mask;
-
-                        tree_root = rank >> k;
-                        tree_root <<= k;
-
-                        /* send only if this proc has data and destination
-                           doesn't have data. at any step, multiple processes
-                           can send if they have the data */
-                        if ((dst > rank) &&
-                            (rank < tree_root + nprocs_completed)
-                            && (dst >= tree_root + nprocs_completed)) {
-                            /* send the current result */
-                            Request::send(tmp_recvbuf, 1, recvtype,
-                                                     dst, COLL_TAG_SCATTER,
-                                                     comm);
-                        }
-                        /* recv only if this proc. doesn't have data and sender
-                           has data */
-                        else if ((dst < rank) &&
-                                 (dst < tree_root + nprocs_completed) &&
-                                 (rank >= tree_root + nprocs_completed)) {
-                            Request::recv(tmp_recvbuf, 1, recvtype, dst,
-                                                     COLL_TAG_SCATTER,
-                                                     comm, MPI_STATUS_IGNORE);
-                            received = 1;
-                        }
-                        tmp_mask >>= 1;
-                        k--;
-                    }
-                }
+    unsigned char* tmp_recvbuf = smpi_get_tmp_recvbuffer(total_count * std::max(true_extent, extent));
+    /* adjust for potential negative lower bound in datatype */
+    tmp_recvbuf = tmp_recvbuf - true_lb;
 
-                /* The following reduction is done here instead of after
-                   the MPIC_Sendrecv_ft or MPIC_Recv_ft above. This is
-                   because to do it above, in the noncommutative
-                   case, we would need an extra temp buffer so as not to
-                   overwrite temp_recvbuf, because temp_recvbuf may have
-                   to be communicated to other processes in the
-                   non-power-of-two case. To avoid that extra allocation,
-                   we do the reduce here. */
-                if (received) {
-                    if (is_commutative || (dst_tree_root < my_tree_root)) {
-                        {
-                          if (op != MPI_OP_NULL)
-                            op->apply(tmp_recvbuf, tmp_results, &blklens[0], datatype);
-                          if (op != MPI_OP_NULL)
-                            op->apply(((char*)tmp_recvbuf + dis[1] * extent), ((char*)tmp_results + dis[1] * extent),
-                                      &blklens[1], datatype);
-                        }
-                    }
-                    else {
-                        {
-                          if (op != MPI_OP_NULL)
-                            op->apply(tmp_results, tmp_recvbuf, &blklens[0], datatype);
-                          if (op != MPI_OP_NULL)
-                            op->apply(((char*)tmp_results + dis[1] * extent), ((char*)tmp_recvbuf + dis[1] * extent),
-                                      &blklens[1], datatype);
-                        }
-                        /* copy result back into tmp_results */
-                        mpi_errno = Datatype::copy(tmp_recvbuf, 1, recvtype,
-                                                   tmp_results, 1, recvtype);
-                        if (mpi_errno) return(mpi_errno);
-                    }
-                }
+    /* need to allocate another temporary buffer to accumulate
+       results */
+    unsigned char* tmp_results = smpi_get_tmp_sendbuffer(total_count * std::max(true_extent, extent));
+    /* adjust for potential negative lower bound in datatype */
+    tmp_results = tmp_results - true_lb;
+
+    /* copy sendbuf into tmp_results */
+    if (sendbuf != MPI_IN_PLACE)
+      mpi_errno = Datatype::copy(sendbuf, total_count, datatype, tmp_results, total_count, datatype);
+    else
+      mpi_errno = Datatype::copy(recvbuf, total_count, datatype, tmp_results, total_count, datatype);
+
+    if (mpi_errno)
+      return (mpi_errno);
+
+    mask = 0x1;
+    i    = 0;
+    while (mask < comm_size) {
+      dst = rank ^ mask;
+
+      dst_tree_root = dst >> i;
+      dst_tree_root <<= i;
+
+      my_tree_root = rank >> i;
+      my_tree_root <<= i;
+
+      /* At step 1, processes exchange (n-n/p) amount of
+         data; at step 2, (n-2n/p) amount of data; at step 3, (n-4n/p)
+         amount of data, and so forth. We use derived datatypes for this.
+
+         At each step, a process does not need to send data
+         indexed from my_tree_root to
+         my_tree_root+mask-1. Similarly, a process won't receive
+         data indexed from dst_tree_root to dst_tree_root+mask-1. */
+
+      /* calculate sendtype */
+      blklens[0] = blklens[1] = 0;
+      for (j = 0; j < my_tree_root; j++)
+        blklens[0] += recvcounts[j];
+      for (j = my_tree_root + mask; j < comm_size; j++)
+        blklens[1] += recvcounts[j];
+
+      dis[0] = 0;
+      dis[1] = blklens[0];
+      for (j = my_tree_root; (j < my_tree_root + mask) && (j < comm_size); j++)
+        dis[1] += recvcounts[j];
+
+      mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &sendtype);
+      if (mpi_errno)
+        return (mpi_errno);
+
+      sendtype->commit();
+
+      /* calculate recvtype */
+      blklens[0] = blklens[1] = 0;
+      for (j = 0; j < dst_tree_root && j < comm_size; j++)
+        blklens[0] += recvcounts[j];
+      for (j = dst_tree_root + mask; j < comm_size; j++)
+        blklens[1] += recvcounts[j];
+
+      dis[0] = 0;
+      dis[1] = blklens[0];
+      for (j = dst_tree_root; (j < dst_tree_root + mask) && (j < comm_size); j++)
+        dis[1] += recvcounts[j];
+
+      mpi_errno = Datatype::create_indexed(2, blklens, dis, datatype, &recvtype);
+      if (mpi_errno)
+        return (mpi_errno);
+
+      recvtype->commit();
+
+      received = 0;
+      if (dst < comm_size) {
+        /* tmp_results contains data to be sent in each step. Data is
+           received in tmp_recvbuf and then accumulated into
+           tmp_results. accumulation is done later below.   */
+
+        Request::sendrecv(tmp_results, 1, sendtype, dst, COLL_TAG_SCATTER, tmp_recvbuf, 1, recvtype, dst,
+                          COLL_TAG_SCATTER, comm, MPI_STATUS_IGNORE);
+        received = 1;
+      }
+
+      /* If some processes in this process's subtree in this step
+         did not have any destination process to communicate with
+         because the number of processes is not a power of two, we
+         need to send them the result. We use a logarithmic
+         recursive-halving algorithm for this. */
+
+      if (dst_tree_root + mask > comm_size) {
+        nprocs_completed = comm_size - my_tree_root - mask;
+        /* nprocs_completed is the number of processes in this
+           subtree that have all the data. Send data to others
+           in a tree fashion. First find root of current tree
+           that is being divided into two. k is the number of
+           least-significant bits in this process's rank that
+           must be zeroed out to find the rank of the root */
+        j = mask;
+        k = 0;
+        while (j) {
+          j >>= 1;
+          k++;
+        }
+        k--;
+
+        tmp_mask = mask >> 1;
+        while (tmp_mask) {
+          dst = rank ^ tmp_mask;
+
+          tree_root = rank >> k;
+          tree_root <<= k;
+
+          /* Send only if this process has the data and the destination
+             does not. At any step, multiple processes can send if they
+             have the data. */
+          if ((dst > rank) && (rank < tree_root + nprocs_completed) && (dst >= tree_root + nprocs_completed)) {
+            /* send the current result */
+            Request::send(tmp_recvbuf, 1, recvtype, dst, COLL_TAG_SCATTER, comm);
+          }
+          /* recv only if this proc. doesn't have data and sender
+             has data */
+          else if ((dst < rank) && (dst < tree_root + nprocs_completed) && (rank >= tree_root + nprocs_completed)) {
+            Request::recv(tmp_recvbuf, 1, recvtype, dst, COLL_TAG_SCATTER, comm, MPI_STATUS_IGNORE);
+            received = 1;
+          }
+          tmp_mask >>= 1;
+          k--;
+        }
+      }
+
+      /* The following reduction is done here instead of right after
+         the Request::sendrecv or Request::recv above. This is
+         because to do it above, in the noncommutative
+         case, we would need an extra temp buffer so as not to
+         overwrite tmp_recvbuf, because tmp_recvbuf may have
+         to be communicated to other processes in the
+         non-power-of-two case. To avoid that extra allocation,
+         we do the reduce here. */
+      if (received) {
+        if (is_commutative || (dst_tree_root < my_tree_root)) {
+          {
+            if (op != MPI_OP_NULL)
+              op->apply(tmp_recvbuf, tmp_results, &blklens[0], datatype);
+            if (op != MPI_OP_NULL)
+              op->apply(tmp_recvbuf + dis[1] * extent, tmp_results + dis[1] * extent, &blklens[1], datatype);
+          }
+        } else {
+          {
+            if (op != MPI_OP_NULL)
+              op->apply(tmp_results, tmp_recvbuf, &blklens[0], datatype);
+            if (op != MPI_OP_NULL)
+              op->apply(tmp_results + dis[1] * extent, tmp_recvbuf + dis[1] * extent, &blklens[1], datatype);
+          }
+          /* copy result back into tmp_results */
+          mpi_errno = Datatype::copy(tmp_recvbuf, 1, recvtype, tmp_results, 1, recvtype);
+          if (mpi_errno)
+            return (mpi_errno);
+        }
+      }
 
-                Datatype::unref(sendtype);
-                Datatype::unref(recvtype);
+      Datatype::unref(sendtype);
+      Datatype::unref(recvtype);
 
-                mask <<= 1;
-                i++;
+      mask <<= 1;
+      i++;
             }
 
             /* now copy final results from tmp_results to recvbuf */
-            mpi_errno = Datatype::copy(((char *)tmp_results+disps[rank]*extent),
-                                       recvcounts[rank], datatype, recvbuf,
+            mpi_errno = Datatype::copy(tmp_results + disps[rank] * extent, recvcounts[rank], datatype, recvbuf,
                                        recvcounts[rank], datatype);
             if (mpi_errno) return(mpi_errno);
 
-    xbt_free(disps);
+    delete[] disps;
     smpi_free_tmp_buffer(tmp_recvbuf);
     smpi_free_tmp_buffer(tmp_results);
     return MPI_SUCCESS;