Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
memleaks --
[simgrid.git] / src / smpi / colls / allreduce-rab-rsag.c
index 5af0511..c387a22 100644 (file)
@@ -1,30 +1,24 @@
-#include "colls.h"
+#include "colls_private.h"
 //#include <star-reduction.c>
 
 int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
                                        MPI_Datatype dtype, MPI_Op op,
                                        MPI_Comm comm)
 {
-  int nprocs, rank, type_size, tag = 543;
+  int nprocs, rank, tag = COLL_TAG_ALLREDUCE;
   int mask, dst, pof2, newrank, rem, newdst, i,
       send_idx, recv_idx, last_idx, send_cnt, recv_cnt, *cnts, *disps;
   MPI_Aint extent;
   MPI_Status status;
   void *tmp_buf = NULL;
-  MPI_Comm_size(comm, &nprocs);
-  MPI_Comm_rank(comm, &rank);
-
-  MPI_Type_extent(dtype, &extent);
-  tmp_buf = (void *) malloc(count * extent);
-  if (!tmp_buf) {
-    printf("Could not allocate memory for tmp_buf\n");
-    return 1;
-  }
+  nprocs = smpi_comm_size(comm);
+  rank = smpi_comm_rank(comm);
 
-  MPI_Sendrecv(sbuff, count, dtype, rank, tag, rbuff, count, dtype, rank, tag,
-               comm, &status);
+  extent = smpi_datatype_get_extent(dtype);
+  tmp_buf = (void *) xbt_malloc(count * extent);
 
-  MPI_Type_size(dtype, &type_size);
+  smpi_mpi_sendrecv(sbuff, count, dtype, rank, tag, rbuff, count, dtype, rank, tag,
+               comm, &status);
 
   // find nearest power-of-two less than or equal to comm_size
   pof2 = 1;
@@ -44,7 +38,7 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
     // even       
     if (rank % 2 == 0) {
 
-      MPI_Send(rbuff, count, dtype, rank + 1, tag, comm);
+      smpi_mpi_send(rbuff, count, dtype, rank + 1, tag, comm);
 
       // temporarily set the rank to -1 so that this
       // process does not pariticipate in recursive
@@ -52,11 +46,11 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
       newrank = -1;
     } else                      // odd
     {
-      MPI_Recv(tmp_buf, count, dtype, rank - 1, tag, comm, &status);
+      smpi_mpi_recv(tmp_buf, count, dtype, rank - 1, tag, comm, &status);
       // do the reduction on received data. since the
       // ordering is right, it doesn't matter whether
       // the operation is commutative or not.
-      star_reduction(op, tmp_buf, rbuff, &count, &dtype);
+      smpi_op_apply(op, tmp_buf, rbuff, &count, &dtype);
 
       // change the rank 
       newrank = rank / 2;
@@ -80,8 +74,8 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
     // reduce-scatter, calculate the count that each process receives
     // and the displacement within the buffer 
 
-    cnts = (int *) malloc(pof2 * sizeof(int));
-    disps = (int *) malloc(pof2 * sizeof(int));
+    cnts = (int *) xbt_malloc(pof2 * sizeof(int));
+    disps = (int *) xbt_malloc(pof2 * sizeof(int));
 
     for (i = 0; i < (pof2 - 1); i++)
       cnts[i] = count / pof2;
@@ -115,7 +109,7 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
       }
 
       // Send data from recvbuf. Recv into tmp_buf 
-      MPI_Sendrecv((char *) rbuff + disps[send_idx] * extent, send_cnt,
+      smpi_mpi_sendrecv((char *) rbuff + disps[send_idx] * extent, send_cnt,
                    dtype, dst, tag,
                    (char *) tmp_buf + disps[recv_idx] * extent, recv_cnt,
                    dtype, dst, tag, comm, &status);
@@ -125,7 +119,7 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
 
       // This algorithm is used only for predefined ops
       // and predefined ops are always commutative.
-      star_reduction(op, (char *) tmp_buf + disps[recv_idx] * extent,
+      smpi_op_apply(op, (char *) tmp_buf + disps[recv_idx] * extent,
                      (char *) rbuff + disps[recv_idx] * extent,
                      &recv_cnt, &dtype);
 
@@ -166,7 +160,7 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
           recv_cnt += cnts[i];
       }
 
-      MPI_Sendrecv((char *) rbuff + disps[send_idx] * extent, send_cnt,
+      smpi_mpi_sendrecv((char *) rbuff + disps[send_idx] * extent, send_cnt,
                    dtype, dst, tag,
                    (char *) rbuff + disps[recv_idx] * extent, recv_cnt,
                    dtype, dst, tag, comm, &status);
@@ -187,11 +181,11 @@ int smpi_coll_tuned_allreduce_rab_rsag(void *sbuff, void *rbuff, int count,
 
   if (rank < 2 * rem) {
     if (rank % 2)               // odd 
-      MPI_Send(rbuff, count, dtype, rank - 1, tag, comm);
+      smpi_mpi_send(rbuff, count, dtype, rank - 1, tag, comm);
     else                        // even 
-      MPI_Recv(rbuff, count, dtype, rank + 1, tag, comm, &status);
+      smpi_mpi_recv(rbuff, count, dtype, rank + 1, tag, comm, &status);
   }
 
   free(tmp_buf);
-  return 0;
+  return MPI_SUCCESS;
 }