From: degomme
Date: Thu, 28 Mar 2019 11:12:01 +0000 (+0100)
Subject: Test of an MPI_Ibarrier implementation.
X-Git-Tag: v3_22~25^2~3
X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/18449bfffced07ec0a49297f1eb76c4fe6792895?ds=sidebyside

Test of an MPI_Ibarrier implementation.

Instead of a helper process, or of a scheduler as used by MPI implementations,
let's play dumb for now: just post a bunch of Isend/Irecv and test them all
when needed.

pros:
- it just... works?
- we already have naive default algorithms implemented for most of the
  collectives, so in the end we only have to split the startall and waitall
  parts.

cons:
- simple basic algos only. More advanced ones would need a scheduler with
  progress management; let's do that later if needed (and MPI implementations
  have the odd idea that it's fine to advance the scheduler's round only when
  MPI_Test is called, so we might actually end up faster in the end).
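For reference, a minimal user-side sketch (illustration only, not part of this
patch) of what this enables: start the barrier, overlap some local work, and
poll it with MPI_Test. With this approach, each MPI_Test on the ibarrier
request simply tests all the underlying Isend/Irecv sub-requests and releases
them once they have all completed:

    /* Hypothetical usage example: plain MPI-3 user code, nothing SimGrid-specific. */
    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char* argv[])
    {
      MPI_Init(&argc, &argv);
      int rank;
      MPI_Comm_rank(MPI_COMM_WORLD, &rank);

      MPI_Request req;
      MPI_Ibarrier(MPI_COMM_WORLD, &req);         /* returns immediately */

      int done = 0;
      while (!done) {
        /* ... overlap useful local work here ... */
        MPI_Test(&req, &done, MPI_STATUS_IGNORE); /* drives the barrier forward */
      }
      printf("rank %d passed the barrier\n", rank);

      MPI_Finalize();
      return 0;
    }

MPI_Test here lands in the new branch of Request::test() below, which calls
testall on the attached sub-requests.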
---
diff --git a/include/smpi/smpi.h b/include/smpi/smpi.h
index a0835daeab..05683997f5 100644
--- a/include/smpi/smpi.h
+++ b/include/smpi/smpi.h
@@ -569,6 +569,7 @@ MPI_CALL(XBT_PUBLIC MPI_Fint, MPI_Request_c2f, (MPI_Request request));
 MPI_CALL(XBT_PUBLIC int, MPI_Bcast, (void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm));
 MPI_CALL(XBT_PUBLIC int, MPI_Barrier, (MPI_Comm comm));
+MPI_CALL(XBT_PUBLIC int, MPI_Ibarrier, (MPI_Comm comm, MPI_Request *request));
 MPI_CALL(XBT_PUBLIC int, MPI_Gather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
                                       int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm));
 MPI_CALL(XBT_PUBLIC int, MPI_Gatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
diff --git a/src/smpi/bindings/smpi_mpi.cpp b/src/smpi/bindings/smpi_mpi.cpp
index 5c905b79d3..fad1c55060 100644
--- a/src/smpi/bindings/smpi_mpi.cpp
+++ b/src/smpi/bindings/smpi_mpi.cpp
@@ -140,6 +140,7 @@ WRAPPED_PMPI_CALL(int,MPI_Group_rank,(MPI_Group group, int *rank),(group, rank))
 WRAPPED_PMPI_CALL(int,MPI_Group_size,(MPI_Group group, int *size),(group, size))
 WRAPPED_PMPI_CALL(int,MPI_Group_translate_ranks,(MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2),(group1, n, ranks1, group2, ranks2))
 WRAPPED_PMPI_CALL(int,MPI_Group_union,(MPI_Group group1, MPI_Group group2, MPI_Group * newgroup),(group1, group2, newgroup))
+WRAPPED_PMPI_CALL(int,MPI_Ibarrier,(MPI_Comm comm, MPI_Request *request),(comm,request))
 WRAPPED_PMPI_CALL(int,MPI_Info_create,( MPI_Info *info),( info))
 WRAPPED_PMPI_CALL(int,MPI_Info_delete,(MPI_Info info, char *key),(info, key))
 WRAPPED_PMPI_CALL(int,MPI_Info_dup,(MPI_Info info, MPI_Info *newinfo),(info, newinfo))
diff --git a/src/smpi/bindings/smpi_pmpi_coll.cpp b/src/smpi/bindings/smpi_pmpi_coll.cpp
index 93202ebd3f..755c0dbb77 100644
--- a/src/smpi/bindings/smpi_pmpi_coll.cpp
+++ b/src/smpi/bindings/smpi_pmpi_coll.cpp
@@ -6,6 +6,7 @@
 #include "private.hpp"
 #include "smpi_coll.hpp"
 #include "smpi_comm.hpp"
+#include "smpi_request.hpp"
 #include "smpi_datatype_derived.hpp"
 #include "smpi_op.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
@@ -66,6 +67,24 @@ int PMPI_Barrier(MPI_Comm comm)
   return retval;
 }
 
+int PMPI_Ibarrier(MPI_Comm comm, MPI_Request *request)
+{
+  int retval = 0;
+  smpi_bench_end();
+  if (comm == MPI_COMM_NULL) {
+    retval = MPI_ERR_COMM;
+  } else if(request == nullptr){
+    retval = MPI_ERR_ARG;
+  }else{
+    int rank = simgrid::s4u::this_actor::get_pid();
+    TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("ibarrier"));
+    simgrid::smpi::Colls::Ibarrier(comm, request);
+    TRACE_smpi_comm_out(rank);
+  }
+  smpi_bench_begin();
+  return retval;
+}
+
 int PMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount,
                 MPI_Datatype recvtype, int root, MPI_Comm comm)
 {
diff --git a/src/smpi/colls/smpi_nbc_impl.cpp b/src/smpi/colls/smpi_nbc_impl.cpp
new file mode 100644
index 0000000000..3cccebb011
--- /dev/null
+++ b/src/smpi/colls/smpi_nbc_impl.cpp
@@ -0,0 +1,51 @@
+/* Asynchronous parts of the basic collective algorithms, meant to be used both for the naive default implementation, but also for non blocking collectives */
+
+/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "colls_private.hpp"
+#include "src/smpi/include/smpi_actor.hpp"
+
+namespace simgrid{
+namespace smpi{
+
+
+int Colls::Ibarrier(MPI_Comm comm, MPI_Request* request)
+{
+  int i;
+  int size = comm->size();
+  int rank = comm->rank();
+  MPI_Request* requests;
+  (*request) = new Request( nullptr, 0, MPI_BYTE,
+                         rank,rank, COLL_TAG_BARRIER, comm, MPI_REQ_NON_PERSISTENT);
+  (*request)->ref();
+  if (rank > 0) {
+    requests = new MPI_Request[2];
+    requests[0] = Request::isend (nullptr, 0, MPI_BYTE, 0,
+                             COLL_TAG_BARRIER,
+                             comm);
+    requests[1] = Request::irecv (nullptr, 0, MPI_BYTE, 0,
+                             COLL_TAG_BARRIER,
+                             comm);
+    (*request)->set_nbc_requests(requests, 2);
+  }
+  else {
+    requests = new MPI_Request[(size-1)*2];
+    for (i = 1; i < 2*size-1; i+=2) {
+      requests[i-1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE,
+                               COLL_TAG_BARRIER, comm
+                               );
+      requests[i] = Request::isend(nullptr, 0, MPI_BYTE, (i+1)/2,
+                               COLL_TAG_BARRIER,
+                               comm
+                               );
+    }
+    (*request)->set_nbc_requests(requests, 2*(size-1));
+  }
+  return MPI_SUCCESS;
+}
+
+}
+}
diff --git a/src/smpi/include/smpi_coll.hpp b/src/smpi/include/smpi_coll.hpp
index 0299f6c9e0..245a854f3b 100644
--- a/src/smpi/include/smpi_coll.hpp
+++ b/src/smpi/include/smpi_coll.hpp
@@ -115,6 +115,9 @@ public:
                      MPI_Datatype recvtype, int root, MPI_Comm comm);
   static int scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
   static int exscan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
+
+  //async collectives
+  static int Ibarrier(MPI_Comm comm, MPI_Request* request);
   static void (*smpi_coll_cleanup_callback)();
 };
diff --git a/src/smpi/include/smpi_request.hpp b/src/smpi/include/smpi_request.hpp
index e15735375e..3224828b47 100644
--- a/src/smpi/include/smpi_request.hpp
+++ b/src/smpi/include/smpi_request.hpp
@@ -49,6 +49,8 @@ class Request : public F2C {
   MPI_Op op_;
   int cancelled_;
   smpi_mpi_generalized_request_funcs generalized_funcs;
+  MPI_Request* nbc_requests_;
+  int nbc_requests_size_;
 
 public:
   Request() = default;
@@ -66,6 +68,7 @@ public:
   void start();
   void cancel();
   void ref();
+  void set_nbc_requests(MPI_Request* reqs, int size);
   static void finish_wait(MPI_Request* request, MPI_Status* status);
   static void unref(MPI_Request* request);
   static int wait(MPI_Request* req, MPI_Status* status);
diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp
index 694a2b18d9..437b892c50 100644
--- a/src/smpi/mpi/smpi_request.cpp
+++ b/src/smpi/mpi/smpi_request.cpp
@@ -71,6 +71,9 @@ Request::Request(void* buf, int count, MPI_Datatype datatype, int src, int dst,
   refcount_ = 0;
   op_ = MPI_REPLACE;
   cancelled_ = 0;
+  generalized_funcs=nullptr;
+  nbc_requests_=nullptr;
+  nbc_requests_size_=0;
 }
 
 void Request::ref(){
@@ -520,6 +523,19 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   // multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
   static int nsleeps = 1;
   int ret = MPI_SUCCESS;
+
+  // are we testing a request meant for non blocking comms ?
+  // If so, test all the subrequests.
+  if ((*request)->nbc_requests_size_>0){
+    ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
+    if(*flag){
+      delete[] (*request)->nbc_requests_;
+      (*request)->nbc_requests_size_=0;
+      unref(request);
+    }
+    return ret;
+  }
+
   if(smpi_test_sleep > 0)
     simcall_process_sleep(nsleeps*smpi_test_sleep);
 
@@ -789,6 +805,8 @@ void Request::finish_wait(MPI_Request* request, MPI_Status * status)
       status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
       // this handles the case were size in receive differs from size in send
       status->count = req->real_size_;
+//      int flag;
+//      Request::get_status(req,&flag,status);
     }
 
   req->print_request("Finishing");
@@ -1110,5 +1128,10 @@ int Request::grequest_complete( MPI_Request request){
   return MPI_SUCCESS;
 }
 
+void Request::set_nbc_requests(MPI_Request* reqs, int size){
+  nbc_requests_=reqs;
+  nbc_requests_size_=size;
+}
+
 }
 }
diff --git a/teshsuite/smpi/mpich3-test/coll/CMakeLists.txt b/teshsuite/smpi/mpich3-test/coll/CMakeLists.txt
index 1c28008959..cb7d90f118 100644
--- a/teshsuite/smpi/mpich3-test/coll/CMakeLists.txt
+++ b/teshsuite/smpi/mpich3-test/coll/CMakeLists.txt
@@ -18,7 +18,8 @@ if(enable_smpi AND enable_smpi_MPICH3_testsuite)
             coll2 coll3 coll4 coll5 coll6 coll7 coll8 coll9 coll10 coll11 coll12 coll13
             exscan exscan2
             gather gather2 gather_big
-            # iallred ibarrier icallgather icallgatherv icallreduce
+            ibarrier
+            # iallred icallgather icallgatherv icallreduce
             # icalltoall icalltoallv icalltoallw icbarrier icbcast
             # icgather icgatherv icreduce icscatter icscatterv
             longuser
diff --git a/teshsuite/smpi/mpich3-test/coll/testlist b/teshsuite/smpi/mpich3-test/coll/testlist
index 6158b7a2f9..3281f6070c 100644
--- a/teshsuite/smpi/mpich3-test/coll/testlist
+++ b/teshsuite/smpi/mpich3-test/coll/testlist
@@ -144,7 +144,7 @@ nonblocking3 10 timeLimit=600 mpiversion=3.0
 iallred 2 mpiversion=3.0
 # ibarrier will hang forever if it fails, but will complete quickly if it
 # succeeds
-ibarrier 2 mpiversion=3.0 timeLimit=30
+ibarrier 2 timeLimit=30
 
 # run some of the tests, relinked with the nbc_pmpi_adaptor.o file
 nballtoall1 8 mpiversion=3.0
diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake
index e5041182cc..e5a6a85eab 100644
--- a/tools/cmake/DefinePackages.cmake
+++ b/tools/cmake/DefinePackages.cmake
@@ -203,6 +203,7 @@ set(SMPI_SRC
   src/smpi/colls/reduce/reduce-rab.cpp
   src/smpi/colls/scatter/scatter-ompi.cpp
   src/smpi/colls/scatter/scatter-mvapich-two-level.cpp
+  src/smpi/colls/smpi_nbc_impl.cpp
   src/smpi/colls/smpi_automatic_selector.cpp
   src/smpi/colls/smpi_default_selector.cpp
   src/smpi/colls/smpi_mpich_selector.cpp