Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix all Task examples to support the new Task format with dispatcher and collector
author Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Mon, 2 Oct 2023 14:03:23 +0000 (16:03 +0200)
committer Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Mon, 2 Oct 2023 14:03:23 +0000 (16:03 +0200)
12 files changed:
examples/cpp/task-parallelism/s4u-task-parallelism.tesh
examples/cpp/task-storm/s4u-task-storm.cpp
examples/cpp/task-storm/s4u-task-storm.tesh
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/python/task-io/task-io.py
examples/python/task-simple/task-simple.py
examples/python/task-simple/task-simple.tesh
examples/python/task-switch-host/task-switch-host.py
examples/python/task-variable-load/task-variable-load.py
include/simgrid/s4u/Task.hpp
src/bindings/python/simgrid_python.cpp
src/s4u/s4u_Task.cpp

index 938ad96..492d015 100644 (file: examples/cpp/task-parallelism/s4u-task-parallelism.tesh)
@@ -2,40 +2,40 @@
 
 $ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
 > [0.000000] [task_parallelism/INFO] Task exec_A start
-> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
 > [100.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
 > [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
 > [300.000000] [task_parallelism/INFO] Task exec_A start
 > [300.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
 > [400.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
 > [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
 > [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
 > [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
 > [600.000000] [task_parallelism/INFO] Task exec_A start
-> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
 > [700.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
 > [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
 > [900.000000] [task_parallelism/INFO] Task exec_A start
 > [900.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
 > [1000.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
 > [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
 > [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
 > [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
-> [1100.000000] [task_parallelism/INFO] Task exec_A start
-> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
 > [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
 > [1250.000000] [task_parallelism/INFO] Task exec_A start
 > [1250.000000] [task_parallelism/INFO] Task exec_A start
-> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
 > [1300.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
 > [1350.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
 > [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
 > [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
 > [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
 > [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
index 58d08c3..0a0ab71 100644 (file: examples/cpp/task-storm/s4u-task-storm.cpp)
@@ -74,10 +74,10 @@ int main(int argc, char* argv[])
      Alternatively we: remove/add the link between SA and SA_to_B2
                        add/remove the link between SA and SA_to_B1
   */
-  SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+  SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
     int count = t->get_count();
     sg4::CommTaskPtr comm;
-    if (count % 2 == 0) {
+    if (count % 2 == 1) {
       t->remove_successor(SA_to_B2);
       t->add_successor(SA_to_B1);
       comm = SA_to_B1;
@@ -86,7 +86,8 @@ int main(int argc, char* argv[])
       t->add_successor(SA_to_B2);
       comm = SA_to_B2;
     }
-    std::vector<double> amount = {1e3, 1e6, 1e9};
+    std::vector<double> amount = {1e9, 1e3, 1e6};
+    // XBT_INFO("Comm %f", amount[count % 3]);
     comm->set_amount(amount[count % 3]);
     auto token = std::make_shared<sg4::Token>();
     token->set_data(new double(amount[count % 3]));
@@ -94,18 +95,26 @@ int main(int argc, char* argv[])
   });
 
   // The token sent by SA is forwarded by both communication tasks
-  SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
-  SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+  SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
+  });
+  SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
+  });
 
   /* B1 and B2 read the value of the token received by their predecessors
      and use it to adapt their amount of work to do.
   */
-  B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
-    auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+  B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+    t->deque_token_from(SA_to_B1);
     t->set_amount(*data * 10);
   });
-  B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
-    auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+  B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+    t->deque_token_from(SA_to_B2);
     t->set_amount(*data * 10);
   });
 
index d7c364a..376dc31 100644 (file: examples/cpp/task-storm/s4u-task-storm.tesh)
@@ -24,11 +24,11 @@ $ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
 > [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
 > [2.619232] [task_storm/INFO] Task B3 finished (1)
 > [6.743624] [task_storm/INFO] Task B3 finished (2)
-> [10.868015] [task_storm/INFO] Task B3 finished (3)
 > [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
 > [14.992407] [task_storm/INFO] Task B3 finished (4)
-> [19.116799] [task_storm/INFO] Task B3 finished (5)
 > [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
 > [23.241190] [task_storm/INFO] Task B4 finished (3)
 > [27.365582] [task_storm/INFO] Task B4 finished (4)
 > [31.489974] [task_storm/INFO] Task B4 finished (5)
index b007523..6f8c62b 100644 (file: examples/cpp/task-switch-host/s4u-task-switch-host.cpp)
@@ -54,12 +54,19 @@ int main(int argc, char* argv[])
   // successors to comm0
   comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
     static int count = 0;
-    if (count % 2 == 0) {
+    if (count % 2 == 0)
       comm0->set_destination(jupiter);
+    else
+      comm0->set_destination(fafard);
+    count++;
+  });
+
+  comm0->on_this_completion_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
+    static int count = 0;
+    if (count % 2 == 0) {
       comm0->add_successor(exec1);
       comm0->remove_successor(exec2);
     } else {
-      comm0->set_destination(fafard);
       comm0->add_successor(exec2);
       comm0->remove_successor(exec1);
     }
index e75215a..d3ab8c1 100644 (file: examples/python/task-io/task-io.py)
@@ -18,7 +18,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 if __name__ == '__main__':
     args = parse()
index 23e9fc0..beca2b6 100644 (file: examples/python/task-simple/task-simple.py)
@@ -28,7 +28,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 if __name__ == '__main__':
     args = parse()
index 5a27a53..f9a828f 100644 (file: examples/python/task-simple/task-simple.tesh)
@@ -5,5 +5,5 @@ $ ${pythoncmd:=python3} ${PYTHON_TOOL_OPTIONS:=} ${srcdir:=.}/task-simple.py --p
 > [11.714617112501687] CommTask(comm) finished (1)
 > [20.388399000968448] ExecTask(exec1) finished (2)
 > [21.90881661298591] CommTask(comm) finished (2)
-> [24.82146412938331] ExecTask(exec2) finished (1)
-> [37.92831114626493] ExecTask(exec2) finished (2)
+> [24.821464129383305] ExecTask(exec2) finished (1)
+> [37.928311146264925] ExecTask(exec2) finished (2)
index 5be8922..03dce6a 100644 (file: examples/python/task-switch-host/task-switch-host.py)
@@ -44,12 +44,16 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
-def switch(t, hosts, execs):
-    comm0.destination = hosts[t.count % 2]
-    comm0.remove_successor(execs[t.count % 2 - 1])
-    comm0.add_successor(execs[t.count % 2])
+def switch_destination(t, hosts):
+    t.destination = hosts[switch_destination.count % 2]
+    switch_destination.count += 1
+switch_destination.count = 0
+
+def switch_successor(t, execs):
+    t.remove_successor(execs[t.get_count() % 2])
+    t.add_successor(execs[t.get_count() % 2 - 1])
 
 if __name__ == '__main__':
     args = parse()
@@ -74,13 +78,16 @@ if __name__ == '__main__':
     exec1.add_successor(comm1)
     exec2.add_successor(comm2)
 
-    # Add a function to be called when tasks end for log purpose
+    # Add a callback when tasks end for log purpose
     Task.on_completion_cb(callback)
 
-    # Add a function to be called before each firing of comm0
-    # This function modifies the graph of tasks by adding or removing
-    # successors to comm0
-    comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
+    # Add a callback before each firing of comm0
+    # It switches the destination of comm0
+    comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard]))
+
+    # Add a callback before comm0 sends tokens to successors
+    # It switches the successor of comm0
+    comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2]))
 
     # Enqueue two firings for task exec1
     comm0.enqueue_firings(4)
index 51dbc1a..14fc2ec 100644 (file: examples/python/task-variable-load/task-variable-load.py)
@@ -28,7 +28,7 @@ def parse():
     return parser.parse_args()
 
 def callback(t):
-    print(f'[{Engine.clock}] {t} finished ({t.count})')
+    print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
 
 def variable_load(t):
     print('--- Small load ---')
index ea144e2..1fe1813 100644 (file: include/simgrid/s4u/Task.hpp)
@@ -48,7 +48,7 @@ class Task {
   void receive(Task* source);
 
   std::shared_ptr<Token> token_ = nullptr;
-  std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+  std::map<TaskPtr, std::deque<std::shared_ptr<Token>>> tokens_received_;
   std::map<std::string, std::deque<ActivityPtr>> current_activities_ = {
       {"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}};
 
@@ -83,7 +83,9 @@ public:
   void set_load_balancing_function(std::function<std::string()> func);
 
   void set_token(std::shared_ptr<Token> token);
-  std::shared_ptr<Token> get_next_token_from(TaskPtr t) const { return tokens_received_.front().at(t); }
+  std::shared_ptr<Token> get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); }
+  std::deque<std::shared_ptr<Token>> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); }
+  void deque_token_from(TaskPtr t);
 
   void add_successor(TaskPtr t);
   void remove_successor(TaskPtr t);
index 792ae4b..2f2ef70 100644 (file: src/bindings/python/simgrid_python.cpp)
@@ -861,9 +861,14 @@ PYBIND11_MODULE(simgrid, m)
           },
           "Add a callback called when each task ends.")
       .def_property_readonly("name", &Task::get_name, "The name of this task (read-only).")
-      .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
       .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
       .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
+      .def(
+          "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); },
+          "The execution count of this task instance_0.")
+      .def(
+          "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); },
+          "The execution count of this task instance.")
       .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
            py::arg("n"), "Enqueue firings for this task.")
       .def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
index e6dbe06..ca72bfb 100644 (file: src/s4u/s4u_Task.cpp)
@@ -51,10 +51,8 @@ void Task::receive(Task* source)
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
   auto source_count = predecessors_[source];
   predecessors_[source]++;
-  if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count)
-    tokens_received_.emplace_back();
-  tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_;
-  bool enough_tokens                                                     = true;
+  tokens_received_[source].push_back(source->token_);
+  bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
       enough_tokens = false;
@@ -82,6 +80,7 @@ void Task::complete(std::string instance)
   running_instances_[instance] = running_instances_[instance] - 1;
   count_[instance]             = count_[instance] + 1;
   if (instance == "collector") {
+    // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
     on_this_completion(this);
     on_completion(this);
     for (auto const& t : successors_)
@@ -167,26 +166,29 @@ void Task::set_amount(double amount, std::string instance)
 
 /** @param token The token to set.
  *  @brief Set the token to send to successors.
- *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ *  @note The token is passed to each successor after the task end, i.e., after the on_completion callback.
  */
 void Task::set_token(std::shared_ptr<Token> token)
 {
   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
 }
 
+void Task::deque_token_from(TaskPtr t)
+{
+  simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); });
+}
+
 void Task::fire(std::string instance)
 {
   if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
     current_activities_[instance].pop_front();
   }
-  if (instance == "dispatcher") {
+  if (instance != "dispatcher" and instance != "collector") {
     on_this_start(this);
     on_start(this);
   }
   running_instances_[instance]++;
   queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
-  if (not tokens_received_.empty())
-    tokens_received_.pop_front();
 }
 
 /** @param successor The Task to add.