Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
move on_start and on_completion from CommImpl to Comm
authorFred Suter <suterf@ornl.gov>
Wed, 31 May 2023 01:38:16 +0000 (21:38 -0400)
committerFred Suter <suterf@ornl.gov>
Wed, 31 May 2023 01:44:12 +0000 (21:44 -0400)
12 files changed:
docs/source/Plugins.rst
examples/cpp/dag-comm/s4u-dag-comm.tesh
examples/cpp/trace-process-migration/s4u-trace-process-migration.tesh
include/simgrid/s4u/Comm.hpp
src/instr/instr_platform.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/resource/WifiLinkImpl.cpp
src/kernel/resource/WifiLinkImpl.hpp
src/plugins/link_energy.cpp
src/plugins/link_energy_wifi.cpp
src/plugins/link_load.cpp
src/s4u/s4u_Comm.cpp

index bee80bc..4a4fb58 100644 (file)
@@ -134,20 +134,17 @@ Partial list of existing signals in s4u:
 
   - :cpp:func:`Comm::on_send <simgrid::s4u::Comm::on_send_cb>`
     :cpp:func:`Comm::on_recv <simgrid::s4u::Comm::on_recv_cb>`
+  - :cpp:func:`Comm::on_start <simgrid::s4u::Comm::on_start_cb>`
     :cpp:func:`Comm::on_completion <simgrid::s4u::Comm::on_completion_cb>`
     :cpp:func:`Comm::on_suspend <simgrid::s4u::Comm::on_suspend_cb>`
     :cpp:func:`Comm::on_resume <simgrid::s4u::Comm::on_resume_cb>`
     :cpp:func:`Comm::on_veto <simgrid::s4u::Comm::on_veto_cb>`
-  - :cpp:func:`CommImpl::on_start <simgrid::s4u::CommImpl::on_start_cb>`
-    :cpp:func:`CommImpl::on_completion <simgrid::s4u::CommImpl::on_completion_cb>`
   - :cpp:func:`Exec::on_start <simgrid::s4u::Exec::on_start_cb>`
-    :cpp:func:`Exec::on_completion <simgrid::s4u::Exec::on_completion_cb>`
     :cpp:func:`Exec::on_completion <simgrid::s4u::Exec::on_completion_cb>`
     :cpp:func:`Exec::on_suspend <simgrid::s4u::Exec::on_suspend_cb>`
     :cpp:func:`Exec::on_resume <simgrid::s4u::Exec::on_resume_cb>`
     :cpp:func:`Exec::on_veto <simgrid::s4u::Exec::on_veto_cb>`
   - :cpp:func:`Io::on_start <simgrid::s4u::Io::on_start_cb>`
-    :cpp:func:`Io::on_completion <simgrid::s4u::Io::on_completion_cb>`
     :cpp:func:`Io::on_completion <simgrid::s4u::Io::on_completion_cb>`
     :cpp:func:`Io::on_suspend <simgrid::s4u::Io::on_suspend_cb>`
     :cpp:func:`Io::on_resume <simgrid::s4u::Io::on_resume_cb>`
index 0024dd2..abe40f4 100644 (file)
@@ -11,8 +11,8 @@ $ ${bindir:=.}/s4u-dag-comm ${platfdir}/two_hosts.xml --log=s4u_activity.t:verbo
 > [  1.000000] (0:maestro@) Exec 'parent' is complete (start time: 0.000000, finish time: 1.000000)
 > [  1.000000] (0:maestro@) Remove a dependency from 'parent' on 'transfer'
 > [  1.000000] (0:maestro@) 'transfer' is assigned to a resource and all dependencies are solved. Let's start
-> [  2.083775] (0:maestro@) Comm 'transfer' is complete
 > [  2.083775] (0:maestro@) Remove a dependency from 'transfer' on 'child'
 > [  2.083775] (0:maestro@) 'child' is assigned to a resource and all dependencies are solved. Let's start
+> [  2.083775] (0:maestro@) Comm 'transfer' is complete
 > [  3.083775] (0:maestro@) Exec 'child' is complete (start time: 2.083775, finish time: 3.083775)
 > [  3.083775] (0:maestro@) Simulation time 3.08378
index 42fcd20..99d6865 100644 (file)
@@ -262,8 +262,8 @@ $ tail -n +3 procmig.trace
 > 13 2.000000 11 32
 > 12 2.000000 11 32 14
 > 13 2.025708 11 33
-> 12 2.025708 11 33 15
 > 13 2.025708 11 32
+> 12 2.025708 11 33 15
 > 15 2.025708 17 0 M 32 0
 > 7 2.025708 10 32
 > 6 2.025708 34 10 1 "emigrant-1"
@@ -272,8 +272,8 @@ $ tail -n +3 procmig.trace
 > 13 4.025708 11 34
 > 12 4.025708 11 34 14
 > 13 4.025903 11 33
-> 12 4.025903 11 33 15
 > 13 4.025903 11 34
+> 12 4.025903 11 33 15
 > 15 4.025903 17 0 M 34 1
 > 7 4.025903 10 34
 > 6 4.025903 35 10 2 "emigrant-1"
@@ -282,8 +282,8 @@ $ tail -n +3 procmig.trace
 > 13 6.025903 11 35
 > 12 6.025903 11 35 14
 > 13 6.044918 11 33
-> 12 6.044918 11 33 15
 > 13 6.044918 11 35
+> 12 6.044918 11 33 15
 > 15 6.044918 17 0 M 35 2
 > 7 6.044918 10 35
 > 6 6.044918 36 10 3 "emigrant-1"
@@ -292,8 +292,8 @@ $ tail -n +3 procmig.trace
 > 13 8.044918 11 36
 > 12 8.044918 11 36 14
 > 13 8.070626 11 33
-> 12 8.070626 11 33 15
 > 13 8.070626 11 36
+> 12 8.070626 11 33 15
 > 15 8.070626 17 0 M 36 3
 > 7 8.070626 10 36
 > 6 8.070626 37 10 4 "emigrant-1"
@@ -302,8 +302,8 @@ $ tail -n +3 procmig.trace
 > 13 10.070626 11 37
 > 12 10.070626 11 37 14
 > 13 10.087178 11 33
-> 12 10.087178 11 33 15
 > 13 10.087178 11 37
+> 12 10.087178 11 33 15
 > 15 10.087178 17 0 M 37 4
 > 7 10.087178 10 37
 > 6 10.087178 38 10 5 "emigrant-1"
@@ -312,8 +312,8 @@ $ tail -n +3 procmig.trace
 > 13 12.087178 11 38
 > 12 12.087178 11 38 14
 > 13 12.112617 11 33
-> 12 12.112617 11 33 15
 > 13 12.112617 11 38
+> 12 12.112617 11 33 15
 > 15 12.112617 17 0 M 38 5
 > 7 12.112617 10 38
 > 6 12.112617 39 10 3 "emigrant-1"
@@ -322,8 +322,8 @@ $ tail -n +3 procmig.trace
 > 13 14.112617 11 39
 > 12 14.112617 11 39 14
 > 13 14.138325 11 33
-> 12 14.138325 11 33 15
 > 13 14.138325 11 39
+> 12 14.138325 11 33 15
 > 15 14.138325 17 0 M 39 6
 > 7 14.138325 10 39
 > 6 14.138325 40 10 1 "emigrant-1"
@@ -332,8 +332,8 @@ $ tail -n +3 procmig.trace
 > 13 16.138325 11 40
 > 12 16.138325 11 40 14
 > 13 16.138521 11 33
-> 12 16.138521 11 33 15
 > 13 16.138521 11 40
+> 12 16.138521 11 33 15
 > 15 16.138521 17 0 M 40 7
 > 7 16.138521 10 40
 > 6 16.138521 41 10 4 "emigrant-1"
index 1a3488c..2577c88 100644 (file)
@@ -19,6 +19,7 @@ namespace simgrid::s4u {
  */
 class XBT_PUBLIC Comm : public Activity_T<Comm> {
   friend Mailbox; // Factory of comms
+  friend kernel::activity::CommImpl;
   /* specified for normal mailbox-based communications*/
   Mailbox* mailbox_                   = nullptr;
   kernel::actor::ActorImpl* sender_   = nullptr;
@@ -38,12 +39,12 @@ class XBT_PUBLIC Comm : public Activity_T<Comm> {
   Comm() = default;
   Comm* do_start() override;
 
+protected:
   static xbt::signal<void(Comm const&)> on_send;
   static xbt::signal<void(Comm const&)> on_recv;
   static xbt::signal<void(Comm const&)> on_start;
 
-protected:
-  void fire_on_completion() const override { on_completion(*this); }
+  void fire_on_completion() const override { /* Do nothing */ }
   void fire_on_veto() const override { on_veto(const_cast<Comm&>(*this)); }
   void fire_on_suspend() const override { on_suspend(*this); }
   void fire_on_resume() const override { on_resume(*this); }
@@ -150,6 +151,7 @@ public:
 
   bool is_assigned() const override;
   Actor* get_sender() const;
+  Actor* get_receiver() const;
 
   /* Comm life cycle */
   /** Start the comm, and ignore its result. It can be completely forgotten after that. */
index 540a8dd..5ea76ef 100644 (file)
@@ -517,12 +517,13 @@ void define_callbacks()
     });
 
     s4u::Comm::on_completion_cb([](const s4u::Comm& c) {
-      std::string pid = instr_pid(*s4u::Actor::self());
-      if (pid == "-0") { //Comm is launched directly by Maestro, use the host as container
-          Container::by_name(c.get_source()->get_name())->get_state("HOST_STATE")->pop_event();
-          Container::by_name(c.get_destination()->get_name())->get_state("HOST_STATE")->pop_event();
-      } else
-        Container::by_name(pid)->get_state("ACTOR_STATE")->pop_event();
+      if (c.get_sender()) {
+        Container::by_name(instr_pid(*c.get_sender()))->get_state("ACTOR_STATE")->pop_event();
+        Container::by_name(instr_pid(*c.get_receiver()))->get_state("ACTOR_STATE")->pop_event();
+      } else {
+        Container::by_name(c.get_source()->get_name())->get_state("HOST_STATE")->pop_event();
+        Container::by_name(c.get_destination()->get_name())->get_state("HOST_STATE")->pop_event();
+      }
     });
     s4u::Comm::on_send_cb([](s4u::Comm const& c) {
       std::string pid = instr_pid(*s4u::Actor::self());
index 44ec50b..7a9ef04 100644 (file)
@@ -133,7 +133,6 @@ CommImpl* CommImpl::start()
     model_action_->set_category(get_tracing_category());
     set_start_time(model_action_->get_start_time());
     set_state(State::RUNNING);
-    on_start(*this);
 
     XBT_DEBUG("Starting communication %p from '%s' to '%s' (model action: %p; state: %s)", this, from_->get_cname(),
               to_->get_cname(), model_action_, get_state_str());
@@ -471,7 +470,7 @@ void CommImpl::finish()
   XBT_DEBUG("CommImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
             src_actor_.get(), dst_actor_.get(), detached_);
 
-  on_completion(*this);
+  s4u::Comm::on_completion(static_cast<const s4u::Comm&>(*this->get_iface()));
 
   /* Update synchro state */
   if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
index 0d5ed17..b0abe21 100644 (file)
@@ -3,6 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Host.hpp>
 
 #include "src/kernel/activity/CommImpl.hpp"
@@ -15,6 +16,21 @@ namespace simgrid::kernel::resource {
 /************
  * Resource *
  ************/
+static void update_bw_comm_start(const s4u::Comm& comm)
+{
+  auto* pimpl = static_cast<activity::CommImpl*>(comm.get_impl());
+
+  auto const* actionWifi = dynamic_cast<const kernel::resource::WifiLinkAction*>(pimpl->model_action_);
+  if (actionWifi == nullptr)
+    return;
+
+  if (auto* link_src = actionWifi->get_src_link()) {
+    link_src->inc_active_flux();
+  }
+  if (auto* link_dst = actionWifi->get_dst_link()) {
+    link_dst->inc_active_flux();
+  }
+}
 
 WifiLinkImpl::WifiLinkImpl(const std::string& name, const std::vector<double>& bandwidths, lmm::System* system)
     : StandardLinkImpl(name)
@@ -22,7 +38,7 @@ WifiLinkImpl::WifiLinkImpl(const std::string& name, const std::vector<double>& b
   this->set_constraint(system->constraint_new(this, 1));
   for (auto bandwidth : bandwidths)
     bandwidths_.push_back({bandwidth, 1.0, nullptr});
-  kernel::activity::CommImpl::on_start.connect(&update_bw_comm_start);
+  s4u::Comm::on_start_cb(&update_bw_comm_start);
   s4u::Link::on_communication_state_change_cb(&update_bw_comm_end);
 }
 
@@ -79,20 +95,6 @@ void WifiLinkImpl::dec_active_flux()
   nb_active_flux_--;
 }
 
-void WifiLinkImpl::update_bw_comm_start(const kernel::activity::CommImpl& comm)
-{
-  auto const* actionWifi = dynamic_cast<const simgrid::kernel::resource::WifiLinkAction*>(comm.model_action_);
-  if (actionWifi == nullptr)
-    return;
-
-  if (auto* link_src = actionWifi->get_src_link()) {
-    link_src->inc_active_flux();
-  }
-  if (auto* link_dst = actionWifi->get_dst_link()) {
-    link_dst->inc_active_flux();
-  }
-}
-
 void WifiLinkImpl::update_bw_comm_end(const simgrid::kernel::resource::NetworkAction& action,
                                       simgrid::kernel::resource::Action::State /*state*/)
 {
index cd08f6d..969eeeb 100644 (file)
@@ -53,9 +53,7 @@ public:
   void set_latency(double) override;
   bool toggle_callback();
 
-  static void update_bw_comm_start(const kernel::activity::CommImpl&);
-  static void update_bw_comm_end(const simgrid::kernel::resource::NetworkAction& action,
-                                 simgrid::kernel::resource::Action::State state);
+  static void update_bw_comm_end(const NetworkAction& action, Action::State state);
   void inc_active_flux();
   void dec_active_flux();
   static double wifi_link_dynamic_sharing(const WifiLinkImpl& link, double capacity, int n);
index dd1e53b..66ab5d3 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/Exception.hpp"
 #include "simgrid/host.h"
 #include "simgrid/plugins/energy.h"
+#include "simgrid/s4u/Comm.hpp"
 #include "simgrid/s4u/Engine.hpp"
 #include "simgrid/s4u/Link.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
@@ -161,15 +162,17 @@ static void on_simulation_end()
   XBT_INFO("Total energy over all links: %f", total_energy);
 }
 
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
 {
-  for (auto const* link : comm.get_traversed_links()) {
+  auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+  for (auto const* link : pimpl->get_traversed_links()) {
     if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
       XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
       link->extension<LinkEnergy>()->update();
     }
   }
 }
+
 /* **************************** Public interface *************************** */
 
 int sg_link_energy_is_inited()
@@ -209,8 +212,8 @@ void sg_link_energy_plugin_init()
                link.extension<LinkEnergy>()->get_consumed_energy());
   });
 
-  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
-  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+  simgrid::s4u::Comm::on_start_cb(&on_communication);
+  simgrid::s4u::Comm::on_completion_cb(&on_communication);
 
   simgrid::s4u::Engine::on_simulation_end_cb(&on_simulation_end);
 }
index 0eb9cf0..3912fc9 100644 (file)
@@ -4,6 +4,7 @@
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include <simgrid/plugins/energy.h>
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 #include <simgrid/s4u/Link.hpp>
 
@@ -265,9 +266,10 @@ void LinkEnergyWifi::init_watts_range_list()
 
 using simgrid::plugin::LinkEnergyWifi;
 /* **************************** events  callback *************************** */
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
 {
-  for (const auto* link : comm.get_traversed_links()) {
+  auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+  for (auto const* link : pimpl->get_traversed_links()) {
     if (link != nullptr && link->get_sharing_policy() == simgrid::s4u::Link::SharingPolicy::WIFI) {
       auto* link_energy = link->extension<LinkEnergyWifi>();
       XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
@@ -324,6 +326,6 @@ void sg_wifi_energy_plugin_init()
     }
   });
 
-  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
-  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+  simgrid::s4u::Comm::on_start_cb(&on_communication);
+  simgrid::s4u::Comm::on_completion_cb(&on_communication);
 }
index d502cec..8f0937d 100644 (file)
@@ -4,6 +4,7 @@
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include <simgrid/plugins/load.h>
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Engine.hpp>
 
 #include "src/kernel/activity/CommImpl.hpp"
@@ -161,9 +162,10 @@ double LinkLoad::get_average_bytes()
 using simgrid::plugin::LinkLoad;
 
 /* **************************** events  callback *************************** */
-static void on_communication(const simgrid::kernel::activity::CommImpl& comm)
+static void on_communication(const simgrid::s4u::Comm& comm)
 {
-  for (const auto* link : comm.get_traversed_links()) {
+  auto* pimpl = static_cast<simgrid::kernel::activity::CommImpl*>(comm.get_impl());
+  for (auto const* link : pimpl->get_traversed_links()) {
     if (link != nullptr && link->get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
       auto* link_load = link->extension<LinkLoad>();
       XBT_DEBUG("Update %s on Comm Start/End", link->get_cname());
@@ -199,8 +201,8 @@ void sg_link_load_plugin_init()
   });
 
   // Call this plugin on some of the links' events.
-  simgrid::kernel::activity::CommImpl::on_start.connect(&on_communication);
-  simgrid::kernel::activity::CommImpl::on_completion.connect(&on_communication);
+  simgrid::s4u::Comm::on_start_cb(&on_communication);
+  simgrid::s4u::Comm::on_completion_cb(&on_communication);
 
   simgrid::s4u::Link::on_onoff_cb([](simgrid::s4u::Link const& link) {
     if (link.get_sharing_policy() != simgrid::s4u::Link::SharingPolicy::WIFI) {
index 408d1a3..376d18d 100644 (file)
@@ -21,6 +21,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous commun
 namespace simgrid::s4u {
 xbt::signal<void(Comm const&)> Comm::on_send;
 xbt::signal<void(Comm const&)> Comm::on_recv;
+xbt::signal<void(Comm const&)> Comm::on_start;
 
 CommPtr Comm::set_copy_data_callback(const std::function<void(kernel::activity::CommImpl*, void*, size_t)>& callback)
 {
@@ -289,6 +290,14 @@ Actor* Comm::get_sender() const
   return sender ? sender->get_ciface() : nullptr;
 }
 
+Actor* Comm::get_receiver() const
+{
+  kernel::actor::ActorImplPtr receiver = nullptr;
+  if (pimpl_)
+    receiver = boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->dst_actor_;
+  return receiver ? receiver->get_ciface() : nullptr;
+}
+
 bool Comm::is_assigned() const
 {
   return (pimpl_ && boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->is_assigned()) ||
@@ -348,6 +357,9 @@ Comm* Comm::do_start()
   if (not detached_) {
     pimpl_->set_iface(this);
     pimpl_->set_actor(sender_);
+    // Only throw the signal when both sides are here and the status is READY
+    if (pimpl_->get_state() != kernel::activity::State::WAITING)
+      on_start(*this);
   }
 
   state_ = State::STARTED;