Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add Reduce SMP collective from MVAPICH2
[simgrid.git] / src / smpi / colls / reduce-scatter-gather.c
index 481079a..30d5e6e 100644 (file)
@@ -1,4 +1,10 @@
-#include "colls.h"
+/* Copyright (c) 2013-2014. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "colls_private.h"
 
 /*
   reduce
@@ -10,12 +16,12 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
                                           MPI_Op op, int root, MPI_Comm comm)
 {
   MPI_Status status;
-  int comm_size, rank, type_size, pof2, rem, newrank;
+  int comm_size, rank, pof2, rem, newrank;
   int mask, *cnts, *disps, i, j, send_idx = 0;
   int recv_idx, last_idx = 0, newdst;
   int dst, send_cnt, recv_cnt, newroot, newdst_tree_root;
   int newroot_tree_root, new_count;
-  int tag = 4321;
+  int tag = COLL_TAG_REDUCE;
   void *send_ptr, *recv_ptr, *tmp_buf;
 
   cnts = NULL;
@@ -25,12 +31,17 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
 
   if (count == 0)
     return 0;
-  MPI_Comm_rank(comm, &rank);
-  MPI_Comm_size(comm, &comm_size);
+  rank = smpi_comm_rank(comm);
+  comm_size = smpi_comm_size(comm);
+  
 
-  MPI_Type_extent(datatype, &extent);
-  MPI_Type_size(datatype, &type_size);
 
+  extent = smpi_datatype_get_extent(datatype);
+  /* If I'm not the root, then my recvbuf may not be valid, therefore
+  I have to allocate a temporary one */
+  if (rank != root && !recvbuf) {
+    recvbuf = (void *)xbt_malloc(count * extent);
+  }
   /* find nearest power-of-two less than or equal to comm_size */
   pof2 = 1;
   while (pof2 <= comm_size)
@@ -39,31 +50,31 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
 
   if (count < comm_size) {
     new_count = comm_size;
-    send_ptr = (void *) malloc(new_count * extent);
-    recv_ptr = (void *) malloc(new_count * extent);
-    tmp_buf = (void *) malloc(new_count * extent);
-    memcpy(send_ptr, sendbuf, extent * new_count);
+    send_ptr = (void *) xbt_malloc(new_count * extent);
+    recv_ptr = (void *) xbt_malloc(new_count * extent);
+    tmp_buf = (void *) xbt_malloc(new_count * extent);
+    memcpy(send_ptr, sendbuf, extent * count);
 
     //if ((rank != root))
-    MPI_Sendrecv(send_ptr, new_count, datatype, rank, tag,
+    smpi_mpi_sendrecv(send_ptr, new_count, datatype, rank, tag,
                  recv_ptr, new_count, datatype, rank, tag, comm, &status);
 
     rem = comm_size - pof2;
     if (rank < 2 * rem) {
       if (rank % 2 != 0) {
         /* odd */
-        MPI_Send(recv_ptr, new_count, datatype, rank - 1, tag, comm);
+        smpi_mpi_send(recv_ptr, new_count, datatype, rank - 1, tag, comm);
         newrank = -1;
       } else {
-        MPI_Recv(tmp_buf, count, datatype, rank + 1, tag, comm, &status);
-        star_reduction(op, tmp_buf, recv_ptr, &new_count, &datatype);
+        smpi_mpi_recv(tmp_buf, count, datatype, rank + 1, tag, comm, &status);
+        smpi_op_apply(op, tmp_buf, recv_ptr, &new_count, &datatype);
         newrank = rank / 2;
       }
     } else                      /* rank >= 2*rem */
       newrank = rank - rem;
 
-    cnts = (int *) malloc(pof2 * sizeof(int));
-    disps = (int *) malloc(pof2 * sizeof(int));
+    cnts = (int *) xbt_malloc(pof2 * sizeof(int));
+    disps = (int *) xbt_malloc(pof2 * sizeof(int));
 
     if (newrank != -1) {
       for (i = 0; i < (pof2 - 1); i++)
@@ -98,7 +109,7 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         }
 
         /* Send data from recvbuf. Recv into tmp_buf */
-        MPI_Sendrecv((char *) recv_ptr +
+        smpi_mpi_sendrecv((char *) recv_ptr +
                      disps[send_idx] * extent,
                      send_cnt, datatype,
                      dst, tag,
@@ -109,7 +120,7 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         /* tmp_buf contains data received in this step.
            recvbuf contains data accumulated so far */
 
-        star_reduction(op, (char *) tmp_buf + disps[recv_idx] * extent,
+        smpi_op_apply(op, (char *) tmp_buf + disps[recv_idx] * extent,
                        (char *) recv_ptr + disps[recv_idx] * extent,
                        &recv_cnt, &datatype);
 
@@ -136,13 +147,13 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
           for (i = 1; i < pof2; i++)
             disps[i] = disps[i - 1] + cnts[i - 1];
 
-          MPI_Recv(recv_ptr, cnts[0], datatype, 0, tag, comm, &status);
+          smpi_mpi_recv(recv_ptr, cnts[0], datatype, 0, tag, comm, &status);
 
           newrank = 0;
           send_idx = 0;
           last_idx = 2;
         } else if (newrank == 0) {
-          MPI_Send(recv_ptr, cnts[0], datatype, root, tag, comm);
+          smpi_mpi_send(recv_ptr, cnts[0], datatype, root, tag, comm);
           newrank = -1;
         }
         newroot = 0;
@@ -194,12 +205,12 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         }
 
         if (newdst_tree_root == newroot_tree_root) {
-          MPI_Send((char *) recv_ptr +
+          smpi_mpi_send((char *) recv_ptr +
                    disps[send_idx] * extent,
                    send_cnt, datatype, dst, tag, comm);
           break;
         } else {
-          MPI_Recv((char *) recv_ptr +
+          smpi_mpi_recv((char *) recv_ptr +
                    disps[recv_idx] * extent,
                    recv_cnt, datatype, dst, tag, comm, &status);
         }
@@ -217,30 +228,30 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
   }
 
 
-  else if (count >= comm_size) {
-    tmp_buf = (void *) malloc(count * extent);
+  else /* (count >= comm_size) */ {
+    tmp_buf = (void *) xbt_malloc(count * extent);
 
     //if ((rank != root))
-    MPI_Sendrecv(sendbuf, count, datatype, rank, tag,
+    smpi_mpi_sendrecv(sendbuf, count, datatype, rank, tag,
                  recvbuf, count, datatype, rank, tag, comm, &status);
 
     rem = comm_size - pof2;
     if (rank < 2 * rem) {
       if (rank % 2 != 0) {      /* odd */
-        MPI_Send(recvbuf, count, datatype, rank - 1, tag, comm);
+        smpi_mpi_send(recvbuf, count, datatype, rank - 1, tag, comm);
         newrank = -1;
       }
 
       else {
-        MPI_Recv(tmp_buf, count, datatype, rank + 1, tag, comm, &status);
-        star_reduction(op, tmp_buf, recvbuf, &count, &datatype);
+        smpi_mpi_recv(tmp_buf, count, datatype, rank + 1, tag, comm, &status);
+        smpi_op_apply(op, tmp_buf, recvbuf, &count, &datatype);
         newrank = rank / 2;
       }
     } else                      /* rank >= 2*rem */
       newrank = rank - rem;
 
-    cnts = (int *) malloc(pof2 * sizeof(int));
-    disps = (int *) malloc(pof2 * sizeof(int));
+    cnts = (int *) xbt_malloc(pof2 * sizeof(int));
+    disps = (int *) xbt_malloc(pof2 * sizeof(int));
 
     if (newrank != -1) {
       for (i = 0; i < (pof2 - 1); i++)
@@ -275,7 +286,7 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         }
 
         /* Send data from recvbuf. Recv into tmp_buf */
-        MPI_Sendrecv((char *) recvbuf +
+        smpi_mpi_sendrecv((char *) recvbuf +
                      disps[send_idx] * extent,
                      send_cnt, datatype,
                      dst, tag,
@@ -286,7 +297,7 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         /* tmp_buf contains data received in this step.
            recvbuf contains data accumulated so far */
 
-        star_reduction(op, (char *) tmp_buf + disps[recv_idx] * extent,
+        smpi_op_apply(op, (char *) tmp_buf + disps[recv_idx] * extent,
                        (char *) recvbuf + disps[recv_idx] * extent,
                        &recv_cnt, &datatype);
 
@@ -312,13 +323,13 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
           for (i = 1; i < pof2; i++)
             disps[i] = disps[i - 1] + cnts[i - 1];
 
-          MPI_Recv(recvbuf, cnts[0], datatype, 0, tag, comm, &status);
+          smpi_mpi_recv(recvbuf, cnts[0], datatype, 0, tag, comm, &status);
 
           newrank = 0;
           send_idx = 0;
           last_idx = 2;
         } else if (newrank == 0) {
-          MPI_Send(recvbuf, cnts[0], datatype, root, tag, comm);
+          smpi_mpi_send(recvbuf, cnts[0], datatype, root, tag, comm);
           newrank = -1;
         }
         newroot = 0;
@@ -370,12 +381,12 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
         }
 
         if (newdst_tree_root == newroot_tree_root) {
-          MPI_Send((char *) recvbuf +
+          smpi_mpi_send((char *) recvbuf +
                    disps[send_idx] * extent,
                    send_cnt, datatype, dst, tag, comm);
           break;
         } else {
-          MPI_Recv((char *) recvbuf +
+          smpi_mpi_recv((char *) recvbuf +
                    disps[recv_idx] * extent,
                    recv_cnt, datatype, dst, tag, comm, &status);
         }
@@ -388,6 +399,8 @@ int smpi_coll_tuned_reduce_scatter_gather(void *sendbuf, void *recvbuf,
       }
     }
   }
+  if (tmp_buf)
+    free(tmp_buf);
   if (cnts)
     free(cnts);
   if (disps)