-/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/* Copyright (c) 2013-2014. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
/*
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* All rights reserved.
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2009 University of Houston. All rights reserved.
- * $COPYRIGHT$
*
* Additional copyrights may follow
- *
- * $HEADER$
*/
#include "colls_private.h"
#include "coll_tuned_topo.h"
+#include "xbt/replay.h"
-#define MCA_COLL_BASE_TAG_REDUCE_SCATTER 222
/*
* Recursive-halving function is (*mostly*) copied from the BASIC coll module.
* I have removed the part which handles "large" message sizes
size = smpi_comm_size(comm);
XBT_DEBUG("coll:tuned:reduce_scatter_ompi_basic_recursivehalving, rank %d", rank);
+ if(!smpi_op_is_commute(op))
+ THROWF(arg_error,0, " reduce_scatter ompi_basic_recursivehalving can only be used for commutative operations! ");
/* Find displacements and the like */
disps = (int*) xbt_malloc(sizeof(int) * size);
}
/* Allocate temporary receive buffer. */
- recv_buf_free = (char*) xbt_malloc(buf_size);
+ recv_buf_free = (char*) smpi_get_tmp_recvbuffer(buf_size);
+
recv_buf = recv_buf_free - lb;
if (NULL == recv_buf_free) {
err = MPI_ERR_OTHER;
}
/* allocate temporary buffer for results */
- result_buf_free = (char*) xbt_malloc(buf_size);
+ result_buf_free = (char*) smpi_get_tmp_sendbuffer(buf_size);
+
result_buf = result_buf_free - lb;
/* copy local buffer into the temporary results */
if (rank < 2 * remain) {
if ((rank & 1) == 0) {
smpi_mpi_send(result_buf, count, dtype, rank + 1,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
/* we don't participate from here on out */
tmp_rank = -1;
} else {
smpi_mpi_recv(recv_buf, count, dtype, rank - 1,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm, MPI_STATUS_IGNORE);
/* integrate their results into our temp results */
if (send_count > 0 && recv_count != 0) {
request=smpi_mpi_irecv(recv_buf + (ptrdiff_t)tmp_disps[recv_index] * extent,
recv_count, dtype, peer,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
if (MPI_SUCCESS != err) {
xbt_free(tmp_rcounts);
if (recv_count > 0 && send_count != 0) {
smpi_mpi_send(result_buf + (ptrdiff_t)tmp_disps[send_index] * extent,
send_count, dtype, peer,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
if (MPI_SUCCESS != err) {
xbt_free(tmp_rcounts);
if ((rank & 1) == 0) {
if (rcounts[rank]) {
smpi_mpi_recv(rbuf, rcounts[rank], dtype, rank + 1,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm, MPI_STATUS_IGNORE);
}
} else {
if (rcounts[rank - 1]) {
smpi_mpi_send(result_buf + disps[rank - 1] * extent,
rcounts[rank - 1], dtype, rank - 1,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
}
}
cleanup:
if (NULL != disps) xbt_free(disps);
- if (NULL != recv_buf_free) xbt_free(recv_buf_free);
- if (NULL != result_buf_free) xbt_free(result_buf_free);
+ if (NULL != recv_buf_free) smpi_free_tmp_buffer(recv_buf_free);
+ if (NULL != result_buf_free) smpi_free_tmp_buffer(result_buf_free);
return err;
}
max_real_segsize = true_extent + (ptrdiff_t)(max_block_count - 1) * extent;
- accumbuf_free = (char*)xbt_malloc(true_extent + (ptrdiff_t)(total_count - 1) * extent);
+ accumbuf_free = (char*)smpi_get_tmp_recvbuffer(true_extent + (ptrdiff_t)(total_count - 1) * extent);
if (NULL == accumbuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
accumbuf = accumbuf_free - lb;
- inbuf_free[0] = (char*)xbt_malloc(max_real_segsize);
+ inbuf_free[0] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
if (NULL == inbuf_free[0]) { ret = -1; line = __LINE__; goto error_hndl; }
inbuf[0] = inbuf_free[0] - lb;
if (size > 2) {
- inbuf_free[1] = (char*)xbt_malloc(max_real_segsize);
+ inbuf_free[1] = (char*)smpi_get_tmp_sendbuffer(max_real_segsize);
if (NULL == inbuf_free[1]) { ret = -1; line = __LINE__; goto error_hndl; }
inbuf[1] = inbuf_free[1] - lb;
}
inbi = 0;
/* Initialize first receive from the neighbor on the left */
reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm
+ COLL_TAG_REDUCE_SCATTER, comm
);
tmpsend = accumbuf + (ptrdiff_t)displs[recv_from] * extent;
smpi_mpi_send(tmpsend, rcounts[recv_from], dtype, send_to,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
for (k = 2; k < size; k++) {
/* Post irecv for the current block */
reqs[inbi]=smpi_mpi_irecv(inbuf[inbi], max_block_count, dtype, recv_from,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm
+ COLL_TAG_REDUCE_SCATTER, comm
);
/* Wait on previous block to arrive */
/* send previous block to send_to */
smpi_mpi_send(tmprecv, rcounts[prevblock], dtype, send_to,
- MCA_COLL_BASE_TAG_REDUCE_SCATTER,
+ COLL_TAG_REDUCE_SCATTER,
comm);
}
if (ret < 0) { line = __LINE__; goto error_hndl; }
if (NULL != displs) xbt_free(displs);
- if (NULL != accumbuf_free) xbt_free(accumbuf_free);
- if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
- if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
+ if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
+ if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
+ if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
return MPI_SUCCESS;
XBT_DEBUG( "%s:%4d\tRank %d Error occurred %d\n",
__FILE__, line, rank, ret);
if (NULL != displs) xbt_free(displs);
- if (NULL != accumbuf_free) xbt_free(accumbuf_free);
- if (NULL != inbuf_free[0]) xbt_free(inbuf_free[0]);
- if (NULL != inbuf_free[1]) xbt_free(inbuf_free[1]);
+ if (NULL != accumbuf_free) smpi_free_tmp_buffer(accumbuf_free);
+ if (NULL != inbuf_free[0]) smpi_free_tmp_buffer(inbuf_free[0]);
+ if (NULL != inbuf_free[1]) smpi_free_tmp_buffer(inbuf_free[1]);
return ret;
}