X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/061f1e9720921cc227dab4f6ddeca9d30028e5d4..845590d7b4f0790c34191697d1b6c28d50d1ccbd:/src/smpi/colls/smpi_default_selector.cpp?ds=sidebyside diff --git a/src/smpi/colls/smpi_default_selector.cpp b/src/smpi/colls/smpi_default_selector.cpp index dca9430c06..1aebcf6a9b 100644 --- a/src/smpi/colls/smpi_default_selector.cpp +++ b/src/smpi/colls/smpi_default_selector.cpp @@ -89,70 +89,13 @@ int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sen int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { - const int system_tag = COLL_TAG_REDUCE; - MPI_Aint lb = 0; - MPI_Aint dataext = 0; - - char* sendtmpbuf = static_cast(sendbuf); - - int rank = comm->rank(); - int size = comm->size(); - if (size <= 0) - return MPI_ERR_COMM; //non commutative case, use a working algo from openmpi if (op != MPI_OP_NULL && not op->is_commutative()) { - return Coll_reduce_ompi_basic_linear::reduce(sendtmpbuf, recvbuf, count, datatype, op, root, comm); + return Coll_reduce_ompi_basic_linear::reduce(sendbuf, recvbuf, count, datatype, op, root, comm); } - - if( sendbuf == MPI_IN_PLACE ) { - sendtmpbuf = static_cast(smpi_get_tmp_sendbuffer(count*datatype->get_extent())); - Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype); - } - - if(rank != root) { - // Send buffer to root - Request::send(sendtmpbuf, count, datatype, root, system_tag, comm); - } else { - datatype->extent(&lb, &dataext); - // Local copy from root - if (sendtmpbuf != nullptr && recvbuf != nullptr) - Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype); - // Receive buffers from senders - MPI_Request *requests = xbt_new(MPI_Request, size - 1); - void **tmpbufs = xbt_new(void *, size - 1); - int index = 0; - for (int src = 0; src < size; src++) { - if (src != root) { - tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext); - requests[index] = - Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm); - index++; - } - } - // Wait for completion of irecv's. - Request::startall(size - 1, requests); - for (int src = 0; src < size - 1; src++) { - index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE); - XBT_DEBUG("finished waiting any request with index %d", index); - if(index == MPI_UNDEFINED) { - break; - }else{ - Request::unref(&requests[index]); - } - if(op) /* op can be MPI_OP_NULL that does nothing */ - if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype); - } - for(index = 0; index < size - 1; index++) { - smpi_free_tmp_buffer(tmpbufs[index]); - } - xbt_free(tmpbufs); - xbt_free(requests); - - } - if( sendbuf == MPI_IN_PLACE ) { - smpi_free_tmp_buffer(sendtmpbuf); - } - return MPI_SUCCESS; + MPI_Request request; + Colls::ireduce(sendbuf, recvbuf, count, datatype, op, root, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } int Coll_allreduce_default::allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)