#include "smpi_request.hpp"
-#include "mc/mc.h"
#include "private.hpp"
#include "simgrid/Exception.hpp"
#include "simgrid/s4u/ConditionVariable.hpp"
#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/SimcallObserver.hpp"
+#include "src/mc/mc.h"
#include "src/mc/mc_replay.hpp"
#include "src/smpi/include/smpi_actor.hpp"
#include <algorithm>
#include <array>
+#include <mutex> // std::scoped_lock and std::unique_lock
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(smpi_request, smpi, "Logging specific to SMPI (request)");
refcount_ = 1;
else
refcount_ = 0;
- message_id_ = 0;
init_buffer(count);
this->add_f();
}
// Match callback for a posted receive: decides whether send request `b` (req)
// can satisfy the pending receive `a` (ref).
// NOTE(review): this is a patch hunk ('-' removed, '+' added, some context
// omitted). The change replaces the scalar per-request message_id_ with a
// container and matches the receive against any recorded outgoing message id,
// consuming the id on a successful (non-probe) match.
bool Request::match_recv(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
- auto ref = static_cast<MPI_Request>(a);
- auto req = static_cast<MPI_Request>(b);
+ auto* ref = static_cast<MPI_Request>(a);
+ auto* req = static_cast<MPI_Request>(b);
// First do the plain src/dst/tag match; message-id ordering checks are skipped
// for uninitialized communicators and intra-node (SMP) communicators.
bool match = match_common(req, req, ref);
if (not match || ref->comm_ == MPI_COMM_UNINITIALIZED || ref->comm_->is_smp_comm())
return match;
-
- if (ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
- ref->comm_->group()->rank(req->dst_), req->tag_) == req->message_id_) {
+ auto it = std::find(req->message_id_.begin(), req->message_id_.end(), ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
+ ref->comm_->group()->rank(req->dst_), req->tag_));
+ if (it != req->message_id_.end()) {
// Probe requests must not consume the message id: only a real receive
// advances the bookkeeping.
if (((ref->flags_ & MPI_REQ_PROBE) == 0) && ((req->flags_ & MPI_REQ_PROBE) == 0)) {
+ req->message_id_.erase(it);
XBT_DEBUG("increasing count in comm %p, which was %u from pid %ld, to pid %ld with tag %d", ref->comm_,
ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
ref->comm_->group()->rank(req->dst_), req->tag_),
match = false;
req->flags_ &= ~MPI_REQ_MATCHED;
ref->detached_sender_ = nullptr;
- XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u != %u, "
+ XBT_DEBUG("Refusing to match message, as its ID is not the one I expect. in comm %p, %u, "
"from pid %ld to pid %ld, with tag %d",
ref->comm_,
ref->comm_->get_received_messages_count(ref->comm_->group()->rank(req->src_),
ref->comm_->group()->rank(req->dst_), req->tag_),
- req->message_id_, req->src_, req->dst_, req->tag_);
+ req->src_, req->dst_, req->tag_);
}
return match;
}
// Match callback for sends: symmetric to match_recv but without any
// message-id bookkeeping. The patch only switches to the clang-tidy-preferred
// `auto*` spelling for pointer deductions; behavior is unchanged.
bool Request::match_send(void* a, void* b, simgrid::kernel::activity::CommImpl*)
{
- auto ref = static_cast<MPI_Request>(a);
- auto req = static_cast<MPI_Request>(b);
+ auto* ref = static_cast<MPI_Request>(a);
+ auto* req = static_cast<MPI_Request>(b);
return match_common(req, ref, req);
}
// Nonblocking buffered send (MPI_Ibsend): allocate the request and start it
// immediately unless the destination is MPI_PROC_NULL (no-op per MPI).
// Patch is cosmetic only: `auto` -> `auto*` plus re-indentation.
MPI_Request Request::ibsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND | MPI_REQ_BSEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
// Nonblocking standard send (MPI_Isend); same pattern as ibsend without the
// buffered flag. Patch is cosmetic only: `auto` -> `auto*` plus re-indentation.
MPI_Request Request::isend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
// Nonblocking synchronous send (MPI_Issend): adds MPI_REQ_SSEND so the send
// cannot complete before the matching receive is posted. Patch is cosmetic
// only: `auto` -> `auto*` plus re-indentation.
MPI_Request Request::issend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_ISEND | MPI_REQ_SSEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
return request;
source = MPI_ANY_SOURCE;
else if (src != MPI_PROC_NULL)
source = comm->group()->actor(src);
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
- simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, source,
+ simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_NON_PERSISTENT | MPI_REQ_RECV);
if(src != MPI_PROC_NULL)
request->start();
return request;
{
MPI_Request request = irecv(buf, count, datatype, src, tag, comm);
int retval = wait(&request,status);
- request = nullptr;
return retval;
}
// Blocking buffered send (MPI_Bsend): create, start, then wait to completion.
// Patch: `auto` -> `auto*`, and drops the trailing `request = nullptr;` —
// wait() already consumes/resets the handle and the local goes out of scope
// right after (dead store).
void Request::bsend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND | MPI_REQ_BSEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request, MPI_STATUS_IGNORE);
- request = nullptr;
}
// Blocking standard send (MPI_Send): create, start, then wait to completion.
// Patch: `auto` -> `auto*`, and removes the dead `request = nullptr;` store
// after wait().
void Request::send(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request, MPI_STATUS_IGNORE);
- request = nullptr;
}
// Blocking synchronous send (MPI_Ssend): like send() but with MPI_REQ_SSEND,
// so completion implies the receiver has posted the matching receive.
// Patch: `auto` -> `auto*`, and removes the dead `request = nullptr;` store.
void Request::ssend(const void *buf, int count, MPI_Datatype datatype, int dst, int tag, MPI_Comm comm)
{
- auto request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
- dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
- MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
+ auto* request = new Request(buf == MPI_BOTTOM ? nullptr : buf, count, datatype, simgrid::s4u::this_actor::get_pid(),
+ dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL, tag, comm,
+ MPI_REQ_NON_PERSISTENT | MPI_REQ_SSEND | MPI_REQ_SEND);
if(dst != MPI_PROC_NULL)
request->start();
wait(&request,MPI_STATUS_IGNORE);
- request = nullptr;
}
void Request::sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
}
}
+// Nonblocking combined send/receive (MPI_Isendrecv), newly added by this
+// patch. Returns (via *request) a persistent NBC "container" request; the
+// actual transfer is carried out by an isend_init/irecv_init pair started as
+// its sub-requests.
+void Request::isendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,int dst, int sendtag,
+ void *recvbuf, int recvcount, MPI_Datatype recvtype, int src, int recvtag,
+ MPI_Comm comm, MPI_Request* request)
+{
+ // Resolve communicator ranks to actor ids, preserving the MPI_ANY_SOURCE
+ // and MPI_PROC_NULL sentinels.
+ aid_t source = MPI_PROC_NULL;
+ if (src == MPI_ANY_SOURCE)
+ source = MPI_ANY_SOURCE;
+ else if (src != MPI_PROC_NULL)
+ source = comm->group()->actor(src);
+ aid_t destination = dst != MPI_PROC_NULL ? comm->group()->actor(dst) : MPI_PROC_NULL;
+
+ // NOTE(review): the container request is built from the raw ranks src/dst
+ // while the resolved source/destination ids are only used for the
+ // self-message check below — confirm this is intentional (the container
+ // itself is never matched against another request, only its sub-requests).
+ (*request) = new Request( nullptr, 0, MPI_BYTE,
+ src,dst, sendtag, comm, MPI_REQ_PERSISTENT|MPI_REQ_NBC);
+ std::vector<MPI_Request> requests;
+ // Send-to-self fast path: copy directly, no communication needed.
+ // NOTE(review): on this path the already-allocated *request is returned
+ // with no sub-requests started — verify wait/test handle an NBC request
+ // with an empty sub-request list.
+ if (aid_t myid = simgrid::s4u::this_actor::get_pid(); (destination == myid) && (source == myid)) {
+ Datatype::copy(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype);
+ return;
+ }
+ requests.push_back(isend_init(sendbuf, sendcount, sendtype, dst, sendtag, comm));
+ requests.push_back(irecv_init(recvbuf, recvcount, recvtype, src, recvtag, comm));
+ (*request)->start_nbc_requests(requests);
+}
+
// Start this request: post the irecv simcall (receive branch) or the isend
// simcall (send branch), handling detached/eager sends and mailbox locking.
// NOTE(review): patch hunk with omitted context. The patch (a) replaces the
// manual mailboxes_mutex lock()/unlock() pairs with a scope-bound
// std::unique_lock so the mutex is released on every exit path (RAII),
// (b) records per-message ids in the message_id_ container instead of a
// scalar, and (c) forwards the caller's MPI call location into the simcall
// observers for better tracing/MC output.
void Request::start()
{
s4u::Mailbox* mailbox;
simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
// RAII lock: default-constructed (unlocked) unless the config requires
// mailbox synchronization; replaces explicit lock()/unlock().
- simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
+ std::unique_lock<s4u::Mutex> mut_lock;
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->lock();
+ mut_lock = std::unique_lock(*process->mailboxes_mutex());
// Temporarily mark as probe while looking into the mailbox (context omitted
// between the hunks below).
bool is_probe = ((flags_ & MPI_REQ_PROBE) != 0);
flags_ |= MPI_REQ_PROBE;
process->replaying() ? &smpi_comm_null_copy_buffer_callback
: smpi_comm_copy_data_callback,
this,
- -1.0};
+ -1.0,
+ process->call_location()->get_call_location()};
observer.set_tag(tag_);
action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); },
&observer);
XBT_DEBUG("recv simcall posted");
-
- if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->unlock();
} else { /* the RECV flag was not set, so this is a send */
- const simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(dst_));
xbt_assert(process, "Actor pid=%ld is gone??", dst_);
if (TRACE_smpi_view_internals())
TRACE_smpi_send(src_, src_, dst_, tag_, size_);
this->print_request("New send");
// Record this message's sequence number so the matching receive can check
// ordering (now one entry per in-flight message instead of a scalar).
- message_id_=comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
+ message_id_.push_back(comm_->get_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_));
comm_->increment_sent_messages_count(comm_->group()->rank(src_), comm_->group()->rank(dst_), tag_);
void* buf = buf_;
// Detached (eager) send: RMA, buffered, or below the detached-send
// threshold — the sender does not wait for the receiver.
- if ((flags_ & MPI_REQ_SSEND) == 0 &&
- ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
- static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
- void *oldbuf = nullptr;
+ if ((flags_ & MPI_REQ_SSEND) == 0 && ((flags_ & MPI_REQ_RMA) != 0 || (flags_ & MPI_REQ_BSEND) != 0 ||
+ static_cast<int>(size_) < smpi_cfg_detached_send_thresh())) {
detached_ = true;
XBT_DEBUG("Send request %p is detached", this);
this->ref();
if (not(type_->flags() & DT_FLAG_DERIVED)) {
// `oldbuf` narrowed to the scope where it is actually used.
- oldbuf = buf_;
+ void* oldbuf = buf_;
if (not process->replaying() && oldbuf != nullptr && size_ != 0) {
if (smpi_switch_data_segment(simgrid::s4u::Actor::by_pid(src_), buf_))
XBT_DEBUG("Privatization : We are sending from a zone inside global memory. Switch data segment ");
XBT_DEBUG("sending size of %zu : sleep %f ", size_, sleeptime);
}
// Same RAII-lock conversion as in the receive branch above.
- simgrid::s4u::MutexPtr mut = process->mailboxes_mutex();
-
+ std::unique_lock<s4u::Mutex> mut_lock;
if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)
- mut->lock();
+ mut_lock = std::unique_lock(*process->mailboxes_mutex());
if (not(smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0)) {
mailbox = process->mailbox();
&xbt_free_f, // how to free the userdata if a detached send fails
process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this,
// detach if msg size < eager/rdv switch limit
- detached_};
+ detached_, process->call_location()->get_call_location()};
observer.set_tag(tag_);
action_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); },
&observer);
boost::static_pointer_cast<kernel::activity::CommImpl>(action_)->set_tracing_category(
smpi_process()->get_tracing_category());
}
-
- if (smpi_cfg_async_small_thresh() != 0 || ((flags_ & MPI_REQ_RMA) != 0))
- mut->unlock();
}
}
if ((*request)->action_ != nullptr && ((*request)->flags_ & MPI_REQ_CANCELLED) == 0){
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get()};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityTestSimcall observer{issuer, (*request)->action_.get(),
+ process->call_location()->get_call_location()};
*flag = kernel::actor::simcall_answered(
[&observer] { return observer.get_activity()->test(observer.get_issuer()); }, &observer);
} catch (const Exception&) {
ssize_t i;
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityTestanySimcall observer{issuer, comms};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityTestanySimcall observer{issuer, comms, process->call_location()->get_call_location()};
i = kernel::actor::simcall_answered(
[&observer] {
return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
static int nsleeps = 1;
double speed = s4u::this_actor::get_host()->get_speed();
double maxrate = smpi_cfg_iprobe_cpu_usage();
- auto request =
+ auto* request =
new Request(nullptr, 0, MPI_CHAR, source == MPI_ANY_SOURCE ? MPI_ANY_SOURCE : comm->group()->actor(source),
simgrid::s4u::this_actor::get_pid(), tag, comm, MPI_REQ_PERSISTENT | MPI_REQ_RECV | MPI_REQ_PROBE);
if (smpi_iprobe_sleep > 0) {
try{
// this is not a detached send
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityWaitSimcall observer{issuer, (*request)->action_.get(), -1,
+ process->call_location()->get_call_location()};
kernel::actor::simcall_blocking([issuer, &observer] { observer.get_activity()->wait_for(issuer, -1); },
&observer);
} catch (const CancelException&) {
if ((*request)->flags_ & MPI_REQ_GENERALIZED) {
if (not((*request)->flags_ & MPI_REQ_COMPLETE)) {
- ((*request)->generalized_funcs)->mutex->lock();
- ((*request)->generalized_funcs)->cond->wait(((*request)->generalized_funcs)->mutex);
- ((*request)->generalized_funcs)->mutex->unlock();
+ const std::scoped_lock lock(*(*request)->generalized_funcs->mutex);
+ (*request)->generalized_funcs->cond->wait((*request)->generalized_funcs->mutex);
}
MPI_Status tmp_status;
MPI_Status* mystatus;
ssize_t i;
try{
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
- kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1};
+ simgrid::smpi::ActorExt* process = smpi_process_remote(simgrid::s4u::Actor::by_pid(issuer->get_pid()));
+ kernel::actor::ActivityWaitanySimcall observer{issuer, comms, -1,
+ process->call_location()->get_call_location()};
i = kernel::actor::simcall_blocking(
[&observer] {
kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
{
if ((not(request->flags_ & MPI_REQ_GENERALIZED)) || request->generalized_funcs->mutex == nullptr)
return MPI_ERR_REQUEST;
- request->generalized_funcs->mutex->lock();
+ const std::scoped_lock lock(*request->generalized_funcs->mutex);
request->flags_ |= MPI_REQ_COMPLETE; // in case wait would be called after complete
request->generalized_funcs->cond->notify_one();
- request->generalized_funcs->mutex->unlock();
return MPI_SUCCESS;
}