Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
task can now pass tokens (void*) to other tasks. add example using tokens and apache...
author Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Mon, 12 Jun 2023 15:42:42 +0000 (17:42 +0200)
committer Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Mon, 12 Jun 2023 15:42:42 +0000 (17:42 +0200)
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/task-storm/s4u-task-storm.cpp [new file with mode: 0644]
examples/cpp/task-storm/s4u-task-storm.tesh [new file with mode: 0644]
include/simgrid/plugins/task.hpp
src/plugins/task.cpp

index 296c687..13eeb1e 100644 (file)
@@ -398,6 +398,8 @@ include examples/cpp/task-io/s4u-task-io.cpp
 include examples/cpp/task-io/s4u-task-io.tesh
 include examples/cpp/task-simple/s4u-task-simple.cpp
 include examples/cpp/task-simple/s4u-task-simple.tesh
+include examples/cpp/task-storm/s4u-task-storm.cpp
+include examples/cpp/task-storm/s4u-task-storm.tesh
 include examples/cpp/task-switch-host/s4u-task-switch-host.cpp
 include examples/cpp/task-switch-host/s4u-task-switch-host.tesh
 include examples/cpp/task-variable-load/s4u-task-variable-load.cpp
index dc66fc9..66187a8 100644 (file)
@@ -171,7 +171,7 @@ foreach (example activity-testany activity-waitany
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
                  network-ns3 network-ns3-wifi network-wifi
                  io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
-                 task-io task-simple task-variable-load task-switch-host
+                 task-io task-simple task-variable-load task-storm task-switch-host
                  photovoltaic-simple
                  platform-comm-serialize platform-failures platform-profile platform-properties
                  plugin-host-load plugin-link-load plugin-prodcons
diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp
new file mode 100644 (file)
index 0000000..0319d17
--- /dev/null
@@ -0,0 +1,137 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
+   and uses them to build a simulation of a stream processing application.
+
+   Spout SA produces data every 100ms. The volume produced cycles through 1e3, 1e6 and 1e9 bytes.
+   Spout SB produces 1e6 bytes every 200ms.
+
+   Bolts B1 and B2 alternately process data from Spout SA. The quantity of work to process this data is 10 flops per byte.
+   Bolt B3 processes data from Spout SB.
+   Bolt B4 processes data from Bolt B3.
+   
+                        Fafard
+                        ┌────┐
+                    ┌──►│ B1 │
+         Tremblay   │   └────┘
+          ┌────┐    │
+          │ SA ├────┤  Ginette
+          └────┘    │   ┌────┐
+                    └──►│ B2 │
+                        └────┘
+
+
+                       Bourassa
+         Jupiter     ┌──────────┐
+          ┌────┐     │          │
+          │ SB ├─────┤ B3 ──► B4│
+          └────┘     │          │
+                     └──────────┘     
+ */
+
+#include "simgrid/plugins/task.hpp"
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+
+/* Payload attached to a task and handed over to its successors.
+ * In this example data_ holds the number of bytes produced by spout SA. */
+struct Token {
+  double data_ = 0;
+  // explicit: a bare double must never convert silently into a Token
+  explicit Token(double data) : data_(data) {}
+};
+
+/* Build the Storm-like task graph described at the top of this file, enqueue 5 executions
+ * of each spout and run the simulation.
+ *
+ * argv[1] must be the platform file; it has to define the hosts Tremblay, Jupiter, Fafard,
+ * Ginette and Bourassa (e.g. small_platform.xml).
+ */
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  // Fail with a usage message instead of reading past the end of argv
+  xbt_assert(argc > 1, "Usage: %s platform_file\n\tExample: %s ../platforms/small_platform.xml\n", argv[0], argv[0]);
+  e.load_platform(argv[1]);
+  simgrid::plugins::Task::init();
+
+  // Retrieve hosts
+  auto* tremblay = e.host_by_name("Tremblay");
+  auto* jupiter  = e.host_by_name("Jupiter");
+  auto* fafard   = e.host_by_name("Fafard");
+  auto* ginette  = e.host_by_name("Ginette");
+  auto* bourassa = e.host_by_name("Bourassa");
+
+  // Create execution tasks
+  // The amount of SA (resp. SB) is sized so that one execution lasts 0.1s (resp. 0.2s) on its host
+  auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
+
+  // Create communication tasks
+  // The amounts of SA_to_B1 and SA_to_B2 are placeholders: they are set each time SA starts
+  auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+  // Create the graph by defining dependencies between tasks
+  // Some dependencies are defined dynamically
+  SA_to_B1->add_successor(B1);
+  SA_to_B2->add_successor(B2);
+  SB->add_successor(SB_to_B3);
+  SB_to_B3->add_successor(B3);
+  B3->add_successor(B4);
+
+  /* Dynamic modification of the graph and bytes sent
+     Alternately we: remove/add the link between SA and SA_to_B2
+                     add/remove the link between SA and SA_to_B1
+  */
+  SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    // Volumes (in bytes) produced by SA, used in a round-robin fashion
+    static constexpr double amounts[] = {1e3, 1e6, 1e9};
+
+    const int count = t->get_count();
+    simgrid::plugins::CommTaskPtr comm;
+    if (count % 2 == 0) {
+      t->remove_successor(SA_to_B2);
+      t->add_successor(SA_to_B1);
+      comm = SA_to_B1;
+    } else {
+      t->remove_successor(SA_to_B1);
+      t->add_successor(SA_to_B2);
+      comm = SA_to_B2;
+    }
+    const double bytes = amounts[count % 3];
+    comm->set_amount(bytes);
+    // The token tells the downstream bolt how many bytes it has to process
+    t->set_token(std::make_shared<Token>(bytes));
+  });
+
+  // The token sent by SA is forwarded by both communication tasks
+  SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    t->set_token(t->get_tokens()[SA]);
+  });
+  SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    t->set_token(t->get_tokens()[SA]);
+  });
+
+  /* B1 and B2 read the value of the token received from their predecessor
+     and use it to adapt their amount of work to do (10 flops per byte).
+  */
+  B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    // Tokens are type-erased (shared_ptr<void>): cast back to the type set by SA
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B1].get());
+    t->set_amount(tok->data_ * 10);
+  });
+  B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B2].get());
+    t->set_amount(tok->data_ * 10);
+  });
+
+  // Enqueue executions for tasks without predecessors
+  SA->enqueue_execs(5);
+  SB->enqueue_execs(5);
+
+  // Add a function to be called when tasks end for log purpose
+  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
+  });
+
+  // Start the simulation
+  e.run();
+  return 0;
+}
diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh
new file mode 100644 (file)
index 0000000..d7c364a
--- /dev/null
@@ -0,0 +1,38 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
+> [0.100000] [task_storm/INFO] Task SA finished (1)
+> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1)
+> [0.125972] [task_storm/INFO] Task B1 finished (1)
+> [0.200000] [task_storm/INFO] Task SB finished (1)
+> [0.200000] [task_storm/INFO] Task SA finished (2)
+> [0.300000] [task_storm/INFO] Task SA finished (3)
+> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1)
+> [0.400000] [task_storm/INFO] Task SB finished (2)
+> [0.400000] [task_storm/INFO] Task SA finished (4)
+> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2)
+> [0.500000] [task_storm/INFO] Task SA finished (5)
+> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1)
+> [0.570648] [task_storm/INFO] Task B2 finished (1)
+> [0.570855] [task_storm/INFO] Task B2 finished (2)
+> [0.600000] [task_storm/INFO] Task SB finished (3)
+> [0.800000] [task_storm/INFO] Task SB finished (4)
+> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2)
+> [1.000000] [task_storm/INFO] Task SB finished (5)
+> [1.177739] [task_storm/INFO] Task SB_to_B3 finished (3)
+> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4)
+> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
+> [2.619232] [task_storm/INFO] Task B3 finished (1)
+> [6.743624] [task_storm/INFO] Task B3 finished (2)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
+> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [14.992407] [task_storm/INFO] Task B3 finished (4)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
+> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [23.241190] [task_storm/INFO] Task B4 finished (3)
+> [27.365582] [task_storm/INFO] Task B4 finished (4)
+> [31.489974] [task_storm/INFO] Task B4 finished (5)
+> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2)
+> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3)
+> [264.435791] [task_storm/INFO] Task B1 finished (2)
+> [264.566859] [task_storm/INFO] Task B1 finished (3)
index 8143333..a652cb6 100644 (file)
@@ -6,6 +6,7 @@
 #include <xbt/Extendable.hpp>
 
 #include <atomic>
+#include <deque>
 #include <map>
 #include <memory>
 #include <set>
@@ -39,6 +40,8 @@ class Task {
 protected:
   std::string name_;
   double amount_;
+  std::shared_ptr<void> token_ = NULL;
+  std::deque<std::map<TaskPtr, std::shared_ptr<void>>> tokens_received_;
   int queued_execs_ = 0;
   int count_        = 0;
   bool working_     = false;
@@ -61,6 +64,8 @@ public:
   void enqueue_execs(int n);
   void set_amount(double amount);
   double get_amount() const { return amount_; }
+  void set_token(std::shared_ptr<void> token);
+  std::map<TaskPtr, std::shared_ptr<void>> get_tokens() const;
   void add_successor(TaskPtr t);
   void remove_successor(TaskPtr t);
   void remove_all_successors();
index dca0c90..4edfc3f 100644 (file)
@@ -52,7 +52,10 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  predecessors_[source]++;
+  auto source_count = predecessors_[source]++;
+  if (tokens_received_.size() <= queued_execs_ + source_count)
+    tokens_received_.push_back({});
+  tokens_received_[queued_execs_ + source_count][source] = source->token_;
   bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
@@ -134,6 +137,25 @@ void Task::set_amount(double amount)
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
+/** @ingroup plugin_task
+ *  @param token The token to set.
+ *  @brief Set the token that this Task sends to its successors.
+ *  @note The token is passed to each successor after the task ends, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<void> token)
+{
+  // The assignment is wrapped in a simcall, like the other setters of this class
+  auto store_token = [this, token] { token_ = token; };
+  simgrid::kernel::actor::simcall_answered(store_token);
+}
+
+/** @ingroup plugin_task
+ *  @return Map of tokens received for the next execution, keyed by the Task that sent each token.
+ *  @note If no token has been received yet for the next execution, an empty map is returned.
+ *        The map may also be only partially filled if some predecessors have not sent their token yet.
+ */
+std::map<TaskPtr, std::shared_ptr<void>> Task::get_tokens() const
+{
+  // front() on an empty deque is undefined behavior: report "no token" with an empty map instead
+  if (tokens_received_.empty())
+    return {};
+  return tokens_received_.front();
+}
+
 /** @ingroup plugin_task
  *  @param successor The Task to add.
  *  @brief Add a successor to this Task.
@@ -231,6 +253,8 @@ void ExecTask::fire()
   Task::on_start(this);
   working_          = true;
   queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::ExecPtr exec = s4u::Exec::init();
   exec->set_name(name_);
   exec->set_flops_amount(amount_);
@@ -292,6 +316,8 @@ void CommTask::fire()
   Task::on_start(this);
   working_          = true;
   queued_execs_     = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
   comm->set_name(name_);
   comm->set_payload_size(amount_);
@@ -383,6 +409,8 @@ void IoTask::fire()
   Task::on_start(this);
   working_      = true;
   queued_execs_ = std::max(queued_execs_ - 1, 0);
+  if (tokens_received_.size() > 0)
+      tokens_received_.pop_front();
   s4u::IoPtr io = s4u::Io::init();
   io->set_name(name_);
   io->set_size(amount_);