From: pini Date: Fri, 29 Jan 2010 15:21:58 +0000 (+0000) Subject: Added some missing MPI calls. X-Git-Tag: SVN~693 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/ae2d6a900ad8864d500f3e6fb63b6074519cb364?ds=sidebyside Added some missing MPI calls. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7040 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/src/smpi/smpi_mpi.c b/src/smpi/smpi_mpi.c index 2586730253..20ff68f150 100644 --- a/src/smpi/smpi_mpi.c +++ b/src/smpi/smpi_mpi.c @@ -460,8 +460,9 @@ int smpi_mpi_reduce(void *sendbuf, void *recvbuf, int count, xbt_free(tmpbufs[index]); /* FIXME: with the following line, it generates an * [xbt_ex/CRITICAL] Conditional list not empty 162518800. + * Fixed ? */ - // xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); + xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); } xbt_free(requests); xbt_free(tmpbufs); @@ -788,4 +789,220 @@ double SMPI_MPI_Wtime(void) return time; } +int SMPI_MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, + void* recvbuf, int recvcount, MPI_Datatype recvtype, + int root, MPI_Comm comm) +{ + int retval = MPI_SUCCESS; + int system_tag = 666; + int rank, size; + + smpi_bench_end(); + rank = smpi_mpi_comm_rank(comm); + size = comm->size; + if(rank != root) { + // Send buffer to root + smpi_mpi_request_t request; + + retval = smpi_create_request(sendbuf, sendcount, sendtype, + rank, root, system_tag, comm, &request); + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + } else { + // Receive buffers from senders + int src; + smpi_mpi_request_t* requests; + + requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); + for(src = 0; src < size; src++) { + if(src == root) { + // Local copy from root + memcpy(&((char*)recvbuf)[src*recvcount*recvtype->size], + sendbuf, sendcount*sendtype->size*sizeof(char)); + } else { + int index = src < root ? src : src - 1; + retval = smpi_create_request(&((char*)recvbuf)[src*recvcount*recvtype->size], + recvcount, recvtype, src, root, system_tag, + comm, &requests[index]); + if(NULL != requests[index] && MPI_SUCCESS == retval) { + smpi_mpi_irecv(requests[index]); + } + } + } + // Wait for completion of irecv's. + for(src = 0; src < size - 1; src++) { + int index = MPI_UNDEFINED; + smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); + } + xbt_free(requests); + } + smpi_bench_begin(); + return retval; +} + +int SMPI_MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, + void* recvbuf, int* recvcounts, int* displs, MPI_Datatype recvtype, + int root, MPI_Comm comm) +{ + int retval = MPI_SUCCESS; + int system_tag = 666; + int rank, size; + smpi_bench_end(); + rank = smpi_mpi_comm_rank(comm); + size = comm->size; + if(rank != root) { + // Send buffer to root + smpi_mpi_request_t request; + + retval = smpi_create_request(sendbuf, sendcount, sendtype, + rank, root, system_tag, comm, &request); + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + } else { + // Receive buffers from senders + int src; + smpi_mpi_request_t* requests; + + requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); + for(src = 0; src < size; src++) { + if(src == root) { + // Local copy from root + memcpy(&((char*)recvbuf)[displs[src]], + sendbuf, sendcount*sendtype->size*sizeof(char)); + } else { + int index = src < root ? src : src - 1; + retval = smpi_create_request(&((char*)recvbuf)[displs[src]], + recvcounts[src], recvtype, src, root, system_tag, + comm, &requests[index]); + if(NULL != requests[index] && MPI_SUCCESS == retval) { + smpi_mpi_irecv(requests[index]); + } + } + } + // Wait for completion of irecv's. + for(src = 0; src < size - 1; src++) { + int index = MPI_UNDEFINED; + smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); + } + xbt_free(requests); + } + smpi_bench_begin(); + return retval; +} + +int SMPI_MPI_Scatterv(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, + void* recvbuf, int recvcount, MPI_Datatype recvtype, + int root, MPI_Comm comm) +{ + int retval = MPI_SUCCESS; + int system_tag = 666; + int rank, size; + + smpi_bench_end(); + rank = smpi_mpi_comm_rank(comm); + size = comm->size; + if(rank != root) { + // Receive buffer from root + smpi_mpi_request_t request; + + retval = smpi_create_request(recvbuf, recvcount, recvtype, + root, rank, system_tag, comm, &request); + smpi_mpi_isend(request); + smpi_mpi_wait(request, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, request); + } else { + // Send buffers to receivers + int dst; + smpi_mpi_request_t* requests; + + requests = xbt_malloc((size-1) * sizeof(smpi_mpi_request_t)); + for(dst = 0; dst < size; dst++) { + if(dst == root) { + // Local copy from root + memcpy(recvbuf, &((char*)sendbuf)[displs[dst]], + sendcounts[dst]*sendtype->size*sizeof(char)); + } else { + int index = dst < root ? dst : dst - 1; + retval = smpi_create_request(&((char*)sendbuf)[displs[dst]], sendcounts[dst], sendtype, + root, dst, system_tag, comm, &requests[index]); + if(NULL != requests[index] && MPI_SUCCESS == retval) { + smpi_mpi_isend(requests[index]); + } + } + } + // Wait for completion of isend's. + for(dst = 0; dst < size - 1; dst++) { + int index = MPI_UNDEFINED; + smpi_mpi_waitany(size - 1, requests, &index, MPI_STATUS_IGNORE); + xbt_mallocator_release(smpi_global->request_mallocator, requests[index]); + } + xbt_free(requests); + } + smpi_bench_begin(); + return retval; +} + +int SMPI_MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts, + MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) +{ + // FIXME: Suboptimal implementation + int retval = MPI_SUCCESS; + int count = 0; + int root = 0; + int i, rank; + int* displs; + + smpi_bench_end(); + rank = smpi_mpi_comm_rank(comm); + displs = xbt_new(int, comm->size); + for(i = 0; i < comm->size; i++) { + count += recvcounts[i]; + displs[i] = 0; + } + retval = smpi_mpi_reduce(sendbuf, recvbuf, count, datatype, op, root, comm); + retval = SMPI_MPI_Scatterv(recvbuf, recvcounts, displs, datatype, recvbuf, recvcounts[rank], datatype, root, comm); + xbt_free(displs); + smpi_bench_begin(); + return retval; +} + +int SMPI_MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, + void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) +{ + // FIXME: Suboptimal implementation + int root = 0; + int retval; + + smpi_bench_end(); + retval = SMPI_MPI_Gather(sendbuf, sendcount, sendtype, + recvbuf, recvcount, recvtype, root, comm); + if(retval == MPI_SUCCESS) { + retval = SMPI_MPI_Bcast(recvbuf, recvcount, recvtype, root, comm); + } + smpi_bench_begin(); + return retval; +} + +int SMPI_MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, + void* recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, + MPI_Comm comm) +{ + // FIXME: Suboptimal implementation + int root = 0; + int last, retval; + + smpi_bench_end(); + retval = SMPI_MPI_Gatherv(sendbuf, sendcount, sendtype, + recvbuf, recvcounts, displs, recvtype, root, comm); + if(retval == MPI_SUCCESS) { + last = comm->size - 1; + retval = SMPI_MPI_Bcast(recvbuf, displs[last] + recvcounts[last], recvtype, root, comm); + } + smpi_bench_begin(); + return retval; +}