int tag = 4321;
MPI_Aint extent;
void *tmp_buf;
-
+ MPI_Aint true_lb, true_extent;
if (count == 0)
return 0;
rank = smpi_comm_rank(comm);
extent = smpi_datatype_get_extent(datatype);
tmp_buf = (void *) xbt_malloc(count * extent);
-
+ int is_commutative = smpi_op_is_commute(op);
smpi_mpi_sendrecv(sendbuf, count, datatype, rank, tag,
recvbuf, count, datatype, rank, tag, comm, &status);
mask = 1;
- relrank = (rank - root + comm_size) % comm_size;
+
+ int lroot;
+ if (is_commutative)
+ lroot = root;
+ else
+ lroot = 0;
+ relrank = (rank - lroot + comm_size) % comm_size;
+
+ smpi_datatype_extent(datatype, &true_lb, &true_extent);
+
+ /* adjust for potential negative lower bound in datatype */
+ tmp_buf = (void *)((char*)tmp_buf - true_lb);
+
+ /* If I'm not the root, then my recvbuf may not be valid, therefore
+ I have to allocate a temporary one */
+ if (rank != root) {
+ recvbuf = (void *) malloc(count*(max(extent,true_extent)));
+ recvbuf = (void *)((char*)recvbuf - true_lb);
+ }
+ if ((rank != root) || (sendbuf != MPI_IN_PLACE)) {
+ smpi_datatype_copy(sendbuf, count, datatype, recvbuf,count, datatype);
+ }
while (mask < comm_size) {
/* Receive */
if ((mask & relrank) == 0) {
source = (relrank | mask);
if (source < comm_size) {
- source = (source + root) % comm_size;
+ source = (source + lroot) % comm_size;
smpi_mpi_recv(tmp_buf, count, datatype, source, tag, comm, &status);
- smpi_op_apply(op, tmp_buf, recvbuf, &count, &datatype);
+
+ if (is_commutative) {
+ smpi_op_apply(op, tmp_buf, recvbuf, &count, &datatype);
+ } else {
+ smpi_op_apply(op, recvbuf, tmp_buf, &count, &datatype);
+ smpi_datatype_copy(tmp_buf, count, datatype,recvbuf, count, datatype);
+ }
}
} else {
- dst = ((relrank & (~mask)) + root) % comm_size;
+ dst = ((relrank & (~mask)) + lroot) % comm_size;
smpi_mpi_send(recvbuf, count, datatype, dst, tag, comm);
break;
}
mask <<= 1;
}
+if (!is_commutative && (root != 0)){
+ if (rank == 0){
+ smpi_mpi_send(recvbuf, count, datatype, root,tag, comm);
+ }else if (rank == root){
+ smpi_mpi_recv(recvbuf, count, datatype, 0, tag, comm, &status);
+ }
+}
+
free(tmp_buf);
return 0;
sendbuf to the accumbuf, in order to simplfy the loops */
if (!smpi_op_is_commute(op)) {
smpi_datatype_copy(
- (char*)accumbuf, original_count, datatype,
- (char*)sendtmpbuf, original_count, datatype);
+ (char*)sendtmpbuf, original_count, datatype,
+ (char*)accumbuf, original_count, datatype);
}
/* Allocate two buffers for incoming segments */
real_segment_size = true_extent + (count_by_segment - 1) * extent;
return MPI_ERR_INTERN;
}
smpi_datatype_copy (
- (char*)tmpbuf, count, datatype,
- (char*)recvbuf, count, datatype);
+ (char*)recvbuf, count, datatype,
+ (char*)tmpbuf, count, datatype);
use_this_sendbuf = tmpbuf;
} else if (io_root == rank) {
tmpbuf = (char *) malloc(text + (count - 1) * ext);
/* Initialize the receive buffer. */
if (rank == (size - 1)) {
- smpi_datatype_copy((char*)rbuf, count, dtype,
- (char*)sbuf, count, dtype);
+ smpi_datatype_copy((char*)sbuf, count, dtype,(char*)rbuf, count, dtype);
} else {
smpi_mpi_recv(rbuf, count, dtype, size - 1,
MCA_COLL_BASE_TAG_REDUCE, comm,
}
if (NULL != inplace_temp) {
- smpi_datatype_copy((char*)sbuf, count, dtype,
- inplace_temp,count , dtype);
+ smpi_datatype_copy(inplace_temp, count, dtype,(char*)sbuf
+ ,count , dtype);
free(inplace_temp);
}
if (NULL != free_buffer) {
#include "simix/smx_private.h"
#include "surf/surf.h"
#include "simgrid/sg_config.h"
-
+#include "colls/colls.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_base, smpi, "Logging specific to SMPI (base)");
rank = smpi_comm_rank(comm);
size = smpi_comm_size(comm);
+ //non commutative case, use a working algo from openmpi
+ if(!smpi_op_is_commute(op)){
+ smpi_coll_tuned_reduce_ompi_basic_linear(sendbuf, recvbuf, count,
+ datatype, op, root, comm);
+ return;
+ }
+
if(rank != root) {
// Send buffer to root
smpi_mpi_send(sendbuf, count, datatype, root, system_tag, comm);
RunTest coll9 4
-#smpi does not handle non commutative operations, removed
-#RunTest coll10 4
+RunTest coll10 4
-#smpi does not handle non commutative operations, removed
RunTest coll11 4
#weird manipulations of ranks in split, and comms -> deadlock, removed