Instead of a helper process, or a scheduler as used by MPI implementations, let's play dumb for now: just post a bunch of Isend/Irecv and test them all when needed.
pros:
- it just... works?
- we already have the default naive algorithms implemented for most of the collectives, so in the end we only have to split out their startall and waitall parts.
cons:
- simple, basic algorithms only.
More advanced ones would need a scheduler with progress management; let's do that later if needed (and since MPI implementations have the odd idea that it's fine to advance the scheduler's rounds only when MPI_Test is called, we might actually end up faster).
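For context, the user-facing pattern this enables is plain MPI-3; a minimal sketch (where, as noted above, each MPI_Test call is also what advances the underlying sub-requests):

  #include <mpi.h>

  int main(int argc, char* argv[])
  {
    MPI_Init(&argc, &argv);
    MPI_Request req;
    MPI_Ibarrier(MPI_COMM_WORLD, &req); // returns immediately
    int flag = 0;
    while (flag == 0) {
      /* ... overlap useful computation here ... */
      MPI_Test(&req, &flag, MPI_STATUS_IGNORE); // drives progress of the sub-requests
    }
    MPI_Finalize();
    return 0;
  }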
MPI_CALL(XBT_PUBLIC int, MPI_Bcast, (void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm));
MPI_CALL(XBT_PUBLIC int, MPI_Barrier, (MPI_Comm comm));
+MPI_CALL(XBT_PUBLIC int, MPI_Ibarrier, (MPI_Comm comm, MPI_Request *request));
MPI_CALL(XBT_PUBLIC int, MPI_Gather, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount,
MPI_Datatype recvtype, int root, MPI_Comm comm));
MPI_CALL(XBT_PUBLIC int, MPI_Gatherv, (void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf,
WRAPPED_PMPI_CALL(int,MPI_Group_size,(MPI_Group group, int *size),(group, size))
WRAPPED_PMPI_CALL(int,MPI_Group_translate_ranks,(MPI_Group group1, int n, int *ranks1, MPI_Group group2, int *ranks2),(group1, n, ranks1, group2, ranks2))
WRAPPED_PMPI_CALL(int,MPI_Group_union,(MPI_Group group1, MPI_Group group2, MPI_Group * newgroup),(group1, group2, newgroup))
+WRAPPED_PMPI_CALL(int,MPI_Ibarrier,(MPI_Comm comm, MPI_Request *request),(comm,request))
WRAPPED_PMPI_CALL(int,MPI_Info_create,( MPI_Info *info),( info))
WRAPPED_PMPI_CALL(int,MPI_Info_delete,(MPI_Info info, char *key),(info, key))
WRAPPED_PMPI_CALL(int,MPI_Info_dup,(MPI_Info info, MPI_Info *newinfo),(info, newinfo))
#include "private.hpp"
#include "smpi_coll.hpp"
#include "smpi_comm.hpp"
+#include "smpi_request.hpp"
#include "smpi_datatype_derived.hpp"
#include "smpi_op.hpp"
#include "src/smpi/include/smpi_actor.hpp"
return retval;
}
+int PMPI_Ibarrier(MPI_Comm comm, MPI_Request *request)
+{
+ int retval = 0;
+ smpi_bench_end();
+  if (comm == MPI_COMM_NULL) {
+    retval = MPI_ERR_COMM;
+  } else if (request == nullptr) {
+    retval = MPI_ERR_ARG;
+  } else {
+ int rank = simgrid::s4u::this_actor::get_pid();
+ TRACE_smpi_comm_in(rank, __func__, new simgrid::instr::NoOpTIData("ibarrier"));
+ simgrid::smpi::Colls::Ibarrier(comm, request);
+ TRACE_smpi_comm_out(rank);
+ }
+ smpi_bench_begin();
+ return retval;
+}
+
int PMPI_Gather(void *sendbuf, int sendcount, MPI_Datatype sendtype,void *recvbuf, int recvcount, MPI_Datatype recvtype,
int root, MPI_Comm comm)
{
--- /dev/null
+/* Asynchronous parts of the basic collective algorithms, meant to be used both for the naive default implementations and for the non-blocking collectives */
+
+/* Copyright (c) 2009-2019. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "colls_private.hpp"
+#include "src/smpi/include/smpi_actor.hpp"
+
+namespace simgrid{
+namespace smpi{
+
+int Colls::Ibarrier(MPI_Comm comm, MPI_Request* request)
+{
+  int size = comm->size();
+  int rank = comm->rank();
+  // Placeholder request handed back to the caller; it just carries the
+  // isend/irecv sub-requests that actually implement the barrier.
+  (*request) = new Request(nullptr, 0, MPI_BYTE, rank, rank, COLL_TAG_BARRIER, comm, MPI_REQ_NON_PERSISTENT);
+  (*request)->ref();
+  if (rank > 0) {
+    // Non-root ranks: signal our arrival to rank 0, then wait for its answer.
+    MPI_Request* requests = new MPI_Request[2];
+    requests[0] = Request::isend(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm);
+    requests[1] = Request::irecv(nullptr, 0, MPI_BYTE, 0, COLL_TAG_BARRIER, comm);
+    (*request)->set_nbc_requests(requests, 2);
+  }
+  else {
+    // Rank 0: post one irecv/isend pair per peer (fan-in / fan-out star).
+    MPI_Request* requests = new MPI_Request[(size - 1) * 2];
+    for (int i = 1; i < 2 * size - 1; i += 2) {
+      requests[i - 1] = Request::irecv(nullptr, 0, MPI_BYTE, MPI_ANY_SOURCE, COLL_TAG_BARRIER, comm);
+      requests[i]     = Request::isend(nullptr, 0, MPI_BYTE, (i + 1) / 2, COLL_TAG_BARRIER, comm);
+    }
+    (*request)->set_nbc_requests(requests, 2 * (size - 1));
+  }
+ return MPI_SUCCESS;
+}
+
+} // namespace smpi
+} // namespace simgrid
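(As a hint of how the other naive collectives could be split the same way — not part of this change, just a sketch assuming a COLL_TAG_BCAST constant analogous to COLL_TAG_BARRIER — a flat-tree Ibcast would post its isend/irecv batch and attach it to the same kind of placeholder request:

  int Colls::Ibcast(void* buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm, MPI_Request* request)
  {
    int size = comm->size();
    int rank = comm->rank();
    (*request) = new Request(nullptr, 0, MPI_BYTE, rank, rank, COLL_TAG_BCAST, comm, MPI_REQ_NON_PERSISTENT);
    (*request)->ref();
    if (rank != root) {
      // Non-root ranks: a single irecv from the root.
      MPI_Request* requests = new MPI_Request[1];
      requests[0] = Request::irecv(buf, count, datatype, root, COLL_TAG_BCAST, comm);
      (*request)->set_nbc_requests(requests, 1);
    } else {
      // Root: one isend per peer.
      MPI_Request* requests = new MPI_Request[size - 1];
      int n = 0;
      for (int dst = 0; dst < size; dst++)
        if (dst != root)
          requests[n++] = Request::isend(buf, count, datatype, dst, COLL_TAG_BCAST, comm);
      (*request)->set_nbc_requests(requests, size - 1);
    }
    return MPI_SUCCESS;
  }

The completion side then comes for free through the nbc test path in Request::test shown below.)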
MPI_Datatype recvtype, int root, MPI_Comm comm);
static int scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
static int exscan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
+
+  // Asynchronous (non-blocking) collectives
+ static int Ibarrier(MPI_Comm comm, MPI_Request* request);
static void (*smpi_coll_cleanup_callback)();
};
MPI_Op op_;
int cancelled_;
smpi_mpi_generalized_request_funcs generalized_funcs;
+ MPI_Request* nbc_requests_;
+ int nbc_requests_size_;
public:
Request() = default;
void start();
void cancel();
void ref();
+ void set_nbc_requests(MPI_Request* reqs, int size);
static void finish_wait(MPI_Request* request, MPI_Status* status);
static void unref(MPI_Request* request);
static int wait(MPI_Request* req, MPI_Status* status);
refcount_ = 0;
op_ = MPI_REPLACE;
cancelled_ = 0;
+  generalized_funcs  = nullptr;
+  nbc_requests_      = nullptr;
+  nbc_requests_size_ = 0;
}
void Request::ref(){
// multiplier to the sleeptime, to increase speed of execution, each failed test will increase it
static int nsleeps = 1;
int ret = MPI_SUCCESS;
+
+  // Are we testing a request created for a non-blocking collective?
+  // If so, test all of its sub-requests.
+  if ((*request)->nbc_requests_size_ > 0) {
+    ret = testall((*request)->nbc_requests_size_, (*request)->nbc_requests_, flag, MPI_STATUSES_IGNORE);
+    if (*flag) { // every sub-request completed: release them, then the placeholder request itself
+      delete[] (*request)->nbc_requests_;
+      (*request)->nbc_requests_size_ = 0;
+      unref(request);
+    }
+    return ret;
+  }
+
if(smpi_test_sleep > 0)
simcall_process_sleep(nsleeps*smpi_test_sleep);
status->MPI_ERROR = req->truncated_ != 0 ? MPI_ERR_TRUNCATE : MPI_SUCCESS;
// this handles the case were size in receive differs from size in send
status->count = req->real_size_;
}
req->print_request("Finishing");
return MPI_SUCCESS;
}
+void Request::set_nbc_requests(MPI_Request* reqs, int size)
+{
+  nbc_requests_      = reqs;
+  nbc_requests_size_ = size;
+}
+
}
}
coll2 coll3 coll4 coll5 coll6 coll7 coll8 coll9 coll10 coll11 coll12 coll13
exscan exscan2
gather gather2 gather_big
- # iallred ibarrier icallgather icallgatherv icallreduce
+ ibarrier
+ # iallred icallgather icallgatherv icallreduce
# icalltoall icalltoallv icalltoallw icbarrier icbcast
# icgather icgatherv icreduce icscatter icscatterv
longuser
iallred 2 mpiversion=3.0
# ibarrier will hang forever if it fails, but will complete quickly if it
# succeeds
-ibarrier 2 mpiversion=3.0 timeLimit=30
+ibarrier 2 timeLimit=30
# run some of the tests, relinked with the nbc_pmpi_adaptor.o file
nballtoall1 8 mpiversion=3.0
src/smpi/colls/reduce/reduce-rab.cpp
src/smpi/colls/scatter/scatter-ompi.cpp
src/smpi/colls/scatter/scatter-mvapich-two-level.cpp
+ src/smpi/colls/smpi_nbc_impl.cpp
src/smpi/colls/smpi_automatic_selector.cpp
src/smpi/colls/smpi_default_selector.cpp
src/smpi/colls/smpi_mpich_selector.cpp