Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Various cleanups in Task plugin.
authorArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Mon, 5 Jun 2023 20:42:27 +0000 (22:42 +0200)
committerArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Tue, 6 Jun 2023 09:59:50 +0000 (11:59 +0200)
* reduce number of calls to simcall_answered
* use xbt::signal for the "on_this_*" signals
* reduce scope for static member "inited"
* remove intrusive_ptr_* function for derived classes

examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/python/task-switch-host/task-switch-host.py
include/simgrid/plugins/task.hpp
src/bindings/python/simgrid_python.cpp
src/plugins/task.cpp

index 27038a3..6ebca12 100644 (file)
@@ -54,7 +54,7 @@ int main(int argc, char* argv[])
   // Add a function to be called before each executions of comm0
   // This function modifies the graph of tasks by adding or removing
   // successors to comm0
-  comm0->on_this_start([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
+  comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
     auto* comm0      = dynamic_cast<simgrid::plugins::CommTask*>(t);
     static int count = 0;
     if (count % 2 == 0) {
index 3bd21e6..a95db16 100644 (file)
@@ -81,7 +81,7 @@ if __name__ == '__main__':
     # Add a function to be called before each executions of comm0
     # This function modifies the graph of tasks by adding or removing
     # successors to comm0
-    comm0.on_this_start(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
+    comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
 
     # Enqueue two executions for task exec1
     comm0.enqueue_execs(4)
index e9fa1af..bf8609d 100644 (file)
@@ -18,16 +18,10 @@ XBT_PUBLIC void intrusive_ptr_release(Task* o);
 XBT_PUBLIC void intrusive_ptr_add_ref(Task* o);
 class ExecTask;
 using ExecTaskPtr = boost::intrusive_ptr<ExecTask>;
-XBT_PUBLIC void intrusive_ptr_release(ExecTask* e);
-XBT_PUBLIC void intrusive_ptr_add_ref(ExecTask* e);
 class CommTask;
 using CommTaskPtr = boost::intrusive_ptr<CommTask>;
-XBT_PUBLIC void intrusive_ptr_release(CommTask* c);
-XBT_PUBLIC void intrusive_ptr_add_ref(CommTask* c);
 class IoTask;
 using IoTaskPtr = boost::intrusive_ptr<IoTask>;
-XBT_PUBLIC void intrusive_ptr_release(IoTask* i);
-XBT_PUBLIC void intrusive_ptr_add_ref(IoTask* i);
 
 struct ExtendedAttributeActivity {
   static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
@@ -35,12 +29,9 @@ struct ExtendedAttributeActivity {
 };
 
 class Task {
-  static bool inited_;
   std::set<Task*> successors_                 = {};
   std::map<Task*, unsigned int> predecessors_ = {};
 
-  void add_predecessor(Task* predecessor);
-  void remove_predecessor(Task* predecessor);
   bool ready_to_run() const;
   void receive(Task* source);
   void complete();
@@ -52,8 +43,8 @@ protected:
   int count_        = 0;
   bool working_     = false;
   s4u::ActivityPtr current_activity_;
-  std::vector<std::function<void(Task*)>> end_func_handlers_;
-  std::vector<std::function<void(Task*)>> start_func_handlers_;
+  xbt::signal<void(Task*)> on_this_start_;
+  xbt::signal<void(Task*)> on_this_end_;
   explicit Task(const std::string& name);
   virtual ~Task()     = default;
   virtual void fire() = 0;
@@ -73,8 +64,8 @@ public:
   void remove_successor(TaskPtr t);
   void remove_all_successors();
   const std::set<Task*>& get_successors() const { return successors_; }
-  void on_this_start(const std::function<void(Task*)>& func);
-  void on_this_end(const std::function<void(Task*)>& func);
+  void on_this_start_cb(const std::function<void(Task*)>& func);
+  void on_this_end_cb(const std::function<void(Task*)>& func);
   int get_count() const;
 
   /** Add a callback fired before a task activity start.
@@ -110,8 +101,6 @@ public:
   s4u::Host* get_host() const { return host_; }
   ExecTaskPtr set_flops(double flops);
   double get_flops() const { return get_amount(); }
-  friend void inline intrusive_ptr_release(ExecTask* e) { intrusive_ptr_release(static_cast<Task*>(e)); }
-  friend void inline intrusive_ptr_add_ref(ExecTask* e) { intrusive_ptr_add_ref(static_cast<Task*>(e)); }
 };
 
 class CommTask : public Task {
@@ -130,8 +119,6 @@ public:
   s4u::Host* get_destination() const { return destination_; }
   CommTaskPtr set_bytes(double bytes);
   double get_bytes() const { return get_amount(); }
-  friend void inline intrusive_ptr_release(CommTask* c) { intrusive_ptr_release(static_cast<Task*>(c)); }
-  friend void inline intrusive_ptr_add_ref(CommTask* c) { intrusive_ptr_add_ref(static_cast<Task*>(c)); }
 };
 
 class IoTask : public Task {
@@ -149,9 +136,6 @@ public:
   double get_bytes() { return get_amount(); }
   IoTaskPtr set_op_type(s4u::Io::OpType type);
   s4u::Io::OpType get_op_type() { return type_; }
-
-  friend void inline intrusive_ptr_release(IoTask* i) { intrusive_ptr_release(static_cast<Task*>(i)); }
-  friend void inline intrusive_ptr_add_ref(IoTask* i) { intrusive_ptr_add_ref(static_cast<Task*>(i)); }
 };
 } // namespace simgrid::plugins
 #endif
index d0ce338..5f0f335 100644 (file)
@@ -956,10 +956,10 @@ PYBIND11_MODULE(simgrid, m)
            py::call_guard<py::gil_scoped_release>(), py::arg("op"), "Remove a successor of this task.")
       .def("remove_all_successors", &Task::remove_all_successors, py::call_guard<py::gil_scoped_release>(),
            "Remove all successors of this task.")
-      .def("on_this_start", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start), py::arg("func"),
-           "Add a callback called when this task starts.")
-      .def("on_this_end", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end), py::arg("func"),
-           "Add a callback called when this task ends.")
+      .def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
+           py::arg("func"), "Add a callback called when this task starts.")
+      .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+           py::arg("func"), "Add a callback called when this task ends.")
       .def(
           "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
           "Textual representation of the Task");
index c464f04..c84f83d 100644 (file)
@@ -27,30 +27,13 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug
 
 namespace simgrid::plugins {
 
+xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
+
 xbt::signal<void(Task*)> Task::on_start;
 xbt::signal<void(Task*)> Task::on_end;
 
 Task::Task(const std::string& name) : name_(name) {}
 
-/**
- *  @param predecessor The Task to add.
- *  @brief Add a predecessor to this Task.
- */
-void Task::add_predecessor(Task* predecessor)
-{
-  if (predecessors_.find(predecessor) == predecessors_.end())
-    simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; });
-}
-
-/**
- *  @param predecessor The Task to remove.
- *  @brief Remove a predecessor from this Task.
- */
-void Task::remove_predecessor(Task* predecessor)
-{
-  simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); });
-}
-
 /**
  *  @brief Return True if the Task can start a new Activity.
  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
@@ -69,21 +52,18 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  auto it = predecessors_.find(source);
-  simgrid::kernel::actor::simcall_answered([this, it] {
-    it->second++;
-    bool enough_tokens = true;
-    for (auto const& [key, val] : predecessors_)
-      if (val < 1) {
-        enough_tokens = false;
-        break;
-      }
-    if (enough_tokens) {
-      for (auto& [key, val] : predecessors_)
-        val--;
-      enqueue_execs(1);
+  predecessors_[source]++;
+  bool enough_tokens = true;
+  for (auto const& [key, val] : predecessors_)
+    if (val < 1) {
+      enough_tokens = false;
+      break;
     }
-  });
+  if (enough_tokens) {
+    for (auto& [key, val] : predecessors_)
+      val--;
+    enqueue_execs(1);
+  }
 }
 
 /**
@@ -97,12 +77,10 @@ void Task::receive(Task* source)
  */
 void Task::complete()
 {
-  simgrid::kernel::actor::simcall_answered([this] {
-    working_ = false;
-    count_++;
-  });
-  for (auto const& end_func : end_func_handlers_)
-    end_func(this);
+  xbt_assert(s4u::Actor::is_maestro());
+  working_ = false;
+  count_++;
+  on_this_end_(this);
   Task::on_end(this);
   for (auto const& t : successors_)
     t->receive(this);
@@ -116,9 +94,11 @@ void Task::complete()
  */
 void Task::init()
 {
-  if (Task::inited_)
+  static bool inited = false;
+  if (inited)
     return;
-  Task::inited_                           = true;
+
+  inited                                  = true;
   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
   simgrid::s4u::Exec::on_completion_cb(
       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
@@ -159,8 +139,10 @@ void Task::set_amount(double amount)
  */
 void Task::add_successor(TaskPtr successor)
 {
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); });
-  successor->add_predecessor(this);
+  simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
+    successors_.insert(successor_p);
+    successor_p->predecessors_.try_emplace(this, 0);
+  });
 }
 
 /** @ingroup plugin_task
@@ -170,8 +152,10 @@ void Task::add_successor(TaskPtr successor)
  */
 void Task::remove_successor(TaskPtr successor)
 {
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); });
-  successor->remove_predecessor(this);
+  simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
+    successor_p->predecessors_.erase(this);
+    successors_.erase(successor_p);
+  });
 }
 
 void Task::remove_all_successors()
@@ -190,9 +174,9 @@ void Task::remove_all_successors()
  *  @brief Set a function to be called before each execution.
  *  @note The function is called before the underlying Activity starts.
  */
-void Task::on_this_start(const std::function<void(Task*)>& func)
+void Task::on_this_start_cb(const std::function<void(Task*)>& func)
 {
-  simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); });
+  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
 }
 
 /** @ingroup plugin_task
@@ -200,9 +184,9 @@ void Task::on_this_start(const std::function<void(Task*)>& func)
  *  @brief Set a function to be called after each execution.
  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
  */
-void Task::on_this_end(const std::function<void(Task*)>& func)
+void Task::on_this_end_cb(const std::function<void(Task*)>& func)
 {
-  simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); });
+  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
 }
 
 /** @ingroup plugin_task
@@ -241,13 +225,10 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos
  */
 void ExecTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_          = true;
+  queued_execs_     = std::max(queued_execs_ - 1, 0);
   s4u::ExecPtr exec = s4u::Exec::init();
   exec->set_name(name_);
   exec->set_flops_amount(amount_);
@@ -255,7 +236,7 @@ void ExecTask::fire()
   exec->start();
   exec->extension_set(new ExtendedAttributeActivity());
   exec->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; });
+  current_activity_                                   = exec;
 }
 
 /** @ingroup plugin_task
@@ -305,20 +286,17 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou
  */
 void CommTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_          = true;
+  queued_execs_     = std::max(queued_execs_ - 1, 0);
   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
   comm->set_name(name_);
   comm->set_payload_size(amount_);
   comm->start();
   comm->extension_set(new ExtendedAttributeActivity());
   comm->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; });
+  current_activity_                                   = comm;
 }
 
 /** @ingroup plugin_task
@@ -399,13 +377,10 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
 
 void IoTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_      = true;
+  queued_execs_ = std::max(queued_execs_ - 1, 0);
   s4u::IoPtr io = s4u::Io::init();
   io->set_name(name_);
   io->set_size(amount_);
@@ -414,11 +389,7 @@ void IoTask::fire()
   io->start();
   io->extension_set(new ExtendedAttributeActivity());
   io->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, io] { current_activity_ = io; });
+  current_activity_                                 = io;
 }
 
 } // namespace simgrid::plugins
-
-simgrid::xbt::Extension<simgrid::s4u::Activity, simgrid::plugins::ExtendedAttributeActivity>
-    simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID;
-bool simgrid::plugins::Task::inited_ = false;