X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/e10bd21dc31e83417de6984b8790e7a147aaa7b5..8b9fbe2cd65513016e1a70b47cabfe0f688285fa:/src/smpi/colls/smpi_default_selector.cpp diff --git a/src/smpi/colls/smpi_default_selector.cpp b/src/smpi/colls/smpi_default_selector.cpp index aa44938f40..dca9430c06 100644 --- a/src/smpi/colls/smpi_default_selector.cpp +++ b/src/smpi/colls/smpi_default_selector.cpp @@ -25,39 +25,9 @@ int Coll_barrier_default::barrier(MPI_Comm comm) int Coll_gather_default::gather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { - const int system_tag = COLL_TAG_GATHER; - MPI_Aint lb = 0; - MPI_Aint recvext = 0; - - int rank = comm->rank(); - int size = comm->size(); - if(rank != root) { - // Send buffer to root - Request::send(sendbuf, sendcount, sendtype, root, system_tag, comm); - } else { - recvtype->extent(&lb, &recvext); - // Local copy from root - Datatype::copy(sendbuf, sendcount, sendtype, static_cast(recvbuf) + root * recvcount * recvext, - recvcount, recvtype); - // Receive buffers from senders - MPI_Request *requests = xbt_new(MPI_Request, size - 1); - int index = 0; - for (int src = 0; src < size; src++) { - if(src != root) { - requests[index] = Request::irecv_init(static_cast(recvbuf) + src * recvcount * recvext, recvcount, recvtype, - src, system_tag, comm); - index++; - } - } - // Wait for completion of irecv's. - Request::startall(size - 1, requests); - Request::waitall(size - 1, requests, MPI_STATUS_IGNORE); - for (int src = 0; src < size-1; src++) { - Request::unref(&requests[src]); - } - xbt_free(requests); - } - return MPI_SUCCESS; + MPI_Request request; + Colls::igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, @@ -74,9 +44,8 @@ int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, in count += recvcounts[i]; } void *tmpbuf = static_cast(smpi_get_tmp_sendbuffer(count*datatype->get_extent())); - int ret = MPI_SUCCESS; - ret = Coll_reduce_default::reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm); + int ret = Coll_reduce_default::reduce(sendbuf, tmpbuf, count, datatype, op, 0, comm); if(ret==MPI_SUCCESS) ret = Colls::scatterv(tmpbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, 0, comm); xbt_free(displs); @@ -88,119 +57,35 @@ int Coll_reduce_scatter_default::reduce_scatter(void *sendbuf, void *recvbuf, in int Coll_allgather_default::allgather(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { - const int system_tag = COLL_TAG_ALLGATHER; - MPI_Aint lb = 0; - MPI_Aint recvext = 0; - MPI_Request *requests; - - int rank = comm->rank(); - int size = comm->size(); - // FIXME: check for errors - recvtype->extent(&lb, &recvext); - // Local copy from self - Datatype::copy(sendbuf, sendcount, sendtype, static_cast(recvbuf) + rank * recvcount * recvext, recvcount, - recvtype); - // Send/Recv buffers to/from others; - requests = xbt_new(MPI_Request, 2 * (size - 1)); - int index = 0; - for (int other = 0; other < size; other++) { - if(other != rank) { - requests[index] = Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag,comm); - index++; - requests[index] = Request::irecv_init(static_cast(recvbuf) + other * recvcount * recvext, recvcount, recvtype, - other, system_tag, comm); - index++; - } - } - // Wait for completion of all comms. - Request::startall(2 * (size - 1), requests); - Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE); - for (int other = 0; other < 2*(size-1); other++) { - Request::unref(&requests[other]); - } - xbt_free(requests); - return MPI_SUCCESS; + MPI_Request request; + Colls::iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } int Coll_allgatherv_default::allgatherv(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm) { - const int system_tag = COLL_TAG_ALLGATHERV; - MPI_Aint lb = 0; - MPI_Aint recvext = 0; - - int rank = comm->rank(); - int size = comm->size(); - recvtype->extent(&lb, &recvext); - // Local copy from self - Datatype::copy(sendbuf, sendcount, sendtype, - static_cast(recvbuf) + displs[rank] * recvext,recvcounts[rank], recvtype); - // Send buffers to others; - MPI_Request *requests = xbt_new(MPI_Request, 2 * (size - 1)); - int index = 0; - for (int other = 0; other < size; other++) { - if(other != rank) { - requests[index] = - Request::isend_init(sendbuf, sendcount, sendtype, other, system_tag, comm); - index++; - requests[index] = Request::irecv_init(static_cast(recvbuf) + displs[other] * recvext, recvcounts[other], - recvtype, other, system_tag, comm); - index++; - } - } - // Wait for completion of all comms. - Request::startall(2 * (size - 1), requests); - Request::waitall(2 * (size - 1), requests, MPI_STATUS_IGNORE); - for (int other = 0; other < 2*(size-1); other++) { + MPI_Request request; + Colls::iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm, &request); + MPI_Request* requests = request->get_nbc_requests(); + int count = request->get_nbc_requests_size(); + Request::waitall(count, requests, MPI_STATUS_IGNORE); + for (int other = 0; other < count; other++) { Request::unref(&requests[other]); } - xbt_free(requests); + delete[] requests; + Request::unref(&request); return MPI_SUCCESS; } int Coll_scatter_default::scatter(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { - const int system_tag = COLL_TAG_SCATTER; - MPI_Aint lb = 0; - MPI_Aint sendext = 0; - MPI_Request *requests; - - int rank = comm->rank(); - int size = comm->size(); - if(rank != root) { - // Recv buffer from root - Request::recv(recvbuf, recvcount, recvtype, root, system_tag, comm, MPI_STATUS_IGNORE); - } else { - sendtype->extent(&lb, &sendext); - // Local copy from root - if(recvbuf!=MPI_IN_PLACE){ - Datatype::copy(static_cast(sendbuf) + root * sendcount * sendext, - sendcount, sendtype, recvbuf, recvcount, recvtype); - } - // Send buffers to receivers - requests = xbt_new(MPI_Request, size - 1); - int index = 0; - for(int dst = 0; dst < size; dst++) { - if(dst != root) { - requests[index] = Request::isend_init(static_cast(sendbuf) + dst * sendcount * sendext, sendcount, sendtype, - dst, system_tag, comm); - index++; - } - } - // Wait for completion of isend's. - Request::startall(size - 1, requests); - Request::waitall(size - 1, requests, MPI_STATUS_IGNORE); - for (int dst = 0; dst < size-1; dst++) { - Request::unref(&requests[dst]); - } - xbt_free(requests); - } - return MPI_SUCCESS; + MPI_Request request; + Colls::iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } - - int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { @@ -238,10 +123,7 @@ int Coll_reduce_default::reduce(void *sendbuf, void *recvbuf, int count, MPI_Dat int index = 0; for (int src = 0; src < size; src++) { if (src != root) { - if (not smpi_process()->replaying()) - tmpbufs[index] = xbt_malloc(count * dataext); - else - tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext); + tmpbufs[index] = smpi_get_tmp_sendbuffer(count * dataext); requests[index] = Request::irecv_init(tmpbufs[index], count, datatype, src, system_tag, comm); index++; @@ -287,61 +169,12 @@ int Coll_alltoall_default::alltoall( void *sbuf, int scount, MPI_Datatype sdtype return Coll_alltoall_ompi::alltoall(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm); } - - int Coll_alltoallv_default::alltoallv(void *sendbuf, int *sendcounts, int *senddisps, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *recvdisps, MPI_Datatype recvtype, MPI_Comm comm) { - const int system_tag = 889; - MPI_Aint lb = 0; - MPI_Aint sendext = 0; - MPI_Aint recvext = 0; - MPI_Request *requests; - - /* Initialize. */ - int rank = comm->rank(); - int size = comm->size(); - XBT_DEBUG("<%d> algorithm basic_alltoallv() called.", rank); - sendtype->extent(&lb, &sendext); - recvtype->extent(&lb, &recvext); - /* Local copy from self */ - int err = Datatype::copy(static_cast(sendbuf) + senddisps[rank] * sendext, sendcounts[rank], sendtype, - static_cast(recvbuf) + recvdisps[rank] * recvext, recvcounts[rank], recvtype); - if (err == MPI_SUCCESS && size > 1) { - /* Initiate all send/recv to/from others. */ - requests = xbt_new(MPI_Request, 2 * (size - 1)); - int count = 0; - /* Create all receives that will be posted first */ - for (int i = 0; i < size; ++i) { - if (i != rank && recvcounts[i] != 0) { - requests[count] = Request::irecv_init(static_cast(recvbuf) + recvdisps[i] * recvext, - recvcounts[i], recvtype, i, system_tag, comm); - count++; - }else{ - XBT_DEBUG("<%d> skip request creation [src = %d, recvcounts[src] = %d]", rank, i, recvcounts[i]); - } - } - /* Now create all sends */ - for (int i = 0; i < size; ++i) { - if (i != rank && sendcounts[i] != 0) { - requests[count] = Request::isend_init(static_cast(sendbuf) + senddisps[i] * sendext, - sendcounts[i], sendtype, i, system_tag, comm); - count++; - }else{ - XBT_DEBUG("<%d> skip request creation [dst = %d, sendcounts[dst] = %d]", rank, i, sendcounts[i]); - } - } - /* Wait for them all. */ - Request::startall(count, requests); - XBT_DEBUG("<%d> wait for %d requests", rank, count); - Request::waitall(count, requests, MPI_STATUS_IGNORE); - for (int i = 0; i < count; i++) { - if(requests[i]!=MPI_REQUEST_NULL) - Request::unref(&requests[i]); - } - xbt_free(requests); - } - return err; + MPI_Request request; + Colls::ialltoallv(sendbuf, sendcounts, senddisps, sendtype, recvbuf, recvcounts, recvdisps, recvtype, comm, &request); + return Request::wait(&request, MPI_STATUS_IGNORE); } }