Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Unify on_start/on_completion signals of Activities
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Sun, 10 Jan 2021 12:20:01 +0000 (13:20 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Sun, 10 Jan 2021 12:20:01 +0000 (13:20 +0100)
16 files changed:
ChangeLog
docs/source/Platform_Examples.rst
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Io.hpp
src/instr/instr_platform.cpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/plugins/dirty_page_tracking.cpp
src/plugins/host_dvfs.cpp
src/plugins/host_energy.cpp
src/plugins/host_load.cpp
src/plugins/vm/VirtualMachineImpl.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Io.cpp

index 6b37281..9897130 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -13,6 +13,7 @@ S4U:
 - Functions Mailbox::get() and Mailbox::get_async() are now templated with the
   type of the pointee. Untyped functions are deprecated. Use Mailbox::get<void>()
   or Mailbox::get_async<void>() if you really want to play with void*.
+- Unify the interface of Activity::on_{start/activity}
 
 ----------------------------------------------------------------------------
 
index 09b34a1..36592f7 100644 (file)
@@ -153,6 +153,9 @@ Here is the meaning of this example: ``2 ; 4,4 ; 1,2 ; 1,2``
    :language: xml
    :lines: 1-3,10-
 
+.. todo:
+
+   Model some other platforms, for example from https://link.springer.com/article/10.1007/s11227-019-03142-8
 
 Dragonfly Cluster
 -----------------
index cc16d68..d89f061 100644 (file)
@@ -42,9 +42,9 @@ public:
 
   ~Comm() override;
 
-  static xbt::signal<void(Actor const&)> on_sender_start;
-  static xbt::signal<void(Actor const&)> on_receiver_start;
-  static xbt::signal<void(Actor const&)> on_completion;
+  static xbt::signal<void(Comm const&, Actor const&)> on_sender_start;
+  static xbt::signal<void(Comm const&, Actor const&)> on_receiver_start;
+  static xbt::signal<void(Comm const&, Actor const&)> on_completion;
 
   /*! take a vector s4u::CommPtr and return when one of them is finished.
    * The return value is the rank of the first finished CommPtr. */
index b15ed1b..38289a8 100644 (file)
@@ -46,8 +46,8 @@ public:
   Exec& operator=(Exec const&) = delete;
 
 #endif
-  static xbt::signal<void(Actor const&, Exec const&)> on_start;
-  static xbt::signal<void(Actor const&, Exec const&)> on_completion;
+  static xbt::signal<void(Exec const&, Actor const&)> on_start;
+  static xbt::signal<void(Exec const&, Actor const&)> on_completion;
 
   Exec* start() override;
   /** @brief On sequential executions, returns the amount of flops that remain to be done; This cannot be used on
index e979fb5..f51282b 100644 (file)
@@ -36,9 +36,12 @@ public:
 #ifndef DOXYGEN
   friend Disk;    // Factory of IOs
   friend Storage; // Factory of IOs
-#endif
 
   ~Io() override = default;
+#endif
+
+  static xbt::signal<void(Io const&, Actor const&)> on_start;
+  static xbt::signal<void(Io const&, Actor const&)> on_completion;
 
   Io* start() override;
   Io* wait() override;
index 287d531..e9bb5c4 100644 (file)
@@ -465,30 +465,31 @@ void define_callbacks()
     });
     s4u::Actor::on_wake_up.connect(
         [](s4u::Actor const& actor) { Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->pop_event(); });
-    s4u::Exec::on_start.connect([](simgrid::s4u::Actor const& actor, s4u::Exec const&) {
+    s4u::Exec::on_start.connect([](s4u::Exec const&, simgrid::s4u::Actor const& actor) {
       Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->push_event("execute");
     });
-    s4u::Exec::on_completion.connect([](s4u::Actor const& actor, s4u::Exec const&) {
+    s4u::Exec::on_completion.connect([](s4u::Exec const&, s4u::Actor const& actor) {
       Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->pop_event();
     });
-    s4u::Comm::on_sender_start.connect([](s4u::Actor const& actor) {
+    s4u::Comm::on_sender_start.connect([](s4u::Comm const&, s4u::Actor const& actor) {
       Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->push_event("send");
     });
-    s4u::Comm::on_receiver_start.connect([](s4u::Actor const& actor) {
+    s4u::Comm::on_receiver_start.connect([](s4u::Comm const&, s4u::Actor const& actor) {
       Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->push_event("receive");
     });
-    s4u::Comm::on_completion.connect(
-        [](s4u::Actor const& actor) { Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->pop_event(); });
+    s4u::Comm::on_completion.connect([](s4u::Comm const&, s4u::Actor const& actor) {
+      Container::by_name(instr_pid(actor))->get_state("ACTOR_STATE")->pop_event();
+    });
     s4u::Actor::on_host_change.connect(on_actor_host_change);
   }
 
   if (TRACE_smpi_is_enabled() && TRACE_smpi_is_computing()) {
-    s4u::Exec::on_start.connect([](simgrid::s4u::Actor const& actor, s4u::Exec const& exec) {
+    s4u::Exec::on_start.connect([](s4u::Exec const& exec, simgrid::s4u::Actor const& actor) {
       Container::by_name(std::string("rank-") + std::to_string(actor.get_pid()))
           ->get_state("MPI_STATE")
           ->push_event("computing", new CpuTIData("compute", exec.get_cost()));
     });
-    s4u::Exec::on_completion.connect([](s4u::Actor const& actor, s4u::Exec const&) {
+    s4u::Exec::on_completion.connect([](s4u::Exec const&, s4u::Actor const& actor) {
       Container::by_name(std::string("rank-") + std::to_string(actor.get_pid()))->get_state("MPI_STATE")->pop_event();
     });
   }
index 72e4007..c9a7430 100644 (file)
@@ -61,7 +61,6 @@ IoImpl* IoImpl::start()
   surf_action_->set_activity(this);
 
   XBT_DEBUG("Create IO synchro %p %s", this, get_cname());
-  IoImpl::on_start(*this);
 
   return this;
 }
@@ -86,8 +85,6 @@ void IoImpl::post()
     timeout_detector_ = nullptr;
   }
 
-  on_completion(*this);
-
   /* Answer all simcalls associated with the synchro */
   finish();
 }
@@ -122,12 +119,6 @@ void IoImpl::finish()
   }
 }
 
-/*************
- * Callbacks *
- *************/
-xbt::signal<void(IoImpl const&)> IoImpl::on_start;
-xbt::signal<void(IoImpl const&)> IoImpl::on_completion;
-
 } // namespace activity
 } // namespace kernel
 } // namespace simgrid
index 707372e..df8bf13 100644 (file)
@@ -36,9 +36,6 @@ public:
   IoImpl* start();
   void post() override;
   void finish() override;
-
-  static xbt::signal<void(IoImpl const&)> on_start;
-  static xbt::signal<void(IoImpl const&)> on_completion;
 };
 } // namespace activity
 } // namespace kernel
index db04237..9bed1db 100644 (file)
@@ -73,7 +73,7 @@ static void on_virtual_machine_creation(simgrid::vm::VirtualMachineImpl& vm)
   vm.extension_set<simgrid::vm::DirtyPageTrackingExt>(new simgrid::vm::DirtyPageTrackingExt());
 }
 
-static void on_exec_creation(simgrid::s4u::Actor const&, simgrid::s4u::Exec const& e)
+static void on_exec_creation(simgrid::s4u::Exec const& e, simgrid::s4u::Actor const&)
 {
   auto exec                        = static_cast<simgrid::kernel::activity::ExecImpl*>(e.get_impl());
   const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec->get_host());
@@ -87,7 +87,7 @@ static void on_exec_creation(simgrid::s4u::Actor const&, simgrid::s4u::Exec cons
   }
 }
 
-static void on_exec_completion(simgrid::s4u::Actor const&, simgrid::s4u::Exec const& e)
+static void on_exec_completion(simgrid::s4u::Exec const& e, simgrid::s4u::Actor const&)
 {
   auto exec                        = static_cast<simgrid::kernel::activity::ExecImpl*>(e.get_impl());
   const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec->get_host());
index 4c203c2..edd21d2 100644 (file)
@@ -295,11 +295,11 @@ public:
         task_id           = 0;
       }
     });
-    simgrid::s4u::Exec::on_start.connect([this](simgrid::s4u::Actor const&, simgrid::s4u::Exec const& activity) {
+    simgrid::s4u::Exec::on_start.connect([this](simgrid::s4u::Exec const& activity, simgrid::s4u::Actor const&) {
       if (activity.get_host() == get_host())
         pre_task();
     });
-    simgrid::s4u::Exec::on_completion.connect([this](simgrid::s4u::Actor const&, simgrid::s4u::Exec const& activity) {
+    simgrid::s4u::Exec::on_completion.connect([this](simgrid::s4u::Exec const& activity, simgrid::s4u::Actor const&) {
       // For more than one host (not yet supported), we can access the host via
       // simcalls_.front()->issuer->get_iface()->get_host()
       if (activity.get_host() == get_host() && iteration_running) {
index 1fe7043..06086ce 100644 (file)
@@ -551,7 +551,7 @@ void sg_host_energy_plugin_init()
   // that the next trigger would be the 2nd compute, hence ignoring the idle time
   // during the recv call. By updating at the beginning of a compute, we can
   // fix that. (If the cpu is not idle, this is not required.)
-  simgrid::s4u::Exec::on_start.connect([](simgrid::s4u::Actor const&, simgrid::s4u::Exec const& activity) {
+  simgrid::s4u::Exec::on_start.connect([](simgrid::s4u::Exec const& activity, simgrid::s4u::Actor const&) {
     if (activity.get_host_number() == 1) { // We only run on one host
       simgrid::s4u::Host* host         = activity.get_host();
       const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
index c538bf3..4011e98 100644 (file)
@@ -231,7 +231,7 @@ void sg_host_load_plugin_init()
     host.extension_set(new HostLoad(&host));
   });
 
-  simgrid::s4u::Exec::on_start.connect([](simgrid::s4u::Actor const&, simgrid::s4u::Exec const& activity) {
+  simgrid::s4u::Exec::on_start.connect([](simgrid::s4u::Exec const& activity, simgrid::s4u::Actor const&) {
     if (activity.get_host_number() == 1) { // We only run on one host
       simgrid::s4u::Host* host         = activity.get_host();
       const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
@@ -247,7 +247,7 @@ void sg_host_load_plugin_init()
       XBT_WARN("HostLoad plugin currently does not support executions on several hosts");
     }
   });
-  simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Actor const&, simgrid::s4u::Exec const& activity) {
+  simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Exec const& activity, simgrid::s4u::Actor const&) {
     if (activity.get_host_number() == 1) { // We only run on one host
       simgrid::s4u::Host* host         = activity.get_host();
       const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
index b95cc55..640d91d 100644 (file)
@@ -55,7 +55,7 @@ static void host_state_change(s4u::Host const& host)
   }
 }
 
-static void add_active_exec(s4u::Actor const&, s4u::Exec const& task)
+static void add_active_exec(s4u::Exec const& task, s4u::Actor const&)
 {
   const s4u::VirtualMachine* vm = dynamic_cast<s4u::VirtualMachine*>(task.get_host());
   if (vm != nullptr) {
@@ -65,7 +65,7 @@ static void add_active_exec(s4u::Actor const&, s4u::Exec const& task)
   }
 }
 
-static void remove_active_exec(s4u::Actor const&, s4u::Exec const& task)
+static void remove_active_exec(s4u::Exec const& task, s4u::Actor const&)
 {
   const s4u::VirtualMachine* vm = dynamic_cast<s4u::VirtualMachine*>(task.get_host());
   if (vm != nullptr) {
index 8b37c10..7f77ab4 100644 (file)
@@ -16,9 +16,9 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous commun
 
 namespace simgrid {
 namespace s4u {
-xbt::signal<void(Actor const&)> Comm::on_sender_start;
-xbt::signal<void(Actor const&)> Comm::on_receiver_start;
-xbt::signal<void(Actor const&)> Comm::on_completion;
+xbt::signal<void(Comm const&, Actor const&)> Comm::on_sender_start;
+xbt::signal<void(Comm const&, Actor const&)> Comm::on_receiver_start;
+xbt::signal<void(Comm const&, Actor const&)> Comm::on_completion;
 
 Comm::~Comm()
 {
@@ -119,12 +119,12 @@ Comm* Comm::start()
              "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
 
   if (src_buff_ != nullptr) { // Sender side
-    on_sender_start(*Actor::self());
+    on_sender_start(*this, *Actor::self());
     pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
                                 clean_fun_, copy_data_function_, get_user_data(), detached_);
   } else if (dst_buff_ != nullptr) { // Receiver side
     xbt_assert(not detached_, "Receive cannot be detached");
-    on_receiver_start(*Actor::self());
+    on_receiver_start(*this, *Actor::self());
     pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
                                 copy_data_function_, get_user_data(), rate_);
 
@@ -160,12 +160,12 @@ Comm* Comm::wait_for(double timeout)
     case State::INITED:
     case State::STARTING: // It's not started yet. Do it in one simcall
       if (src_buff_ != nullptr) {
-        on_sender_start(*Actor::self());
+        on_sender_start(*this, *Actor::self());
         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
                           copy_data_function_, get_user_data(), timeout);
 
       } else { // Receiver
-        on_receiver_start(*Actor::self());
+        on_receiver_start(*this, *Actor::self());
         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
                           get_user_data(), timeout, rate_);
       }
@@ -185,7 +185,7 @@ Comm* Comm::wait_for(double timeout)
     default:
       THROW_IMPOSSIBLE;
   }
-  on_completion(*Actor::self());
+  on_completion(*this, *Actor::self());
   return this;
 }
 
index 67a5caf..704506a 100644 (file)
@@ -14,8 +14,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_exec, s4u_activity, "S4U asynchronous execut
 
 namespace simgrid {
 namespace s4u {
-xbt::signal<void(Actor const&, Exec const&)> Exec::on_start;
-xbt::signal<void(Actor const&, Exec const&)> Exec::on_completion;
+xbt::signal<void(Exec const&, Actor const&)> Exec::on_start;
+xbt::signal<void(Exec const&, Actor const&)> Exec::on_completion;
 
 Exec::Exec()
 {
@@ -35,7 +35,7 @@ Exec* Exec::wait_for(double timeout)
   kernel::actor::ActorImpl* issuer = Actor::self()->get_impl();
   kernel::actor::simcall_blocking<void>([this, issuer, timeout] { this->get_impl()->wait_for(issuer, timeout); });
   state_ = State::FINISHED;
-  on_completion(*Actor::self(), *this);
+  on_completion(*this, *Actor::self());
   this->release_dependencies();
   return this;
 }
@@ -185,7 +185,7 @@ Exec* Exec::start()
     pimpl_->suspend();
 
   state_ = State::STARTED;
-  on_start(*Actor::self(), *this);
+  on_start(*this, *Actor::self());
   return this;
 }
 
index 4831daf..acd9a0d 100644 (file)
@@ -12,6 +12,8 @@
 
 namespace simgrid {
 namespace s4u {
+xbt::signal<void(Io const&, Actor const&)> Io::on_start;
+xbt::signal<void(Io const&, Actor const&)> Io::on_completion;
 
 Io::Io(sg_disk_t disk, sg_size_t size, OpType type) : disk_(disk), size_(size), type_(type)
 {
@@ -49,6 +51,7 @@ Io* Io::start()
     pimpl_->suspend();
 
   state_ = State::STARTED;
+  on_start(*this, *Actor::self());
   return this;
 }
 
@@ -73,6 +76,8 @@ Io* Io::wait_for(double timeout)
   kernel::actor::simcall_blocking<void>([this, issuer, timeout] { this->get_impl()->wait_for(issuer, timeout); });
   state_ = State::FINISHED;
   this->release_dependencies();
+
+  on_completion(*this, *Actor::self());
   return this;
 }