Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
leaks --
[simgrid.git] / src / smpi / colls / reduce_scatter-mpich.c
index 77eaef5..11adf04 100644 (file)
@@ -1,5 +1,10 @@
+/* Copyright (c) 2013-2014. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
 #include "colls_private.h"
-#define MPIR_REDUCE_SCATTER_TAG 222
 
 static inline int MPIU_Mirror_permutation(unsigned int x, int bits)
 {
@@ -25,7 +30,7 @@ int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int
     int  *disps;
     void *tmp_recvbuf;
     int mpi_errno = MPI_SUCCESS;
-    int type_size, total_count, nbytes, dst, src;
+    int total_count, dst, src;
     int is_commutative;
     comm_size = smpi_comm_size(comm);
     rank = smpi_comm_rank(comm);
@@ -46,13 +51,10 @@ int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int
     }
     
     if (total_count == 0) {
+        xbt_free(disps);
         return MPI_ERR_COUNT;
     }
 
-    type_size= smpi_datatype_size(datatype);
-    nbytes = total_count * type_size;
-    
-
         if (sendbuf != MPI_IN_PLACE) {
             /* copy local data into recvbuf */
             smpi_datatype_copy(((char *)sendbuf+disps[rank]*extent),
@@ -61,7 +63,7 @@ int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int
         }
         
         /* allocate temporary buffer to store incoming data */
-        tmp_recvbuf = (void*)xbt_malloc(recvcounts[rank]*(max(true_extent,extent))+1);
+        tmp_recvbuf = (void*)smpi_get_tmp_recvbuffer(recvcounts[rank]*(MAX(true_extent,extent))+1);
         /* adjust for potential negative lower bound in datatype */
         tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
         
@@ -74,16 +76,16 @@ int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int
             if (sendbuf != MPI_IN_PLACE) 
                 smpi_mpi_sendrecv(((char *)sendbuf+disps[dst]*extent), 
                                              recvcounts[dst], datatype, dst,
-                                             MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
+                                             COLL_TAG_SCATTER, tmp_recvbuf,
                                              recvcounts[rank], datatype, src,
-                                             MPIR_REDUCE_SCATTER_TAG, comm,
+                                             COLL_TAG_SCATTER, comm,
                                              MPI_STATUS_IGNORE);
             else
                 smpi_mpi_sendrecv(((char *)recvbuf+disps[dst]*extent), 
                                              recvcounts[dst], datatype, dst,
-                                             MPIR_REDUCE_SCATTER_TAG, tmp_recvbuf,
+                                             COLL_TAG_SCATTER, tmp_recvbuf,
                                              recvcounts[rank], datatype, src,
-                                             MPIR_REDUCE_SCATTER_TAG, comm,
+                                             COLL_TAG_SCATTER, comm,
                                              MPI_STATUS_IGNORE);
             
             if (is_commutative || (src < rank)) {
@@ -139,7 +141,10 @@ int smpi_coll_tuned_reduce_scatter_mpich_pair(void *sendbuf, void *recvbuf, int
             if (mpi_errno) return(mpi_errno);
         }
     
-return MPI_SUCCESS;
+        xbt_free(disps);
+        smpi_free_tmp_buffer(tmp_recvbuf);
+
+        return MPI_SUCCESS;
 }
     
 
@@ -181,8 +186,11 @@ int smpi_coll_tuned_reduce_scatter_mpich_noncomm(void *sendbuf, void *recvbuf, i
     block_size = recvcounts[0];
     total_count = block_size * comm_size;
 
-    tmp_buf0=( void *)xbt_malloc( true_extent * total_count);
-    tmp_buf1=( void *)xbt_malloc( true_extent * total_count);
+    tmp_buf0=( void *)smpi_get_tmp_sendbuffer( true_extent * total_count);
+    tmp_buf1=( void *)smpi_get_tmp_recvbuffer( true_extent * total_count);
+    void *tmp_buf0_save=tmp_buf0;
+    void *tmp_buf1_save=tmp_buf1;
+
     /* adjust for potential negative lower bound in datatype */
     tmp_buf0 = (void *)((char*)tmp_buf0 - true_lb);
     tmp_buf1 = (void *)((char*)tmp_buf1 - true_lb);
@@ -216,9 +224,9 @@ int smpi_coll_tuned_reduce_scatter_mpich_noncomm(void *sendbuf, void *recvbuf, i
         }
 
         smpi_mpi_sendrecv(outgoing_data + send_offset*true_extent,
-                                     size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
+                                     size, datatype, peer, COLL_TAG_SCATTER,
                                      incoming_data + recv_offset*true_extent,
-                                     size, datatype, peer, MPIR_REDUCE_SCATTER_TAG,
+                                     size, datatype, peer, COLL_TAG_SCATTER,
                                      comm, MPI_STATUS_IGNORE);
         /* always perform the reduction at recv_offset, the data at send_offset
            is now our peer's responsibility */
@@ -228,7 +236,7 @@ int smpi_coll_tuned_reduce_scatter_mpich_noncomm(void *sendbuf, void *recvbuf, i
                    incoming_data + recv_offset*true_extent,
                      outgoing_data + recv_offset*true_extent,
                      &size, &datatype );
-            buf0_was_inout = buf0_was_inout;
+            /* buf0_was_inout = buf0_was_inout; */
         }
         else {
             /* lower ranked value so need to call op(my_data, received_data) */
@@ -250,6 +258,8 @@ int smpi_coll_tuned_reduce_scatter_mpich_noncomm(void *sendbuf, void *recvbuf, i
     result_ptr = (char *)(buf0_was_inout ? tmp_buf0 : tmp_buf1) + recv_offset * true_extent;
     mpi_errno = smpi_datatype_copy(result_ptr, size, datatype,
                                recvbuf, size, datatype);
+    smpi_free_tmp_buffer(tmp_buf0_save);
+    smpi_free_tmp_buffer(tmp_buf1_save);
     if (mpi_errno) return(mpi_errno);
     return MPI_SUCCESS;
 }
@@ -264,11 +274,11 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
     int  *disps;
     void *tmp_recvbuf, *tmp_results;
     int mpi_errno = MPI_SUCCESS;
-    int type_size, dis[2], blklens[2], total_count, nbytes, dst;
+    int dis[2], blklens[2], total_count, dst;
     int mask, dst_tree_root, my_tree_root, j, k;
     int received;
     MPI_Datatype sendtype, recvtype;
-    int nprocs_completed, tmp_mask, tree_root, is_commutative;
+    int nprocs_completed, tmp_mask, tree_root, is_commutative=0;
     comm_size = smpi_comm_size(comm);
     rank = smpi_comm_rank(comm);
 
@@ -287,20 +297,16 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
         total_count += recvcounts[i];
     }
     
-    type_size= smpi_datatype_size(datatype);
-    nbytes = total_count * type_size;
-    
-
             /* noncommutative and (non-pof2 or block irregular), use recursive doubling. */
 
             /* need to allocate temporary buffer to receive incoming data*/
-            tmp_recvbuf= (void *) xbt_malloc( total_count*(max(true_extent,extent)));
+            tmp_recvbuf= (void *) smpi_get_tmp_recvbuffer( total_count*(MAX(true_extent,extent)));
             /* adjust for potential negative lower bound in datatype */
             tmp_recvbuf = (void *)((char*)tmp_recvbuf - true_lb);
 
             /* need to allocate another temporary buffer to accumulate
                results */
-            tmp_results = (void *)xbt_malloc( total_count*(max(true_extent,extent)));
+            tmp_results = (void *)smpi_get_tmp_sendbuffer( total_count*(MAX(true_extent,extent)));
             /* adjust for potential negative lower bound in datatype */
             tmp_results = (void *)((char*)tmp_results - true_lb);
 
@@ -375,9 +381,9 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
                        tmp_results. accumulation is done later below.   */ 
 
                     smpi_mpi_sendrecv(tmp_results, 1, sendtype, dst,
-                                                 MPIR_REDUCE_SCATTER_TAG, 
+                                                 COLL_TAG_SCATTER,
                                                  tmp_recvbuf, 1, recvtype, dst,
-                                                 MPIR_REDUCE_SCATTER_TAG, comm,
+                                                 COLL_TAG_SCATTER, comm,
                                                  MPI_STATUS_IGNORE);
                     received = 1;
                 }
@@ -419,7 +425,7 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
                             && (dst >= tree_root + nprocs_completed)) {
                             /* send the current result */
                             smpi_mpi_send(tmp_recvbuf, 1, recvtype,
-                                                     dst, MPIR_REDUCE_SCATTER_TAG,
+                                                     dst, COLL_TAG_SCATTER,
                                                      comm);
                         }
                         /* recv only if this proc. doesn't have data and sender
@@ -428,7 +434,7 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
                                  (dst < tree_root + nprocs_completed) &&
                                  (rank >= tree_root + nprocs_completed)) {
                             smpi_mpi_recv(tmp_recvbuf, 1, recvtype, dst,
-                                                     MPIR_REDUCE_SCATTER_TAG,
+                                                     COLL_TAG_SCATTER,
                                                      comm, MPI_STATUS_IGNORE); 
                             received = 1;
                         }
@@ -474,8 +480,8 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
                     }
                 }
 
-                //smpi_datatype_free(&sendtype);
-                //smpi_datatype_free(&recvtype);
+                smpi_datatype_unuse(sendtype);
+                smpi_datatype_unuse(recvtype);
 
                 mask <<= 1;
                 i++;
@@ -487,6 +493,9 @@ int smpi_coll_tuned_reduce_scatter_mpich_rdb(void *sendbuf, void *recvbuf, int r
                                        recvcounts[rank], datatype);
             if (mpi_errno) return(mpi_errno);
 
+    xbt_free(disps);
+    smpi_free_tmp_buffer(tmp_recvbuf);
+    smpi_free_tmp_buffer(tmp_results);
     return MPI_SUCCESS;
         }