X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7f00b09c7ebfa3b4e12c96c764ee7a0e0e07ec20..1abe27c3efd3ba83f6b0da4f391a1f29e053c588:/src/s4u/s4u_Comm.cpp diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index dcacfd0b3e..4b7030e994 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -10,10 +10,10 @@ #include #include -#include "mc/mc.h" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" #include "src/kernel/actor/SimcallObserver.hpp" +#include "src/mc/mc.h" #include "src/mc/mc_replay.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications"); @@ -28,7 +28,8 @@ CommPtr Comm::set_copy_data_callback(const std::functiondst_buff_, buff, buff_size); @@ -39,7 +40,8 @@ void Comm::copy_buffer_callback(kernel::activity::CommImpl* comm, void* buff, si } } -void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff, size_t buff_size) +void Comm::copy_pointer_callback(kernel::activity::CommImpl* comm, void* buff, + size_t buff_size) // XBT_ATTRIB_DEPRECATED_v338 { xbt_assert((buff_size == sizeof(void*)), "Cannot copy %zu bytes: must be sizeof(void*)", buff_size); *(void**)(comm->dst_buff_) = buff; @@ -76,12 +78,13 @@ void Comm::send(kernel::actor::ActorImpl* sender, const Mailbox* mbox, double ta simgrid::kernel::activity::ActivityImplPtr comm = nullptr; simgrid::kernel::actor::CommIsendSimcall send_observer{ - sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), src_buff_size, match_fun, - nullptr, copy_data_fun, data, false}; + sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), + src_buff_size, match_fun, nullptr, copy_data_fun, data, + false, "Isend"}; comm = simgrid::kernel::actor::simcall_answered( [&send_observer] { return simgrid::kernel::activity::CommImpl::isend(&send_observer); }, &send_observer); - if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout}; + if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{sender, comm.get(), timeout, "Wait"}; simgrid::kernel::actor::simcall_blocking( [&wait_observer] { wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout()); @@ -93,7 +96,7 @@ void Comm::send(kernel::actor::ActorImpl* sender, const Mailbox* mbox, double ta } else { simgrid::kernel::actor::CommIsendSimcall observer(sender, mbox->get_impl(), task_size, rate, static_cast(src_buff), src_buff_size, match_fun, - nullptr, copy_data_fun, data, false); + nullptr, copy_data_fun, data, false, "Isend"); simgrid::kernel::actor::simcall_blocking([&observer, timeout] { simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::isend(&observer); comm->wait_for(observer.get_issuer(), timeout); @@ -120,11 +123,12 @@ void Comm::recv(kernel::actor::ActorImpl* receiver, const Mailbox* mbox, void* d match_fun, copy_data_fun, data, - rate}; + rate, + "Irecv"}; comm = simgrid::kernel::actor::simcall_answered( [&observer] { return simgrid::kernel::activity::CommImpl::irecv(&observer); }, &observer); - if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout}; + if (simgrid::kernel::actor::ActivityWaitSimcall wait_observer{receiver, comm.get(), timeout, "wait"}; simgrid::kernel::actor::simcall_blocking( [&wait_observer] { wait_observer.get_activity()->wait_for(wait_observer.get_issuer(), wait_observer.get_timeout()); @@ -135,7 +139,7 @@ void Comm::recv(kernel::actor::ActorImpl* receiver, const Mailbox* mbox, void* d comm = nullptr; } else { simgrid::kernel::actor::CommIrecvSimcall observer(receiver, mbox->get_impl(), static_cast(dst_buff), - dst_buff_size, match_fun, copy_data_fun, data, rate); + dst_buff_size, match_fun, copy_data_fun, data, rate, "Irecv"); simgrid::kernel::actor::simcall_blocking([&observer, timeout] { simgrid::kernel::activity::ActivityImplPtr comm = simgrid::kernel::activity::CommImpl::irecv(&observer); comm->wait_for(observer.get_issuer(), timeout); @@ -287,6 +291,14 @@ Actor* Comm::get_sender() const return sender ? sender->get_ciface() : nullptr; } +Actor* Comm::get_receiver() const +{ + kernel::actor::ActorImplPtr receiver = nullptr; + if (pimpl_) + receiver = boost::static_pointer_cast(pimpl_)->dst_actor_; + return receiver ? receiver->get_ciface() : nullptr; +} + bool Comm::is_assigned() const { return (pimpl_ && boost::static_pointer_cast(pimpl_)->is_assigned()) || @@ -306,8 +318,11 @@ Comm* Comm::do_start() pimpl_->set_state(kernel::activity::State::READY); boost::static_pointer_cast(pimpl_)->start(); }); + fire_on_start(); + fire_on_this_start(); } else if (src_buff_ != nullptr) { // Sender side on_send(*this); + on_this_send(*this); kernel::actor::CommIsendSimcall observer{sender_, mailbox_->get_impl(), remains_, @@ -318,12 +333,14 @@ Comm* Comm::do_start() clean_fun_, copy_data_function_, get_data(), - detached_}; + detached_, + "Isend"}; pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer); } else if (dst_buff_ != nullptr) { // Receiver side xbt_assert(not detached_, "Receive cannot be detached"); on_recv(*this); + on_this_recv(*this); kernel::actor::CommIrecvSimcall observer{receiver_, mailbox_->get_impl(), static_cast(dst_buff_), @@ -331,7 +348,8 @@ Comm* Comm::do_start() match_fun_, copy_data_function_, get_data(), - rate_}; + rate_, + "Irecv"}; pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer); } else { @@ -344,6 +362,11 @@ Comm* Comm::do_start() if (not detached_) { pimpl_->set_iface(this); pimpl_->set_actor(sender_); + // Only throw the signal when both sides are here and the status is READY + if (pimpl_->get_state() != kernel::activity::State::WAITING) { + fire_on_start(); + fire_on_this_start(); + } } state_ = State::STARTED; @@ -389,11 +412,13 @@ Comm* Comm::wait_for(double timeout) return start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls } else if (src_buff_ != nullptr) { on_send(*this); + on_this_send(*this); send(sender_, mailbox_, remains_, rate_, src_buff_, src_buff_size_, match_fun_, copy_data_function_, get_data(), timeout); } else { // Receiver on_recv(*this); + on_this_recv(*this); recv(receiver_, mailbox_, dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_, get_data(), timeout, rate_); } @@ -401,7 +426,7 @@ Comm* Comm::wait_for(double timeout) case State::STARTED: try { issuer = kernel::actor::ActorImpl::self(); - kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout}; + kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"}; if (kernel::actor::simcall_blocking( [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); }, &observer)) { @@ -447,7 +472,7 @@ ssize_t Comm::wait_any_for(const std::vector& comms, double timeout) void Comm::wait_all(const std::vector& comms) { // TODO: this should be a simcall or something - for (auto& comm : comms) + for (const auto& comm : comms) comm->wait(); } @@ -473,6 +498,11 @@ size_t Comm::wait_all_for(const std::vector& comms, double timeout) } } // namespace simgrid::s4u /* **************************** Public C interface *************************** */ +int sg_comm_isinstance(sg_activity_t acti) +{ + return dynamic_cast(acti) != nullptr; +} + void sg_comm_detach(sg_comm_t comm, void (*clean_function)(void*)) { comm->detach(clean_function);