X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/f440f91c0d78d785b2a9cf87c718a54af453cf26..deb40f5c576f72119cf77e575d4cc068c12e66cf:/src/smpi/smpi_base.c diff --git a/src/smpi/smpi_base.c b/src/smpi/smpi_base.c index 73fe79ad64..ec16dbe55f 100644 --- a/src/smpi/smpi_base.c +++ b/src/smpi/smpi_base.c @@ -1247,7 +1247,7 @@ void smpi_mpi_allreduce(void *sendbuf, void *recvbuf, int count, void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { - int system_tag = 888; + int system_tag = -888; int rank, size, other, index; MPI_Aint lb = 0, dataext = 0; MPI_Request *requests; @@ -1282,14 +1282,94 @@ void smpi_mpi_scan(void *sendbuf, void *recvbuf, int count, } // Wait for completion of all comms. smpi_mpi_startall(size - 1, requests); - for(other = 0; other < size - 1; other++) { - index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); - if(index == MPI_UNDEFINED) { - break; + + if(smpi_op_is_commute(op)){ + for(other = 0; other < size - 1; other++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; + } + if(index < rank) { + // #Request is below rank: it's a irecv + smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + } } - if(index < rank) { - // #Request is below rank: it's a irecv - smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + }else{ + //non commutative case, wait in order + for(other = 0; other < size - 1; other++) { + smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); + if(index < rank) { + smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + } + } + } + for(index = 0; index < rank; index++) { + xbt_free(tmpbufs[index]); + } + xbt_free(tmpbufs); + xbt_free(requests); +} + +void smpi_mpi_exscan(void *sendbuf, void *recvbuf, int count, + MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) +{ + int system_tag = -888; + int rank, size, other, index; + MPI_Aint lb = 0, dataext = 0; + MPI_Request *requests; + void **tmpbufs; + int recvbuf_is_empty=1; + rank = smpi_comm_rank(comm); + size = smpi_comm_size(comm); + + // FIXME: check for errors + smpi_datatype_extent(datatype, &lb, &dataext); + + // Send/Recv buffers to/from others; + requests = xbt_new(MPI_Request, size - 1); + tmpbufs = xbt_new(void *, rank); + index = 0; + for(other = 0; other < rank; other++) { + // FIXME: possibly overkill we we have contiguous/noncontiguous data + // mapping... + tmpbufs[index] = xbt_malloc(count * dataext); + requests[index] = + smpi_irecv_init(tmpbufs[index], count, datatype, other, system_tag, + comm); + index++; + } + for(other = rank + 1; other < size; other++) { + requests[index] = + smpi_isend_init(sendbuf, count, datatype, other, system_tag, comm); + index++; + } + // Wait for completion of all comms. + smpi_mpi_startall(size - 1, requests); + if(smpi_op_is_commute(op)){ + for(other = 0; other < size - 1; other++) { + index = smpi_mpi_waitany(size - 1, requests, MPI_STATUS_IGNORE); + if(index == MPI_UNDEFINED) { + break; + } + if(index < rank) { + if(recvbuf_is_empty){ + smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty=0; + }else + // #Request is below rank: it's a irecv + smpi_op_apply(op, tmpbufs[index], recvbuf, &count, &datatype); + } + } + }else{ + //non commutative case, wait in order + for(other = 0; other < size - 1; other++) { + smpi_mpi_wait(&(requests[other]), MPI_STATUS_IGNORE); + if(index < rank) { + if(recvbuf_is_empty){ + smpi_datatype_copy(tmpbufs[index], count, datatype, recvbuf, count, datatype); + recvbuf_is_empty=0; + }else smpi_op_apply(op, tmpbufs[other], recvbuf, &count, &datatype); + } } } for(index = 0; index < rank; index++) {