From: SUTER Frederic Date: Wed, 20 Oct 2021 09:59:41 +0000 (+0200) Subject: introduce Comm::on_start and Comm::on_completion X-Git-Tag: v3.30~322 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/fa07ae68a2fa2f0a3e55001bb572ff9a353e48f2 introduce Comm::on_start and Comm::on_completion + Consistency with other activities + Potential replacement to the unnatural Link::on_communicate + Use these signals in the link plugins (energy and load) + update doc --- diff --git a/docs/source/Plugins.rst b/docs/source/Plugins.rst index 154417cc7a..2530e77f5f 100644 --- a/docs/source/Plugins.rst +++ b/docs/source/Plugins.rst @@ -84,6 +84,10 @@ Partial list of existing signals in s4u: :cpp:member:`Engine::on_time_advance ` :cpp:member:`Engine::on_simulation_end ` :cpp:member:`Engine::on_deadlock ` +- :cpp:member:`Comm::on_start ` + :cpp:member:`Comm::on_completion ` +- :cpp:member:`Exec::on_start ` + :cpp:member:`Exec::on_completion ` - :cpp:member:`Exec::on_start ` :cpp:member:`Exec::on_completion ` - :cpp:member:`Host::on_creation ` diff --git a/src/kernel/activity/CommImpl.cpp b/src/kernel/activity/CommImpl.cpp index ba477b802d..28b501498b 100644 --- a/src/kernel/activity/CommImpl.cpp +++ b/src/kernel/activity/CommImpl.cpp @@ -226,6 +226,8 @@ void SIMIX_comm_copy_pointer_callback(simgrid::kernel::activity::CommImpl* comm, namespace simgrid { namespace kernel { namespace activity { +xbt::signal CommImpl::on_start; +xbt::signal CommImpl::on_completion; void (*CommImpl::copy_data_callback_)(CommImpl*, void*, size_t) = &s4u::Comm::copy_pointer_callback; @@ -300,7 +302,7 @@ CommImpl* CommImpl::start() if (state_ == State::READY) { from_ = from_ != nullptr ? from_ : src_actor_->get_host(); to_ = to_ != nullptr ? to_ : dst_actor_->get_host(); - + on_start(*this); /* Getting the network_model from the origin host * Valid while we have a single network model, otherwise we would need to change this function to first get the * routes and later create the respective surf actions */ @@ -341,6 +343,16 @@ CommImpl* CommImpl::start() return this; } +std::vector CommImpl::get_traversed_links() const +{ + xbt_assert(state_ != State::WAITING, "You cannot use %s() if your communication is not ready (%s)", __FUNCTION__, + get_state_str()); + std::vector vlinks; + XBT_ATTRIB_UNUSED double res = 0; + from_->route_to(to_, vlinks, &res); + return vlinks; +} + /** @brief Copy the communication data from the sender's buffer to the receiver's one */ void CommImpl::copy_data() { @@ -519,6 +531,8 @@ void CommImpl::cleanup_surf() void CommImpl::post() { + on_completion(*this); + /* Update synchro state */ if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED) state_ = State::SRC_TIMEOUT; @@ -546,7 +560,6 @@ void CommImpl::post() void CommImpl::finish() { XBT_DEBUG("CommImpl::finish() in state %s", to_c_str(state_)); - /* If the synchro is still in a rendez-vous point then remove from it */ if (mbox_) mbox_->remove(this); diff --git a/src/kernel/activity/CommImpl.hpp b/src/kernel/activity/CommImpl.hpp index 4221a89536..0334947bf0 100644 --- a/src/kernel/activity/CommImpl.hpp +++ b/src/kernel/activity/CommImpl.hpp @@ -46,6 +46,7 @@ public: MailboxImpl* get_mailbox() const { return mbox_; } bool detached() const { return detached_; } + std::vector get_traversed_links() const; void copy_data(); bool test() override; @@ -88,6 +89,9 @@ expectations of the other side, too. See */ void* src_data_ = nullptr; /* User data associated to the communication */ void* dst_data_ = nullptr; + + static xbt::signal on_start; + static xbt::signal on_completion; }; } // namespace activity } // namespace kernel diff --git a/src/plugins/link_energy.cpp b/src/plugins/link_energy.cpp index 636b8bf23e..37366fe9c0 100644 --- a/src/plugins/link_energy.cpp +++ b/src/plugins/link_energy.cpp @@ -7,8 +7,8 @@ #include "simgrid/host.h" #include "simgrid/plugins/energy.h" #include "simgrid/s4u/Engine.hpp" -#include "simgrid/simix.hpp" -#include "src/surf/network_interface.hpp" +#include "simgrid/s4u/Link.hpp" +#include "src/kernel/activity/CommImpl.hpp" #include "src/surf/surf_interface.hpp" #include @@ -147,20 +147,6 @@ double LinkEnergy::get_consumed_energy() using simgrid::plugin::LinkEnergy; /* **************************** events callback *************************** */ -static void on_communicate(const simgrid::kernel::resource::NetworkAction& action) -{ - XBT_DEBUG("onCommunicate is called"); - for (auto const* link : action.get_links()) { - if (link == nullptr || link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) - continue; - - XBT_DEBUG("Update link %s", link->get_cname()); - auto* link_energy = link->get_iface()->extension(); - link_energy->init_watts_range_list(); - link_energy->update(); - } -} - static void on_simulation_end() { std::vector links = simgrid::s4u::Engine::get_instance()->get_all_links(); @@ -176,6 +162,16 @@ static void on_simulation_end() XBT_INFO("Total energy over all links: %f", total_energy); } + +static void on_communication(const simgrid::kernel::activity::CommImpl& comm) +{ + for (auto const* link : comm.get_traversed_links()) { + if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) { + XBT_DEBUG("Update %s on Comm Start/End", link->get_cname()); + link->extension()->update(); + } + } +} /* **************************** Public interface *************************** */ int sg_link_energy_is_inited() @@ -215,16 +211,9 @@ void sg_link_energy_plugin_init() link.extension()->get_consumed_energy()); }); - simgrid::s4u::Link::on_communication_state_change.connect( - [](simgrid::kernel::resource::NetworkAction const& action, - simgrid::kernel::resource::Action::State /* previous */) { - for (auto const* link : action.get_links()) { - if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) - link->get_iface()->extension()->update(); - } - }); + simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication); + simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication); - simgrid::s4u::Link::on_communicate.connect(&on_communicate); simgrid::s4u::Engine::on_simulation_end.connect(&on_simulation_end); } diff --git a/src/plugins/link_energy_wifi.cpp b/src/plugins/link_energy_wifi.cpp index acd2dc56d4..2ccc71bc76 100644 --- a/src/plugins/link_energy_wifi.cpp +++ b/src/plugins/link_energy_wifi.cpp @@ -7,6 +7,7 @@ #include #include +#include "src/kernel/activity/CommImpl.hpp" #include "src/surf/network_interface.hpp" #include "src/surf/network_wifi.hpp" @@ -61,7 +62,7 @@ public: LinkEnergyWifi() = delete; /** Update the energy consumed by link_ when transmissions start or end */ - void update(const simgrid::kernel::resource::NetworkAction &); + void update(); /** Update the energy consumed when link_ is destroyed */ void update_destroy(); @@ -110,7 +111,7 @@ void LinkEnergyWifi::update_destroy() XBT_DEBUG("finish eStat_ += %f * %f * (%d+1) | eStat = %f", duration, pIdle_, wifi_link->get_host_count(), eStat_); } -void LinkEnergyWifi::update(const kernel::resource::NetworkAction&) +void LinkEnergyWifi::update() { init_watts_range_list(); @@ -263,6 +264,17 @@ void LinkEnergyWifi::init_watts_range_list() } // namespace simgrid using simgrid::plugin::LinkEnergyWifi; +/* **************************** events callback *************************** */ +static void on_communication(const simgrid::kernel::activity::CommImpl& comm) +{ + for (auto* link : comm.get_traversed_links()) { + if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) { + auto* link_energy = link->extension(); + XBT_DEBUG("Update %s on Comm Start/End", link->get_cname()); + link_energy->update(); + } + } +} void sg_wifi_energy_plugin_init() { @@ -274,10 +286,10 @@ void sg_wifi_energy_plugin_init() /** * Attaching to events: - * - on_creation to initialize the plugin - * - on_destruction to produce final energy results - * - on_communication_state_change: to account the energy when communications are updated - * - on_communicate: '' + * - Link::on_creation to initialize the plugin + * - Link::on_destruction to produce final energy results + * - Link::on_communication_state_change: to account for the energy when communications are updated + * - Comm::on_start and Comm::on_completion: to account for the energy during communications */ simgrid::s4u::Link::on_creation.connect([](simgrid::s4u::Link& link) { // verify the link is appropriate to WiFi energy computations @@ -308,23 +320,11 @@ void sg_wifi_energy_plugin_init() // update WiFi links encountered during the communication for (auto const* link : action.get_links()) { if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) { - link->get_iface()->extension()->update(action); + link->get_iface()->extension()->update(); } } }); - simgrid::s4u::Link::on_communicate.connect([](const simgrid::kernel::resource::NetworkAction& action) { - auto const* actionWifi = dynamic_cast(&action); - - if (actionWifi == nullptr) - return; - - auto const* link_src = actionWifi->get_src_link(); - auto const* link_dst = actionWifi->get_dst_link(); - - if(link_src != nullptr) - link_src->get_iface()->extension()->update(action); - if(link_dst != nullptr) - link_dst->get_iface()->extension()->update(action); - }); + simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication); + simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication); } diff --git a/src/plugins/link_load.cpp b/src/plugins/link_load.cpp index 719b96afe3..c2df9c868a 100644 --- a/src/plugins/link_load.cpp +++ b/src/plugins/link_load.cpp @@ -6,6 +6,7 @@ #include #include +#include "src/kernel/activity/CommImpl.hpp" #include "src/surf/network_interface.hpp" #include @@ -161,16 +162,14 @@ double LinkLoad::get_average_bytes() using simgrid::plugin::LinkLoad; /* **************************** events callback *************************** */ -static void on_communicate(const simgrid::kernel::resource::NetworkAction& action) +static void on_communication(const simgrid::kernel::activity::CommImpl& comm) { - XBT_DEBUG("on_communicate is called"); - for (auto* link : action.get_links()) { - if (link == nullptr || link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) - continue; - - auto link_load = link->get_iface()->extension(); - if (link_load->is_tracked()) { - link_load->update(); + for (auto* link : comm.get_traversed_links()) { + if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) { + auto* link_load = link->extension(); + XBT_DEBUG("Update %s on Comm Start/End", link->get_cname()); + if (link_load->is_tracked()) + link_load->update(); } } } @@ -200,7 +199,9 @@ void sg_link_load_plugin_init() }); // Call this plugin on some of the links' events. - simgrid::s4u::Link::on_communicate.connect(&on_communicate); + simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication); + simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication); + simgrid::s4u::Link::on_state_change.connect([](simgrid::s4u::Link const& link) { if (link.get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) { auto link_load = link.extension();