From deb40f5c576f72119cf77e575d4cc068c12e66cf Mon Sep 17 00:00:00 2001 From: Augustin Degomme Date: Mon, 8 Jul 2013 16:51:55 +0200 Subject: [PATCH] handle non commutative case for scan/exscan --- src/smpi/smpi_base.c | 61 ++++++++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 8afbcae3cb..ec16dbe55f 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -1247,7 +1247,7 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { - int system_tag = 888; + int system_tag = -888; int rank, size, other, index; MPI_Aint lb = 0, dataext = 0; MPI_Request *requests; @@ -1282,14 +1282,25 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, } // Wait for completion of all comms. smpi_mpi_startall(size - 1, requests); - for(other = 0; other < size - 1; other++) { - index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); - if(index == MPI_UNDEFINED) { - break; + + if(smpi_op_is_commute(op)){ + for(other = 0; other < size - 1; other++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; + } + if(index < rank) { + // #Request is below rank: it's a irecv + smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + } } - if(index < rank) { - // #Request is below rank: it's a irecv - smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + }else{ + //non commutative case, wait in order + for(other = 0; other < size - 1; other++) { + smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); + if(index < rank) { + smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + } } } for(index = 0; index < rank; index++) { @@ -1302,7 +1313,7 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { - int system_tag = 888; + int system_tag = -888; int rank, size, other, index; MPI_Aint lb = 0, dataext = 0; MPI_Request *requests; @@ -1334,15 +1345,31 @@ void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, } // Wait for completion of all comms. smpi_mpi_startall(size - 1, requests); - for(other = 0; other < size - 1; other++) { - index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); - if(index == MPI_UNDEFINED) { - break; + if(smpi_op_is_commute(op)){ + for(other = 0; other < size - 1; other++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; + } + if(index < rank) { + if(recvbuf_is_empty){ + smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty=0; + }else + // #Request is below rank: it's a irecv + smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + } } - if(index < rank) { - if(recvbuf_is_empty) smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype); - // #Request is below rank: it's a irecv - else smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + }else{ + //non commutative case, wait in order + for(other = 0; other < size - 1; other++) { + smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); + if(index < rank) { + if(recvbuf_is_empty){ + smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty=0; + }else smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + } } } for(index = 0; index < rank; index++) { -- 2.20.1