AND (Algorithmique Numérique Distribuée)

Public GIT Repository
fix reduce_scatter ompi
[simgrid.git] / src / smpi / colls / reduce_scatter-ompi.c
index 90978b8..e188c00 100644 (file)
@@ -1,4 +1,9 @@
-/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/* Copyright (c) 2013-2014. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
 /*
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         All rights reserved.
  * Copyright (c) 2008      Sun Microsystems, Inc.  All rights reserved.
  * Copyright (c) 2009      University of Houston. All rights reserved.
- * $COPYRIGHT$
  *
  * Additional copyrights may follow
- *
- * $HEADER$
  */
 
 #include "colls_private.h"
 #include "coll_tuned_topo.h"
+#include "xbt/replay.h"
 
-#define MCA_COLL_BASE_TAG_REDUCE_SCATTER 222
 /*
  * Recursive-halving function is (*mostly*) copied from the BASIC coll module.
  * I have removed the part which handles "large" message sizes 
@@ -60,6 +62,8 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
     size = smpi_comm_size(comm);
    
     XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
+    if(!smpi_op_is_commute(op))
+      THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
 
     /* Find displacements and the like */
     disps = (int*) xbt_malloc(sizeof(int) * size);
@@ -88,7 +92,13 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
     }
 
     /* Allocate temporary receive buffer. */
-    recv_buf_free = (char*) xbt_malloc(buf_size);
+#ifndef WIN32
+    if(_xbt_replay_is_active()){
+      recv_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
+    }else
+#endif
+      recv_buf_free = (char*) xbt_malloc(buf_size);
+
     recv_buf = recv_buf_free - lb;
     if (NULL == recv_buf_free) {
         err = MPI_ERR_OTHER;
@@ -96,7 +106,13 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
     }
    
     /* allocate temporary buffer for results */
-    result_buf_free = (char*) xbt_malloc(buf_size);
+#ifndef WIN32
+    if(_xbt_replay_is_active()){
+      result_buf_free = (char*) SMPI_SHARED_MALLOC(buf_size);
+    }else
+#endif
+      result_buf_free = (char*) xbt_malloc(buf_size);
+
     result_buf = result_buf_free - lb;
    
     /* copy local buffer into the temporary results */
@@ -116,13 +132,13 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
     if (rank < 2 * remain) {
         if ((rank & 1) == 0) {
             smpi_mpi_send(result_buf, count, dtype, rank + 1, 
-                                    MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                    COLL_TAG_REDUCE_SCATTER,
                                     comm);
             /* we don't participate from here on out */
             tmp_rank = -1;
         } else {
             smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
-                                    MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                    COLL_TAG_REDUCE_SCATTER,
                                     comm, MPI_STATUS_IGNORE);
          
             /* integrate their results into our temp results */
@@ -211,7 +227,7 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
             if (send_count > 0 && recv_count != 0) {
                 request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
                                          recv_count, dtype, peer,
-                                         MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                         COLL_TAG_REDUCE_SCATTER,
                                          comm);
                 if (MPI_SUCCESS != err) {
                     xbt_free(tmp_rcounts);
@@ -222,7 +238,7 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
             if (recv_count > 0 && send_count != 0) {
                 smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
                                         send_count, dtype, peer, 
-                                        MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                        COLL_TAG_REDUCE_SCATTER,
                                         comm);
                 if (MPI_SUCCESS != err) {
                     xbt_free(tmp_rcounts);
@@ -271,14 +287,14 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
         if ((rank & 1) == 0) {
             if (rcounts[rank]) {
                 smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
-                                        MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                        COLL_TAG_REDUCE_SCATTER,
                                         comm, MPI_STATUS_IGNORE);
             }
         } else {
             if (rcounts[rank - 1]) {
                 smpi_mpi_send(result_buf + disps[rank - 1] * extent,
                                         rcounts[rank - 1], dtype, rank - 1,
-                                        MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                        COLL_TAG_REDUCE_SCATTER,
                                         comm);
             }
         }            
@@ -286,9 +302,16 @@ smpi_coll_tuned_reduce_scatter_ompi_basic_recursivehalving(void *sbuf,
 
  cleanup:
     if (NULL != disps) xbt_free(disps);
-    if (NULL != recv_buf_free) xbt_free(recv_buf_free);
-    if (NULL != result_buf_free) xbt_free(result_buf_free);
-
+    if (!_xbt_replay_is_active()){
+      if (NULL != recv_buf_free) xbt_free(recv_buf_free);
+      if (NULL != result_buf_free) xbt_free(result_buf_free);
+    }
+#ifndef WIN32
+    else{
+      if (NULL != recv_buf_free) SMPI_SHARED_FREE(recv_buf_free);
+      if (NULL != result_buf_free) SMPI_SHARED_FREE(result_buf_free);
+    }
+#endif
     return err;
 }
 
@@ -367,7 +390,6 @@ smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
     char *inbuf_free[2] = {NULL, NULL}, *inbuf[2] = {NULL, NULL};
     ptrdiff_t true_lb, true_extent, lb, extent, max_real_segsize;
     MPI_Request reqs[2] = {NULL, NULL};
-    size_t typelng;
 
     size = smpi_comm_size(comm);
     rank = smpi_comm_rank(comm);
@@ -406,7 +428,6 @@ smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
     */
     smpi_datatype_extent(dtype, &lb, &extent);
     smpi_datatype_extent(dtype, &true_lb, &true_extent);
-    typelng = smpi_datatype_size(dtype);
 
     max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
 
@@ -454,11 +475,11 @@ smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
     inbi = 0;
     /* Initialize first receive from the neighbor on the left */
     reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
-                             MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm
+                             COLL_TAG_REDUCE_SCATTER, comm
                              );
     tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
     smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
-                            MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                            COLL_TAG_REDUCE_SCATTER,
                              comm);
 
     for (k = 2; k < size; k++) {
@@ -468,7 +489,7 @@ smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
 
         /* Post irecv for the current block */
         reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
-                                 MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm 
+                                 COLL_TAG_REDUCE_SCATTER, comm
                                  );
       
         /* Wait on previous block to arrive */
@@ -482,7 +503,7 @@ smpi_coll_tuned_reduce_scatter_ompi_ring(void *sbuf, void *rbuf, int *rcounts,
       
         /* send previous block to send_to */
         smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
-                                MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+                                COLL_TAG_REDUCE_SCATTER,
                                  comm);
     }