-/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "smpi_request.hpp"
-#include "mc/mc.h"
#include "private.hpp"
#include "simgrid/Exception.hpp"
#include "simgrid/s4u/ConditionVariable.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/mc.h"
#include "src/mc/mc_replay.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include <algorithm>
#include <array>
+#include <mutex> // std::scoped_lock and std::unique_lock
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
static simgrid::config::Flag<double> smpi_test_sleep(
"smpi/test", "Minimum time to inject inside a call to MPI_Test", 1e-4);
-std::vector<s_smpi_factor_t> smpi_ois_values;
-
extern std::function<void(simgrid::kernel::activity::CommImpl*, void*, size_t)> smpi_comm_copy_data_callback;
-namespace simgrid{
-namespace smpi{
+namespace simgrid::smpi {
Request::Request(const void* buf, int count, MPI_Datatype datatype, aid_t src, aid_t dst, int tag, MPI_Comm comm,
unsigned flags, MPI_Op op)
refcount_ = 1;
else
refcount_ = 0;
- message_id_ = 0;
init_buffer(count);
this->add_f();
}
(stype->duplicated_datatype()!=MPI_DATATYPE_NULL && match_types(stype->duplicated_datatype(), rtype)) ||
(rtype->duplicated_datatype()!=MPI_DATATYPE_NULL && match_types(stype, rtype->duplicated_datatype())))
match = true;
- if (!match)
+ if (not match)
XBT_WARN("Mismatched datatypes : sending %s and receiving %s", stype->name().c_str(), rtype->name().c_str());
return match;
}
receiver->truncated_ = true;
}
//0-sized datatypes/counts should not interfere and match
- if ( sender->real_size_ != 0 && receiver->real_size_ != 0 &&
- !match_types(sender->type_, receiver->type_))
+ if (sender->real_size_ != 0 && receiver->real_size_ != 0 && not match_types(sender->type_, receiver->type_))
receiver->unmatched_types_ = true;
if (sender->detached_)
receiver->detached_sender_ = sender; // tie the sender to the receiver, as it is detached and has to be freed in
bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
- auto ref = static_cast<MPI_Request>(a);
- auto req = static_cast<MPI_Request>(b);
+ auto* ref = static_cast<MPI_Request>(a);
+ auto* req = static_cast<MPI_Request>(b);
bool match = match_common(req, req, ref);
if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm())
return match;
-
- if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
- ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) {
+ auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
+ ref->comm_->group()->rank(req->dst_), req->tag_));
+ if (it != req->message_id_.end()) {
if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) {
+ req->message_id_.erase(it);
XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_,
ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
ref->comm_->group()->rank(req->dst_), req->tag_),
match = false;
req->flags_ &= ~MPI_REQ_MATCHED;
ref->detached_sender_ = nullptr;
- XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, "
+ XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, "
"from pid %ld to pid %ld, with tag %d",
ref->comm_,
ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
ref->comm_->group()->rank(req->dst_), req->tag_),
- req->message_id_, req->src_, req->dst_, req->tag_);
+ req->src_, req->dst_, req->tag_);
}
return match;
}
bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
- auto ref = static_cast<MPI_Request>(a);
- auto req = static_cast<MPI_Request>(b);
+ auto* ref = static_cast<MPI_Request>(a);
+ auto* req = static_cast<MPI_Request>(b);
return match_common(req, ref, req);
}
MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
source = comm->group()->actor(src);
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
- simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+ simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
if(src != MPI_PROC_NULL)
request->start();
return request;
{
MPI_Request request = irecv(buf, count, datatype, src, tag, comm);
int retval = wait(&request,status);
- request = nullptr;
return retval;
}
void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request, MPI_STATUS_IGNORE);
- request = nullptr;
}
void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request, MPI_STATUS_IGNORE);
- request = nullptr;
}
void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request,MPI_STATUS_IGNORE);
- request = nullptr;
}
void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
}
}
+void Request::isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
+ MPI_Comm comm, MPI_Request* request)
+{
+ aid_t source = MPI_PROC_NULL;
+ if (src == MPI_ANY_SOURCE)
+ source = MPI_ANY_SOURCE;
+ else if (src != MPI_PROC_NULL)
+ source = comm->group()->actor(src);
+ aid_t destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
+
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ src,dst, sendtag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
+ std::vector<MPI_Request> requests;
+ if (aid_t myid = simgrid::s4u::this_actor::get_pid(); (destination == myid) && (source == myid)) {
+ Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
+ return;
+ }
+ requests.push_back(isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm));
+ requests.push_back(irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm));
+ (*request)->start_nbc_requests(requests);
+}
+
void Request::start()
{
s4u::Mailbox* mailbox;
simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
- simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
+ std::unique_lock<s4u::Mutex> mut_lock;
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->lock();
+ mut_lock = std::unique_lock(*process->mailboxes_mutex());
bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
flags_ |= MPI_REQ_PROBE;
XBT_DEBUG("yes there was something for us in the small mailbox");
}
}
- if(!is_probe)
+ if (not is_probe)
flags_ &= ~MPI_REQ_PROBE;
kernel::actor::CommIrecvSimcall observer{process->get_actor()->get_impl(),
mailbox->get_impl(),
process->replaying() ? &smpi_comm_null_copy_buffer_callback
: smpi_comm_copy_data_callback,
this,
- -1.0};
+ -1.0,
+ process->call_location()->get_call_location()};
observer.set_tag(tag_);
action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
&observer);
XBT_DEBUG("recv simcall posted");
-
- if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->unlock();
} else { /* the RECV flag was not set, so this is a send */
- const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
xbt_assert(process, "Actor pid=%ld is gone??", dst_);
if (TRACE_smpi_view_internals())
TRACE_smpi_send(src_, src_, dst_, tag_, size_);
this->print_request("New send");
- message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+ message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_));
comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
void* buf = buf_;
- if ((flags_ & MPI_REQ_SSEND) == 0 &&
- ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
- static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
- void *oldbuf = nullptr;
+ if ((flags_ & MPI_REQ_SSEND) == 0 && ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
+ static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
detached_ = true;
XBT_DEBUG("Send request %p is detached", this);
this->ref();
if (not(type_->flags() & DT_FLAG_DERIVED)) {
- oldbuf = buf_;
+ void* oldbuf = buf_;
if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
if (smpi_switch_data_segment(simgrid::s4u::Actor::by_pid(src_), buf_))
XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime);
}
- simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
-
+ std::unique_lock<s4u::Mutex> mut_lock;
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->lock();
+ mut_lock = std::unique_lock(*process->mailboxes_mutex());
if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
} else {
XBT_DEBUG("Yes there was something for us in the large mailbox");
}
- if(!is_probe)
+ if (not is_probe)
flags_ &= ~MPI_REQ_PROBE;
} else {
mailbox = process->mailbox();
&xbt_free_f, // how to free the userdata if a detached send fails
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
// detach if msg size < eager/rdv switch limit
- detached_};
+ detached_, process->call_location()->get_call_location()};
observer.set_tag(tag_);
action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
boost::static_pointer_cast<kernel::activity::CommImpl>(action_)->set_tracing_category(
smpi_process()->get_tracing_category());
}
-
- if (smpi_cfg_async_small_thresh() != 0 || ((flags_ & MPI_REQ_RMA) != 0))
- mut->unlock();
}
}
if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(),
+ process->call_location()->get_call_location()};
*flag = kernel::actor::simcall_answered(
[&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer);
} catch (const Exception&) {
ssize_t i;
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()};
i = kernel::actor::simcall_answered(
[&observer] {
return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
static int nsleeps = 1;
double speed = s4u::this_actor::get_host()->get_speed();
double maxrate = smpi_cfg_iprobe_cpu_usage();
- auto request =
+ auto* request =
new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source),
simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
if (smpi_iprobe_sleep > 0) {
try{
// this is not a detached send
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1,
+ process->call_location()->get_call_location()};
kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); },
&observer);
} catch (const CancelException&) {
if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
if (not((*request)->flags_ & MPI_REQ_COMPLETE)) {
- ((*request)->generalized_funcs)->mutex->lock();
- ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
- ((*request)->generalized_funcs)->mutex->unlock();
+ const std::scoped_lock lock(*(*request)->generalized_funcs->mutex);
+ (*request)->generalized_funcs->cond->wait((*request)->generalized_funcs->mutex);
}
MPI_Status tmp_status;
MPI_Status* mystatus;
ssize_t i;
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1,
+ process->call_location()->get_call_location()};
i = kernel::actor::simcall_blocking(
[&observer] {
kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
int Request::get_status(const Request* req, int* flag, MPI_Status* status)
{
- *flag=0;
-
if(req != MPI_REQUEST_NULL && req->action_ != nullptr) {
req->iprobe(req->comm_->group()->rank(req->src_), req->tag_, req->comm_, flag, status);
if(*flag)
{
if ((not(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
return MPI_ERR_REQUEST;
- request->generalized_funcs->mutex->lock();
+ const std::scoped_lock lock(*request->generalized_funcs->mutex);
request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
request->generalized_funcs->cond->notify_one();
- request->generalized_funcs->mutex->unlock();
return MPI_SUCCESS;
}
{
return nbc_requests_;
}
-}
-}
+} // namespace simgrid::smpi