Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
introduce Comm::on_start and Comm::on_completion
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Wed, 20 Oct 2021 09:59:41 +0000 (11:59 +0200)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Wed, 20 Oct 2021 09:59:41 +0000 (11:59 +0200)
+ Consistency with other activities
+ Potential replacement to the unnatural Link::on_communicate
+ Use these signals in the link plugins (energy and load)
+ update doc

docs/source/Plugins.rst
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/plugins/link_energy.cpp
src/plugins/link_energy_wifi.cpp
src/plugins/link_load.cpp

index 154417c..2530e77 100644 (file)
@@ -84,6 +84,10 @@ Partial list of existing signals in s4u:
   :cpp:member:`Engine::on_time_advance <simgrid::s4u::Engine::on_time_advance>`
   :cpp:member:`Engine::on_simulation_end <simgrid::s4u::Engine::on_simulation_end>`
   :cpp:member:`Engine::on_deadlock <simgrid::s4u::Engine::on_deadlock>`
+- :cpp:member:`Comm::on_start <simgrid::s4u::Comm::on_start>`
+  :cpp:member:`Comm::on_completion <simgrid::s4u::Comm::on_completion>`
+- :cpp:member:`Exec::on_start <simgrid::s4u::Exec::on_start>`
+  :cpp:member:`Exec::on_completion <simgrid::s4u::Exec::on_completion>`
 - :cpp:member:`Exec::on_start <simgrid::s4u::Exec::on_start>`
   :cpp:member:`Exec::on_completion <simgrid::s4u::Exec::on_completion>`
 - :cpp:member:`Host::on_creation <simgrid::s4u::Host::on_creation>`
index ba477b8..28b5014 100644 (file)
@@ -226,6 +226,8 @@ void SIMIX_comm_copy_pointer_callback(simgrid::kernel::activity::CommImpl* comm,
 namespace simgrid {
 namespace kernel {
 namespace activity {
+xbt::signal<void(CommImpl const&)> CommImpl::on_start;
+xbt::signal<void(CommImpl const&)> CommImpl::on_completion;
 
 void (*CommImpl::copy_data_callback_)(CommImpl*, void*, size_t) = &s4u::Comm::copy_pointer_callback;
 
@@ -300,7 +302,7 @@ CommImpl* CommImpl::start()
   if (state_ == State::READY) {
     from_ = from_ != nullptr ? from_ : src_actor_->get_host();
     to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
-
+    on_start(*this);
     /* Getting the network_model from the origin host
      * Valid while we have a single network model, otherwise we would need to change this function to first get the
      * routes and later create the respective surf actions */
@@ -341,6 +343,16 @@ CommImpl* CommImpl::start()
   return this;
 }
 
+std::vector<s4u::Link*> CommImpl::get_traversed_links() const
+{
+  xbt_assert(state_ != State::WAITING, "You cannot use %s() if your communication is not ready (%s)", __FUNCTION__,
+             get_state_str());
+  std::vector<s4u::Link*> vlinks;
+  XBT_ATTRIB_UNUSED double res = 0;
+  from_->route_to(to_, vlinks, &res);
+  return vlinks;
+}
+
 /** @brief Copy the communication data from the sender's buffer to the receiver's one  */
 void CommImpl::copy_data()
 {
@@ -519,6 +531,8 @@ void CommImpl::cleanup_surf()
 
 void CommImpl::post()
 {
+  on_completion(*this);
+
   /* Update synchro state */
   if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
     state_ = State::SRC_TIMEOUT;
@@ -546,7 +560,6 @@ void CommImpl::post()
 void CommImpl::finish()
 {
   XBT_DEBUG("CommImpl::finish() in state %s", to_c_str(state_));
-
   /* If the synchro is still in a rendez-vous point then remove from it */
   if (mbox_)
     mbox_->remove(this);
index 4221a89..0334947 100644 (file)
@@ -46,6 +46,7 @@ public:
   MailboxImpl* get_mailbox() const { return mbox_; }
   bool detached() const { return detached_; }
 
+  std::vector<s4u::Link*> get_traversed_links() const;
   void copy_data();
 
   bool test() override;
@@ -88,6 +89,9 @@ expectations of the other side, too. See  */
 
   void* src_data_ = nullptr; /* User data associated to the communication */
   void* dst_data_ = nullptr;
+
+  static xbt::signal<void(CommImpl const&)> on_start;
+  static xbt::signal<void(CommImpl const&)> on_completion;
 };
 } // namespace activity
 } // namespace kernel
index 636b8bf..37366fe 100644 (file)
@@ -7,8 +7,8 @@
 #include "simgrid/host.h"
 #include "simgrid/plugins/energy.h"
 #include "simgrid/s4u/Engine.hpp"
-#include "simgrid/simix.hpp"
-#include "src/surf/network_interface.hpp"
+#include "simgrid/s4u/Link.hpp"
+#include "src/kernel/activity/CommImpl.hpp"
 #include "src/surf/surf_interface.hpp"
 
 #include <boost/algorithm/string/classification.hpp>
@@ -147,20 +147,6 @@ double LinkEnergy::get_consumed_energy()
 using simgrid::plugin::LinkEnergy;
 
 /* **************************** events  callback *************************** */
-static void on_communicate(const simgrid::kernel::resource::NetworkAction& action)
-{
-  XBT_DEBUG("onCommunicate is called");
-  for (auto const* link : action.get_links()) {
-    if (link == nullptr || link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI)
-      continue;
-
-    XBT_DEBUG("Update link %s", link->get_cname());
-    auto* link_energy = link->get_iface()->extension<LinkEnergy>();
-    link_energy->init_watts_range_list();
-    link_energy->update();
-  }
-}
-
 static void on_simulation_end()
 {
   std::vector<simgrid::s4u::Link*> links = simgrid::s4u::Engine::get_instance()->get_all_links();
@@ -176,6 +162,16 @@ static void on_simulation_end()
 
   XBT_INFO("Total energy over all links: %f", total_energy);
 }
+
+static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+{
+  for (auto const* link : comm.get_traversed_links()) {
+    if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
+      XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
+      link->extension<LinkEnergy>()->update();
+    }
+  }
+}
 /* **************************** Public interface *************************** */
 
 int sg_link_energy_is_inited()
@@ -215,16 +211,9 @@ void sg_link_energy_plugin_init()
                link.extension<LinkEnergy>()->get_consumed_energy());
   });
 
-  simgrid::s4u::Link::on_communication_state_change.connect(
-      [](simgrid::kernel::resource::NetworkAction const& action,
-         simgrid::kernel::resource::Action::State /* previous */) {
-        for (auto const* link : action.get_links()) {
-          if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI)
-            link->get_iface()->extension<LinkEnergy>()->update();
-        }
-      });
+  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
+  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
 
-  simgrid::s4u::Link::on_communicate.connect(&on_communicate);
   simgrid::s4u::Engine::on_simulation_end.connect(&on_simulation_end);
 }
 
index acd2dc5..2ccc71b 100644 (file)
@@ -7,6 +7,7 @@
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Link.hpp>
 
+#include "src/kernel/activity/CommImpl.hpp"
 #include "src/surf/network_interface.hpp"
 #include "src/surf/network_wifi.hpp"
 
@@ -61,7 +62,7 @@ public:
   LinkEnergyWifi()  = delete;
 
   /** Update the energy consumed by link_ when transmissions start or end */
-  void update(const simgrid::kernel::resource::NetworkAction &);
+  void update();
 
   /** Update the energy consumed when link_ is destroyed */
   void update_destroy();
@@ -110,7 +111,7 @@ void LinkEnergyWifi::update_destroy()
   XBT_DEBUG("finish eStat_ += %f * %f * (%d+1) | eStat = %f", duration, pIdle_, wifi_link->get_host_count(), eStat_);
 }
 
-void LinkEnergyWifi::update(const kernel::resource::NetworkAction&)
+void LinkEnergyWifi::update()
 {
   init_watts_range_list();
 
@@ -263,6 +264,17 @@ void LinkEnergyWifi::init_watts_range_list()
 } // namespace simgrid
 
 using simgrid::plugin::LinkEnergyWifi;
+/* **************************** events  callback *************************** */
+static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+{
+  for (auto* link : comm.get_traversed_links()) {
+    if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) {
+      auto* link_energy = link->extension<LinkEnergyWifi>();
+      XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
+      link_energy->update();
+    }
+  }
+}
 
 void sg_wifi_energy_plugin_init()
 {
@@ -274,10 +286,10 @@ void sg_wifi_energy_plugin_init()
 
   /**
    * Attaching to events:
-   * - on_creation to initialize the plugin
-   * - on_destruction to produce final energy results
-   * - on_communication_state_change: to account the energy when communications are updated
-   * - on_communicate: ''
+   * - Link::on_creation to initialize the plugin
+   * - Link::on_destruction to produce final energy results
+   * - Link::on_communication_state_change: to account for the energy when communications are updated
+   * - Comm::on_start and Comm::on_completion: to account for the energy during communications
    */
   simgrid::s4u::Link::on_creation.connect([](simgrid::s4u::Link& link) {
     // verify the link is appropriate to WiFi energy computations
@@ -308,23 +320,11 @@ void sg_wifi_energy_plugin_init()
         // update WiFi links encountered during the communication
         for (auto const* link : action.get_links()) {
           if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) {
-            link->get_iface()->extension<LinkEnergyWifi>()->update(action);
+            link->get_iface()->extension<LinkEnergyWifi>()->update();
           }
         }
       });
 
-  simgrid::s4u::Link::on_communicate.connect([](const simgrid::kernel::resource::NetworkAction& action) {
-    auto const* actionWifi = dynamic_cast<const simgrid::kernel::resource::NetworkWifiAction*>(&action);
-
-    if (actionWifi == nullptr)
-      return;
-
-    auto const* link_src = actionWifi->get_src_link();
-    auto const* link_dst = actionWifi->get_dst_link();
-
-    if(link_src != nullptr)
-      link_src->get_iface()->extension<LinkEnergyWifi>()->update(action);
-    if(link_dst != nullptr)
-      link_dst->get_iface()->extension<LinkEnergyWifi>()->update(action);
-  });
+  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
+  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
 }
index 719b96a..c2df9c8 100644 (file)
@@ -6,6 +6,7 @@
 #include <simgrid/plugins/load.h>
 #include <simgrid/s4u/Engine.hpp>
 
+#include "src/kernel/activity/CommImpl.hpp"
 #include "src/surf/network_interface.hpp"
 
 #include <limits>
@@ -161,16 +162,14 @@ double LinkLoad::get_average_bytes()
 using simgrid::plugin::LinkLoad;
 
 /* **************************** events  callback *************************** */
-static void on_communicate(const simgrid::kernel::resource::NetworkAction& action)
+static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
 {
-  XBT_DEBUG("on_communicate is called");
-  for (auto* link : action.get_links()) {
-    if (link == nullptr || link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI)
-      continue;
-
-    auto link_load = link->get_iface()->extension<LinkLoad>();
-    if (link_load->is_tracked()) {
-      link_load->update();
+  for (auto* link : comm.get_traversed_links()) {
+    if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
+      auto* link_load = link->extension<LinkLoad>();
+      XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
+      if (link_load->is_tracked())
+        link_load->update();
     }
   }
 }
@@ -200,7 +199,9 @@ void sg_link_load_plugin_init()
   });
 
   // Call this plugin on some of the links' events.
-  simgrid::s4u::Link::on_communicate.connect(&on_communicate);
+  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
+  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+
   simgrid::s4u::Link::on_state_change.connect([](simgrid::s4u::Link const& link) {
     if (link.get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
       auto link_load = link.extension<LinkLoad>();