X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/68f707462521f974a8839675ab66e3527125ccbc..6d004c352f7b26fba38486001f874e65466b5bee:/src/smpi/colls/smpi_default_selector.cpp diff --git a/src/smpi/colls/smpi_default_selector.cpp b/src/smpi/colls/smpi_default_selector.cpp index f2751b96ef..e43e0119ec 100644 --- a/src/smpi/colls/smpi_default_selector.cpp +++ b/src/smpi/colls/smpi_default_selector.cpp @@ -22,24 +22,15 @@ int Coll_barrier_default::barrier(MPI_Comm comm) } -int Coll_gather_default::gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, +int Coll_gather_default::gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { MPI_Request request; Colls::igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request); - MPI_Request* requests = request->get_nbc_requests(); - int count = request->get_nbc_requests_size(); - Request::waitall(count, requests, MPI_STATUS_IGNORE); - for (int i = 0; i < count; i++) { - if(requests[i]!=MPI_REQUEST_NULL) - Request::unref(&requests[i]); - } - delete[] requests; - Request::unref(&request); - return MPI_SUCCESS; + return Request::wait(&request, MPI_STATUS_IGNORE); } -int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, +int Coll_reduce_scatter_default::reduce_scatter(const void *sendbuf, void *recvbuf, const int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { int rank = comm->rank(); @@ -47,40 +38,32 @@ int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, in /* arbitrarily choose root as rank 0 */ int size = comm->size(); int count = 0; - int *displs = xbt_new(int, size); + int* displs = new int[size]; for (int i = 0; i < size; i++) { displs[i] = count; count += recvcounts[i]; } - void *tmpbuf = static_cast(smpi_get_tmp_sendbuffer(count*datatype->get_extent())); + unsigned char* tmpbuf = smpi_get_tmp_sendbuffer(count * datatype->get_extent()); int ret = Coll_reduce_default::reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm); if(ret==MPI_SUCCESS) ret = Colls::scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm); - xbt_free(displs); + delete[] displs; smpi_free_tmp_buffer(tmpbuf); return ret; } -int Coll_allgather_default::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype, +int Coll_allgather_default::allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { MPI_Request request; Colls::iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &request); - MPI_Request* requests = request->get_nbc_requests(); - int count = request->get_nbc_requests_size(); - Request::waitall(count, requests, MPI_STATUS_IGNORE); - for (int other = 0; other < count; other++) { - Request::unref(&requests[other]); - } - delete[] requests; - Request::unref(&request); - return MPI_SUCCESS; + return Request::wait(&request, MPI_STATUS_IGNORE); } -int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, - int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm) +int Coll_allgatherv_default::allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, + const int *recvcounts, const int *displs, MPI_Datatype recvtype, MPI_Comm comm) { MPI_Request request; Colls::iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, &request); @@ -95,96 +78,31 @@ int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Dataty return MPI_SUCCESS; } -int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, +int Coll_scatter_default::scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { MPI_Request request; Colls::iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request); - MPI_Request* requests = request->get_nbc_requests(); - int count = request->get_nbc_requests_size(); - Request::waitall(count, requests, MPI_STATUS_IGNORE); - for (int dst = 0; dst < count; dst++) { - if(requests[dst]!=MPI_REQUEST_NULL) - Request::unref(&requests[dst]); - } - delete[] requests; - Request::unref(&request); - return MPI_SUCCESS; + return Request::wait(&request, MPI_STATUS_IGNORE); } - - -int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, +int Coll_reduce_default::reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { - const int system_tag = COLL_TAG_REDUCE; - MPI_Aint lb = 0; - MPI_Aint dataext = 0; - - char* sendtmpbuf = static_cast(sendbuf); - - int rank = comm->rank(); - int size = comm->size(); - if (size <= 0) - return MPI_ERR_COMM; //non commutative case, use a working algo from openmpi - if (op != MPI_OP_NULL && not op->is_commutative()) { - return Coll_reduce_ompi_basic_linear::reduce(sendtmpbuf, recvbuf, count, datatype, op, root, comm); - } - - if( sendbuf == MPI_IN_PLACE ) { - sendtmpbuf = static_cast(smpi_get_tmp_sendbuffer(count*datatype->get_extent())); - Datatype::copy(recvbuf, count, datatype,sendtmpbuf, count, datatype); + if (op != MPI_OP_NULL && (datatype->flags() & DT_FLAG_DERIVED || not op->is_commutative())) { + return Coll_reduce_ompi_basic_linear::reduce(sendbuf, recvbuf, count, datatype, op, root, comm); } - - if(rank != root) { - // Send buffer to root - Request::send(sendtmpbuf, count, datatype, root, system_tag, comm); - } else { - datatype->extent(&lb, &dataext); - // Local copy from root - if (sendtmpbuf != nullptr && recvbuf != nullptr) - Datatype::copy(sendtmpbuf, count, datatype, recvbuf, count, datatype); - // Receive buffers from senders - MPI_Request *requests = xbt_new(MPI_Request, size - 1); - void **tmpbufs = xbt_new(void *, size - 1); - int index = 0; - for (int src = 0; src < size; src++) { - if (src != root) { - tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext); - requests[index] = - Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm); - index++; - } - } - // Wait for completion of irecv's. - Request::startall(size - 1, requests); - for (int src = 0; src < size - 1; src++) { - index = Request::waitany(size - 1, requests, MPI_STATUS_IGNORE); - XBT_DEBUG("finished waiting any request with index %d", index); - if(index == MPI_UNDEFINED) { - break; - }else{ - Request::unref(&requests[index]); - } - if(op) /* op can be MPI_OP_NULL that does nothing */ - if(op!=MPI_OP_NULL) op->apply( tmpbufs[index], recvbuf, &count, datatype); - } - for(index = 0; index < size - 1; index++) { - smpi_free_tmp_buffer(tmpbufs[index]); - } - xbt_free(tmpbufs); - xbt_free(requests); - - } - if( sendbuf == MPI_IN_PLACE ) { - smpi_free_tmp_buffer(sendtmpbuf); - } - return MPI_SUCCESS; + MPI_Request request; + Colls::ireduce(sendbuf, recvbuf, count, datatype, op, root, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } -int Coll_allreduce_default::allreduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) +int Coll_allreduce_default::allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { + //FIXME: have mpi_ireduce and iallreduce handle derived datatypes correctly + if(datatype->flags() & DT_FLAG_DERIVED) + return Coll_allreduce_ompi::allreduce(sendbuf, recvbuf, count, datatype, op, comm); int ret; ret = Coll_reduce_default::reduce(sendbuf, recvbuf, count, datatype, op, 0, comm); if(ret==MPI_SUCCESS) @@ -192,27 +110,17 @@ int Coll_allreduce_default::allreduce(void *sendbuf, void *recvbuf, int count, M return ret; } -int Coll_alltoall_default::alltoall( void *sbuf, int scount, MPI_Datatype sdtype, void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm) +int Coll_alltoall_default::alltoall(const void *sbuf, int scount, MPI_Datatype sdtype, void* rbuf, int rcount, MPI_Datatype rdtype, MPI_Comm comm) { return Coll_alltoall_ompi::alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm); } -int Coll_alltoallv_default::alltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype, - void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm) +int Coll_alltoallv_default::alltoallv(const void *sendbuf, const int *sendcounts, const int *senddisps, MPI_Datatype sendtype, + void *recvbuf, const int *recvcounts, const int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm) { MPI_Request request; - int err = Colls::ialltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, &request); - MPI_Request* requests = request->get_nbc_requests(); - int count = request->get_nbc_requests_size(); - XBT_DEBUG("<%d> wait for %d requests", comm->rank(), count); - Request::waitall(count, requests, MPI_STATUS_IGNORE); - for (int i = 0; i < count; i++) { - if(requests[i]!=MPI_REQUEST_NULL) - Request::unref(&requests[i]); - } - delete[] requests; - Request::unref(&request); - return err; + Colls::ialltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } }