X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/5ed37babb2fa9097abe82df299c0aa259ed84d5a..8bf7ffc43ad5507982e924a7f05bbb13c89965cb:/src/smpi/mpi/smpi_request.cpp

diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp
index fd1a79cbae..64008cd40c 100644
--- a/src/smpi/mpi/smpi_request.cpp
+++ b/src/smpi/mpi/smpi_request.cpp
@@ -5,7 +5,6 @@
 
 #include "smpi_request.hpp"
 
-#include "mc/mc.h"
 #include "private.hpp"
 #include "simgrid/Exception.hpp"
 #include "simgrid/s4u/ConditionVariable.hpp"
@@ -19,11 +18,13 @@
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/mc.h"
 #include "src/mc/mc_replay.hpp"
 #include "src/smpi/include/smpi_actor.hpp"
 
 #include 
 #include 
+#include <mutex> // std::scoped_lock and std::unique_lock
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
 
@@ -68,7 +69,6 @@ Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, a
     refcount_ = 1;
   else
     refcount_ = 0;
-  message_id_ = 0;
   init_buffer(count);
   this->add_f();
 }
@@ -180,15 +180,16 @@ void Request::init_buffer(int count){
 
 bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
 {
-  auto ref = static_cast<MPI_Request>(a);
-  auto req = static_cast<MPI_Request>(b);
+  auto* ref = static_cast<MPI_Request>(a);
+  auto* req = static_cast<MPI_Request>(b);
   bool match = match_common(req, req, ref);
   if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm())
     return match;
-
-  if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
-                                              ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) {
+  auto it = std::find(req->message_id_.begin(), req->message_id_.end(),
+                      ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
+                                                              ref->comm_->group()->rank(req->dst_), req->tag_));
+  if (it != req->message_id_.end()) {
     if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) {
+      req->message_id_.erase(it);
       XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_,
                 ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                         ref->comm_->group()->rank(req->dst_), req->tag_),
@@ -203,20 +204,20 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
     match = false;
     req->flags_ &= ~MPI_REQ_MATCHED;
     ref->detached_sender_ = nullptr;
-    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, "
+    XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, "
               "from pid %ld to pid %ld, with tag %d",
               ref->comm_,
               ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
                                                       ref->comm_->group()->rank(req->dst_), req->tag_),
-              req->message_id_, req->src_, req->dst_, req->tag_);
+              req->src_, req->dst_, req->tag_);
   }
   return match;
 }
 
 bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
 {
-  auto ref = static_cast<MPI_Request>(a);
-  auto req = static_cast<MPI_Request>(b);
+  auto* ref = static_cast<MPI_Request>(a);
+  auto* req = static_cast<MPI_Request>(b);
   return match_common(req, ref, req);
 }
 
@@ -322,9 +323,9 @@ MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int
 
 MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -332,9 +333,9 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i
 
 MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -342,9 +343,9 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in
 
 MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   return request;
@@ -357,8 +358,8 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src,
     source = MPI_ANY_SOURCE;
   else if (src != MPI_PROC_NULL)
     source = comm->group()->actor(src);
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
-                             simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+                              simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
   if(src != MPI_PROC_NULL)
     request->start();
   return request;
@@ -368,43 +369,39 @@ int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag,
 {
   MPI_Request request = irecv(buf, count, datatype, src, tag, comm);
   int retval = wait(&request,status);
-  request = nullptr;
   return retval;
 }
 
 void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   wait(&request, MPI_STATUS_IGNORE);
-  request = nullptr;
 }
 
 void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   wait(&request, MPI_STATUS_IGNORE);
-  request = nullptr;
 }
 
 void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
 {
-  auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
-                             dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
-                             MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
+  auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+                              dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+                              MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
   if(dst != MPI_PROC_NULL)
     request->start();
   wait(&request,MPI_STATUS_IGNORE);
-  request = nullptr;
 }
 
 void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
@@ -442,6 +439,29 @@ void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype
   }
 }
 
+void Request::isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
+                        void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
+                        MPI_Comm comm, MPI_Request* request)
+{
+  aid_t source = MPI_PROC_NULL;
+  if (src == MPI_ANY_SOURCE)
+    source = MPI_ANY_SOURCE;
+  else if (src != MPI_PROC_NULL)
+    source = comm->group()->actor(src);
+  aid_t destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
+
+  (*request) = new Request( nullptr, 0, MPI_BYTE,
+                        src,dst, sendtag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
+  std::vector<MPI_Request> requests;
+  if (aid_t myid = simgrid::s4u::this_actor::get_pid(); (destination == myid) && (source == myid)) {
+    Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
+    return;
+  }
+  requests.push_back(isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm));
+  requests.push_back(irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm));
+  (*request)->start_nbc_requests(requests);
+}
+
 void Request::start()
 {
   s4u::Mailbox* mailbox;
@@ -463,9 +483,9 @@ void Request::start()
 
     simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
 
-    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
+    std::unique_lock<s4u::Mutex> mut_lock;
     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      mut->lock();
+      mut_lock = std::unique_lock(*process->mailboxes_mutex());
 
     bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
     flags_ |= MPI_REQ_PROBE;
@@ -513,36 +533,32 @@ void Request::start()
                                                  process->replaying() ? &smpi_comm_null_copy_buffer_callback
                                                                       : smpi_comm_copy_data_callback,
                                                  this,
-                                                 -1.0};
+                                                 -1.0,
+                                                 process->call_location()->get_call_location()};
     observer.set_tag(tag_);
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
                                               &observer);
     XBT_DEBUG("recv simcall posted");
-
-    if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      mut->unlock();
   } else { /* the RECV flag was not set, so this is a send */
-    const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
+    simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
     xbt_assert(process, "Actor pid=%ld is gone??", dst_);
 
     if (TRACE_smpi_view_internals())
      TRACE_smpi_send(src_, src_, dst_, tag_, size_);
 
     this->print_request("New send");
-    message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+    message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_));
     comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
 
     void* buf = buf_;
-    if ((flags_ & MPI_REQ_SSEND) == 0 &&
-        ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
-         static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
-      void *oldbuf = nullptr;
+    if ((flags_ & MPI_REQ_SSEND) == 0 && ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
+                                          static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
      detached_ = true;
       XBT_DEBUG("Send request %p is detached", this);
       this->ref();
       if (not(type_->flags() & DT_FLAG_DERIVED)) {
-        oldbuf = buf_;
+        void* oldbuf = buf_;
         if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
           if (smpi_switch_data_segment(simgrid::s4u::Actor::by_pid(src_), buf_))
             XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
@@ -574,10 +590,9 @@ void Request::start()
       XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime);
     }
 
-    simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
-
+    std::unique_lock<s4u::Mutex> mut_lock;
     if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
-      mut->lock();
+      mut_lock = std::unique_lock(*process->mailboxes_mutex());
 
     if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
       mailbox = process->mailbox();
@@ -618,7 +633,7 @@ void Request::start()
         &xbt_free_f, // how to free the userdata if a detached send fails
         process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback,
         this, // detach if msg size < eager/rdv switch limit
-        detached_};
+        detached_, process->call_location()->get_call_location()};
     observer.set_tag(tag_);
     action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
                                               &observer);
@@ -629,9 +644,6 @@ void Request::start()
       boost::static_pointer_cast<kernel::activity::CommImpl>(action_)->set_tracing_category(
           smpi_process()->get_tracing_category());
     }
-
-    if (smpi_cfg_async_small_thresh() != 0 || ((flags_ & MPI_REQ_RMA) != 0))
-      mut->unlock();
   }
 }
 
@@ -678,7 +690,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) {
   if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
     try{
       kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(),
+                                                  process->call_location()->get_call_location()};
       *flag = kernel::actor::simcall_answered(
           [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer);
     } catch (const Exception&) {
@@ -767,7 +781,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M
     ssize_t i;
     try{
       kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()};
       i = kernel::actor::simcall_answered(
           [&observer] {
             return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
@@ -869,7 +884,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status*
   static int nsleeps = 1;
   double speed = s4u::this_actor::get_host()->get_speed();
   double maxrate = smpi_cfg_iprobe_cpu_usage();
-  auto request =
+  auto* request =
       new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source),
                   simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
   if (smpi_iprobe_sleep > 0) {
@@ -1083,7 +1098,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
     try{
       // this is not a detached send
      kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1,
+                                                  process->call_location()->get_call_location()};
       kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); },
                                       &observer);
     } catch (const CancelException&) {
@@ -1093,9 +1110,8 @@ int Request::wait(MPI_Request * request, MPI_Status * status)
 
   if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
     if (not((*request)->flags_ & MPI_REQ_COMPLETE)) {
-      ((*request)->generalized_funcs)->mutex->lock();
-      ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
-      ((*request)->generalized_funcs)->mutex->unlock();
+      const std::scoped_lock lock(*(*request)->generalized_funcs->mutex);
+      (*request)->generalized_funcs->cond->wait((*request)->generalized_funcs->mutex);
     }
     MPI_Status tmp_status;
     MPI_Status* mystatus;
@@ -1154,7 +1170,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status)
     ssize_t i;
     try{
       kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
-      kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+      simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+      kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1,
+                                                     process->call_location()->get_call_location()};
       i = kernel::actor::simcall_blocking(
           [&observer] {
             kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
@@ -1331,10 +1349,9 @@ int Request::grequest_complete(MPI_Request request)
 {
   if ((not(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
     return MPI_ERR_REQUEST;
-  request->generalized_funcs->mutex->lock();
+  const std::scoped_lock lock(*request->generalized_funcs->mutex);
   request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
   request->generalized_funcs->cond->notify_one();
-  request->generalized_funcs->mutex->unlock();
   return MPI_SUCCESS;
 }