X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1530fcaf86c1f6ef04bcd8e6163c9f85d1c41abd..c91810ea11e12e016eb88444296bf51d401e6be3:/src/smpi/mpi/smpi_request.cpp diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index c9d1068575..2d6fdda1b0 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -1,11 +1,10 @@ -/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include "smpi_request.hpp" -#include "mc/mc.h" #include "private.hpp" #include "simgrid/Exception.hpp" #include "simgrid/s4u/ConditionVariable.hpp" @@ -19,11 +18,13 @@ #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" #include "src/kernel/actor/SimcallObserver.hpp" +#include "src/mc/mc.h" #include "src/mc/mc_replay.hpp" #include "src/smpi/include/smpi_actor.hpp" #include #include +#include // std::scoped_lock and std::unique_lock XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)"); @@ -180,8 +181,8 @@ void Request::init_buffer(int count){ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) { - auto ref = static_cast(a); - auto req = static_cast(b); + auto* ref = static_cast(a); + auto* req = static_cast(b); bool match = match_common(req, req, ref); if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm()) return match; @@ -215,8 +216,8 @@ bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*) bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*) { - auto ref = static_cast(a); - auto req = static_cast(b); + auto* ref = static_cast(a); + auto* req = static_cast(b); return match_common(req, ref, req); } @@ -322,9 +323,9 @@ MPI_Request Request::irecv_init(void *buf, int count, MPI_Datatype datatype, int MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -332,9 +333,9 @@ MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, i MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -342,9 +343,9 @@ MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, in MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); return request; @@ -357,8 +358,8 @@ MPI_Request Request::irecv(void *buf, int count, MPI_Datatype datatype, int src, source = MPI_ANY_SOURCE; else if (src != MPI_PROC_NULL) source = comm->group()->actor(src); - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source, - simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source, + simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV); if(src != MPI_PROC_NULL) request->start(); return request; @@ -368,43 +369,39 @@ int Request::recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, { MPI_Request request = irecv(buf, count, datatype, src, tag, comm); int retval = wait(&request,status); - request = nullptr; return retval; } void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND); if(dst != MPI_PROC_NULL) request->start(); wait(&request, MPI_STATUS_IGNORE); - request = nullptr; } void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); wait(&request, MPI_STATUS_IGNORE); - request = nullptr; } void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm) { - auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), - dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, - MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND); + auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(), + dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm, + MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND); if(dst != MPI_PROC_NULL) request->start(); wait(&request,MPI_STATUS_IGNORE); - request = nullptr; } void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag, @@ -463,9 +460,9 @@ void Request::start() simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); - simgrid::s4u::MutexPtr mut = process->mailboxes_mutex(); + std::unique_lock mut_lock; if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0) - mut->lock(); + mut_lock = std::unique_lock(*process->mailboxes_mutex()); bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0); flags_ |= MPI_REQ_PROBE; @@ -513,18 +510,16 @@ void Request::start() process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, - -1.0}; + -1.0, + process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer); XBT_DEBUG("recv simcall posted"); - - if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0) - mut->unlock(); } else { /* the RECV flag was not set, so this is a send */ - const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_)); xbt_assert(process, "Actor pid=%ld is gone??", dst_); if (TRACE_smpi_view_internals()) TRACE_smpi_send(src_, src_, dst_, tag_, size_); @@ -534,15 +529,13 @@ void Request::start() comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_); void* buf = buf_; - if ((flags_ & MPI_REQ_SSEND) == 0 && - ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 || - static_cast(size_) < smpi_cfg_detached_send_thresh())) { - void *oldbuf = nullptr; + if ((flags_ & MPI_REQ_SSEND) == 0 && ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 || + static_cast(size_) < smpi_cfg_detached_send_thresh())) { detached_ = true; XBT_DEBUG("Send request %p is detached", this); this->ref(); if (not(type_->flags() & DT_FLAG_DERIVED)) { - oldbuf = buf_; + void* oldbuf = buf_; if (not process->replaying() && oldbuf != nullptr && size_ != 0) { if (smpi_switch_data_segment(simgrid::s4u::Actor::by_pid(src_), buf_)) XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment "); @@ -574,10 +567,9 @@ void Request::start() XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime); } - simgrid::s4u::MutexPtr mut = process->mailboxes_mutex(); - + std::unique_lock mut_lock; if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0) - mut->lock(); + mut_lock = std::unique_lock(*process->mailboxes_mutex()); if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) { mailbox = process->mailbox(); @@ -618,7 +610,7 @@ void Request::start() &xbt_free_f, // how to free the userdata if a detached send fails process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit - detached_}; + detached_, process->call_location()->get_call_location()}; observer.set_tag(tag_); action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); @@ -629,9 +621,6 @@ void Request::start() boost::static_pointer_cast(action_)->set_tracing_category( smpi_process()->get_tracing_category()); } - - if (smpi_cfg_async_small_thresh() != 0 || ((flags_ & MPI_REQ_RMA) != 0)) - mut->unlock(); } } @@ -678,7 +667,9 @@ int Request::test(MPI_Request * request, MPI_Status * status, int* flag) { if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){ try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(), + process->call_location()->get_call_location()}; *flag = kernel::actor::simcall_answered( [&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer); } catch (const Exception&) { @@ -767,7 +758,8 @@ int Request::testany(int count, MPI_Request requests[], int *index, int* flag, M ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityTestanySimcall observer{issuer, comms}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()}; i = kernel::actor::simcall_answered( [&observer] { return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); @@ -869,7 +861,7 @@ void Request::iprobe(int source, int tag, MPI_Comm comm, int* flag, MPI_Status* static int nsleeps = 1; double speed = s4u::this_actor::get_host()->get_speed(); double maxrate = smpi_cfg_iprobe_cpu_usage(); - auto request = + auto* request = new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source), simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE); if (smpi_iprobe_sleep > 0) { @@ -1083,7 +1075,9 @@ int Request::wait(MPI_Request * request, MPI_Status * status) try{ // this is not a detached send kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1, + process->call_location()->get_call_location()}; kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); }, &observer); } catch (const CancelException&) { @@ -1093,9 +1087,8 @@ int Request::wait(MPI_Request * request, MPI_Status * status) if ((*request)->flags_ & MPI_REQ_GENERALIZED) { if (not((*request)->flags_ & MPI_REQ_COMPLETE)) { - ((*request)->generalized_funcs)->mutex->lock(); - ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex); - ((*request)->generalized_funcs)->mutex->unlock(); + const std::scoped_lock lock(*(*request)->generalized_funcs->mutex); + (*request)->generalized_funcs->cond->wait((*request)->generalized_funcs->mutex); } MPI_Status tmp_status; MPI_Status* mystatus; @@ -1154,7 +1147,9 @@ int Request::waitany(int count, MPI_Request requests[], MPI_Status * status) ssize_t i; try{ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1}; + simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid())); + kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1, + process->call_location()->get_call_location()}; i = kernel::actor::simcall_blocking( [&observer] { kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(), @@ -1331,10 +1326,9 @@ int Request::grequest_complete(MPI_Request request) { if ((not(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr) return MPI_ERR_REQUEST; - request->generalized_funcs->mutex->lock(); + const std::scoped_lock lock(*request->generalized_funcs->mutex); request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete request->generalized_funcs->cond->notify_one(); - request->generalized_funcs->mutex->unlock(); return MPI_SUCCESS; }