From: SUTER Frederic Date: Thu, 3 Feb 2022 16:23:46 +0000 (+0100) Subject: make isend and irecv observable (except for irecv in smpi_request.cpp) X-Git-Tag: v3.31~508 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2b536801d3cfb974714210beed30e109c2cb0f3e make isend and irecv observable (except for irecv in smpi_request.cpp) --- diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index b6ef8334f8..4de3fa3d21 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -10,6 +10,7 @@ #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/activity/MailboxImpl.hpp" +#include "src/kernel/actor/SimcallObserver.hpp" #include "src/kernel/context/Context.hpp" #include "src/kernel/resource/CpuImpl.hpp" #include "src/kernel/resource/LinkImpl.hpp" @@ -272,7 +273,7 @@ void CommImpl::copy_data() } ActivityImplPtr -CommImpl::isend(actor::ActorImpl* src_proc, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff, +CommImpl::isend(actor::ActorImpl* sender, MailboxImpl* mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, bool (*match_fun)(void*, void*, CommImpl*), void (*clean_fun)(void*), // used to free the synchro in case of problem after a detached send void (*copy_data_fun)(CommImpl*, void*, size_t), // used to copy data if not default one @@ -314,11 +315,11 @@ CommImpl::isend(actor::ActorImpl* src_proc, MailboxImpl* mbox, double task_size, other_comm->clean_fun = clean_fun; } else { other_comm->clean_fun = nullptr; - src_proc->activities_.emplace_back(other_comm); + sender->activities_.emplace_back(other_comm); } /* Setup the communication synchro */ - other_comm->src_actor_ = src_proc; + other_comm->src_actor_ = sender; other_comm->src_data_ = data; (*other_comm).set_src_buff(src_buff, src_buff_size).set_size(task_size).set_rate(rate); @@ -330,6 +331,11 @@ CommImpl::isend(actor::ActorImpl* src_proc, MailboxImpl* mbox, 
double task_size, else other_comm->start(); + if (auto* observer = dynamic_cast<kernel::actor::CommIsendSimcall*>(sender->simcall_.observer_)) { + observer->set_result(detached ? nullptr : other_comm); + sender->simcall_answer(); + } + return (detached ? nullptr : other_comm); } @@ -398,6 +404,12 @@ ActivityImplPtr CommImpl::irecv(actor::ActorImpl* receiver, MailboxImpl* mbox, u return other_comm; } other_comm->start(); + + if (auto* observer = dynamic_cast<kernel::actor::CommIrecvSimcall*>(receiver->simcall_.observer_)) { + observer->set_result(other_comm); + receiver->simcall_answer(); + } + return other_comm; } bool CommImpl::test(actor::ActorImpl* issuer) diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 1c90aad198..6ae30c06bc 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -219,13 +219,43 @@ Comm* Comm::start() } else if (src_buff_ != nullptr) { // Sender side on_send(*this); - pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_, - clean_fun_, copy_data_function_, get_data(), detached_); + kernel::actor::CommIsendSimcall observer{sender_, + mailbox_->get_impl(), + static_cast<size_t>(remains_), + rate_, + static_cast<unsigned char*>(src_buff_), + src_buff_size_, + match_fun_, + clean_fun_, + copy_data_function_, + get_data(), + detached_}; + pimpl_ = kernel::actor::simcall_blocking( + [&observer] { + return kernel::activity::CommImpl::isend( + observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(), + observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_, + observer.copy_data_fun_, observer.get_payload(), observer.is_detached()); + }, + &observer); } else if (dst_buff_ != nullptr) { // Receiver side xbt_assert(not detached_, "Receive cannot be detached"); on_recv(*this); - pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, - copy_data_function_, get_data(), rate_); + kernel::actor::CommIrecvSimcall observer{receiver_, 
mailbox_->get_impl(), + static_cast<unsigned char*>(dst_buff_), + &dst_buff_size_, + match_fun_, + copy_data_function_, + get_data(), + rate_}; + pimpl_ = kernel::actor::simcall_blocking( + [&observer] { + return kernel::activity::CommImpl::irecv( + observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), observer.get_dst_buff_size(), + observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), observer.get_rate()); + }, + &observer); } else { xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver"); } diff --git a/src/smpi/mpi/smpi_request.cpp b/src/smpi/mpi/smpi_request.cpp index 8d77e4b1c6..b95e3618dc 100644 --- a/src/smpi/mpi/smpi_request.cpp +++ b/src/smpi/mpi/smpi_request.cpp @@ -510,10 +510,29 @@ void Request::start() } if(!is_probe) flags_ &= ~MPI_REQ_PROBE; - action_ = simcall_comm_irecv( process->get_actor()->get_impl(), mailbox->get_impl(), buf_, &real_size_, &match_recv, process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, -1.0); + + // kernel::actor::CommIrecvSimcall observer{process->get_actor()->get_impl(), + // mailbox->get_impl(), + // static_cast<unsigned char*>(buf_), + // &real_size_, + // &match_recv, + // process->replaying() ? 
&smpi_comm_null_copy_buffer_callback + // : smpi_comm_copy_data_callback, + // this, + // -1.0}; + // + // action_ = kernel::actor::simcall_blocking( + // [&observer] { + // return kernel::activity::CommImpl::irecv( + // observer.get_issuer(), observer.get_mailbox(), observer.get_dst_buff(), + // observer.get_dst_buff_size(), observer.match_fun_, observer.copy_data_fun_, observer.get_payload(), + // observer.get_rate()); + // }, + // &observer); + XBT_DEBUG("recv simcall posted"); if (smpi_cfg_async_small_thresh() != 0 || (flags_ & MPI_REQ_RMA) != 0) @@ -609,13 +628,21 @@ void Request::start() } size_t payload_size_ = size_ + 16;//MPI enveloppe size (tag+dest+communicator) - action_ = simcall_comm_isend( - simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1.0, buf, real_size_, - &match_send, + kernel::actor::CommIsendSimcall observer{ + simgrid::kernel::actor::ActorImpl::by_pid(src_), mailbox->get_impl(), payload_size_, -1, + static_cast<unsigned char*>(buf), real_size_, &match_send, &xbt_free_f, // how to free the userdata if a detached send fails process->replaying() ? &smpi_comm_null_copy_buffer_callback : smpi_comm_copy_data_callback, this, // detach if msg size < eager/rdv switch limit - detached_); + detached_}; + action_ = kernel::actor::simcall_blocking( + [&observer] { + return kernel::activity::CommImpl::isend( + observer.get_issuer(), observer.get_mailbox(), observer.get_payload_size(), observer.get_rate(), + observer.get_src_buff(), observer.get_src_buff_size(), observer.match_fun_, observer.clean_fun_, + observer.copy_data_fun_, observer.get_payload(), observer.is_detached()); + }, + &observer); XBT_DEBUG("send simcall posted"); /* FIXME: detached sends are not traceable (action_ == nullptr) */