Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
task can now pass tokens (void*) to other tasks. add example using tokens and apache...
[simgrid.git] / src / plugins / task.cpp
index c464f04..4edfc3f 100644 (file)
@@ -27,30 +27,13 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug
 
 namespace simgrid::plugins {
 
+xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
+
 xbt::signal<void(Task*)> Task::on_start;
 xbt::signal<void(Task*)> Task::on_end;
 
 Task::Task(const std::string& name) : name_(name) {}
 
-/**
- *  @param predecessor The Task to add.
- *  @brief Add a predecessor to this Task.
- */
-void Task::add_predecessor(Task* predecessor)
-{
-  if (predecessors_.find(predecessor) == predecessors_.end())
-    simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; });
-}
-
-/**
- *  @param predecessor The Task to remove.
- *  @brief Remove a predecessor from this Task.
- */
-void Task::remove_predecessor(Task* predecessor)
-{
-  simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); });
-}
-
 /**
  *  @brief Return True if the Task can start a new Activity.
  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
@@ -69,21 +52,21 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  auto it = predecessors_.find(source);
-  simgrid::kernel::actor::simcall_answered([this, it] {
-    it->second++;
-    bool enough_tokens = true;
-    for (auto const& [key, val] : predecessors_)
-      if (val < 1) {
-        enough_tokens = false;
-        break;
-      }
-    if (enough_tokens) {
-      for (auto& [key, val] : predecessors_)
-        val--;
-      enqueue_execs(1);
+  auto source_count = predecessors_[source]++;
+  if (tokens_received_.size() <= queued_execs_ + source_count)
+    tokens_received_.push_back({});
+  tokens_received_[queued_execs_ + source_count][source] = source->token_;
+  bool enough_tokens = true;
+  for (auto const& [key, val] : predecessors_)
+    if (val < 1) {
+      enough_tokens = false;
+      break;
     }
-  });
+  if (enough_tokens) {
+    for (auto& [key, val] : predecessors_)
+      val--;
+    enqueue_execs(1);
+  }
 }
 
 /**
@@ -97,13 +80,13 @@ void Task::receive(Task* source)
  */
 void Task::complete()
 {
-  simgrid::kernel::actor::simcall_answered([this] {
-    working_ = false;
-    count_++;
-  });
-  for (auto const& end_func : end_func_handlers_)
-    end_func(this);
+  xbt_assert(s4u::Actor::is_maestro());
+  working_ = false;
+  count_++;
+  on_this_end_(this);
   Task::on_end(this);
+  if (current_activity_)
+    previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
     t->receive(this);
   if (ready_to_run())
@@ -116,9 +99,11 @@ void Task::complete()
  */
 void Task::init()
 {
-  if (Task::inited_)
+  static bool inited = false;
+  if (inited)
     return;
-  Task::inited_                           = true;
+
+  inited                                  = true;
   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
   simgrid::s4u::Exec::on_completion_cb(
       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
@@ -152,6 +137,25 @@ void Task::set_amount(double amount)
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
+/** @ingroup plugin_task
+ *  @param token The token to set.
+ *  @brief Set the token to send to successors.
+ *  @note The token is passed to each successor after the task ends, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<void> token)
+{
+  simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+}
+
+/** @ingroup plugin_task
+ *  @return Map of tokens received for the next execution; an empty map if no token has been received yet.
+ *  @note If there is no queued execution for this task, the returned map may be only partially filled.
+ */
+std::map<TaskPtr, std::shared_ptr<void>> Task::get_tokens() const
+{
+  return tokens_received_.empty() ? std::map<TaskPtr, std::shared_ptr<void>>() : tokens_received_.front();
+}
+
 /** @ingroup plugin_task
  *  @param successor The Task to add.
  *  @brief Add a successor to this Task.
@@ -159,8 +163,10 @@ void Task::set_amount(double amount)
  */
 void Task::add_successor(TaskPtr successor)
 {
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); });
-  successor->add_predecessor(this);
+  simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
+    successors_.insert(successor_p);
+    successor_p->predecessors_.try_emplace(this, 0);
+  });
 }
 
 /** @ingroup plugin_task
@@ -170,8 +176,10 @@ void Task::add_successor(TaskPtr successor)
  */
 void Task::remove_successor(TaskPtr successor)
 {
-  simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); });
-  successor->remove_predecessor(this);
+  simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
+    successor_p->predecessors_.erase(this);
+    successors_.erase(successor_p);
+  });
 }
 
 void Task::remove_all_successors()
@@ -190,9 +198,9 @@ void Task::remove_all_successors()
  *  @brief Set a function to be called before each execution.
  *  @note The function is called before the underlying Activity starts.
  */
-void Task::on_this_start(const std::function<void(Task*)>& func)
+void Task::on_this_start_cb(const std::function<void(Task*)>& func)
 {
-  simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); });
+  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
 }
 
 /** @ingroup plugin_task
@@ -200,9 +208,9 @@ void Task::on_this_start(const std::function<void(Task*)>& func)
  *  @brief Set a function to be called after each execution.
  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
  */
-void Task::on_this_end(const std::function<void(Task*)>& func)
+void Task::on_this_end_cb(const std::function<void(Task*)>& func)
 {
-  simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); });
+  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
 }
 
 /** @ingroup plugin_task
@@ -241,13 +249,12 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos
  */
 void ExecTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_          = true;
+  queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::ExecPtr exec = s4u::Exec::init();
   exec->set_name(name_);
   exec->set_flops_amount(amount_);
@@ -255,7 +262,7 @@ void ExecTask::fire()
   exec->start();
   exec->extension_set(new ExtendedAttributeActivity());
   exec->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; });
+  current_activity_                                   = exec;
 }
 
 /** @ingroup plugin_task
@@ -305,20 +312,19 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou
  */
 void CommTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_          = true;
+  queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
   comm->set_name(name_);
   comm->set_payload_size(amount_);
   comm->start();
   comm->extension_set(new ExtendedAttributeActivity());
   comm->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; });
+  current_activity_                                   = comm;
 }
 
 /** @ingroup plugin_task
@@ -399,13 +405,12 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
 
 void IoTask::fire()
 {
-  for (auto const& start_func : start_func_handlers_)
-    start_func(this);
+  on_this_start_(this);
   Task::on_start(this);
-  kernel::actor::simcall_answered([this] {
-    working_      = true;
-    queued_execs_ = std::max(queued_execs_ - 1, 0);
-  });
+  working_      = true;
+  queued_execs_ = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::IoPtr io = s4u::Io::init();
   io->set_name(name_);
   io->set_size(amount_);
@@ -414,11 +419,7 @@ void IoTask::fire()
   io->start();
   io->extension_set(new ExtendedAttributeActivity());
   io->extension<ExtendedAttributeActivity>()->task_ = this;
-  kernel::actor::simcall_answered([this, io] { current_activity_ = io; });
+  current_activity_                                 = io;
 }
 
 } // namespace simgrid::plugins
-
-simgrid::xbt::Extension<simgrid::s4u::Activity, simgrid::plugins::ExtendedAttributeActivity>
-    simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID;
-bool simgrid::plugins::Task::inited_ = false;