Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add the possibility to increase the parallelism degree of Tasks
author Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Thu, 14 Sep 2023 11:48:15 +0000 (13:48 +0200)
committer Adrien Gougeon <adrien.gougeon@ens-rennes.fr>
Thu, 14 Sep 2023 11:48:15 +0000 (13:48 +0200)
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/task-microservice/s4u-task-microservice.cpp [new file with mode: 0644]
examples/cpp/task-microservice/s4u-task-microservice.tesh [new file with mode: 0644]
examples/cpp/task-parallelism/s4u-task-parallelism.cpp [new file with mode: 0644]
examples/cpp/task-parallelism/s4u-task-parallelism.tesh [new file with mode: 0644]
include/simgrid/s4u/Task.hpp
src/s4u/s4u_Task.cpp

index a3e6cb4..57876c4 100644 (file)
@@ -394,6 +394,10 @@ include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.cpp
 include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh
 include examples/cpp/task-io/s4u-task-io.cpp
 include examples/cpp/task-io/s4u-task-io.tesh
+include examples/cpp/task-microservice/s4u-task-microservice.cpp
+include examples/cpp/task-microservice/s4u-task-microservice.tesh
+include examples/cpp/task-parallelism/s4u-task-parallelism.cpp
+include examples/cpp/task-parallelism/s4u-task-parallelism.tesh
 include examples/cpp/task-simple/s4u-task-simple.cpp
 include examples/cpp/task-simple/s4u-task-simple.tesh
 include examples/cpp/task-storm/s4u-task-storm.cpp
index 72e060f..8d35473 100644 (file)
@@ -171,7 +171,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
                  network-ns3 network-ns3-wifi network-wifi
                  io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
-                 task-io task-simple task-variable-load task-storm task-switch-host
+                 task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
                  photovoltaic-simple
                  platform-comm-serialize platform-failures platform-profile platform-properties
                  plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
diff --git a/examples/cpp/task-microservice/s4u-task-microservice.cpp b/examples/cpp/task-microservice/s4u-task-microservice.cpp
new file mode 100644 (file)
index 0000000..5c449e8
--- /dev/null
@@ -0,0 +1,145 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example illustrates how to use SimGrid Tasks to reproduce the workflow 2.a of the article
+ * "Automated performance prediction of microservice applications using simulation" by Clément Courageux-Sudan et al.
+ *
+ * To build this workflow we create:
+ *   - each Execution Task
+ *   - each Communication Task
+ *   - the links between the Tasks, i.e. the graph
+ *
+ * We also increase the parallelism degree of each Task to 10.
+ *
+ * In this scenario we send 500 requests per second (RPS) to the entry point of the graph for 7 seconds,
+ * and we count the number of processed requests between 2 and 7 seconds to evaluate the number of requests processed
+ * per second.
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_microservice, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+static void request_sender(sg4::TaskPtr t, int requests_per_second)
+{
+  for (int remaining = requests_per_second * 7; remaining > 0; --remaining) { // 7 seconds worth of requests
+    t->enqueue_firings(1);
+    sg4::this_actor::sleep_for(1.0 / requests_per_second); // constant delay between two consecutive requests
+  }
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  xbt_assert(argc > 1, "Usage: %s platform_file", argv[0]);
+  e.load_platform(argv[1]);
+  // Retrieve Hosts
+  auto pm0 = e.host_by_name("PM0");
+  auto pm1 = e.host_by_name("PM1");
+
+  // Set concurrency limit
+  pm0->set_concurrency_limit(10);
+  pm1->set_concurrency_limit(10);
+
+  // Create Exec Tasks (amount = N * host speed / 1e6; presumably N microseconds of work, if get_speed() is in flop/s)
+  auto nginx_web_server        = sg4::ExecTask::init("nginx_web_server", 783 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_0  = sg4::ExecTask::init("compose_post_service_0", 682 * pm0->get_speed() / 1e6, pm0);
+  auto unique_id_service       = sg4::ExecTask::init("unique_id_service", 12 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_1  = sg4::ExecTask::init("compose_post_service_1", 140 * pm0->get_speed() / 1e6, pm0);
+  auto media_service           = sg4::ExecTask::init("media_service", 6 * pm1->get_speed() / 1e6, pm1);
+  auto compose_post_service_2  = sg4::ExecTask::init("compose_post_service_2", 135 * pm0->get_speed() / 1e6, pm0);
+  auto user_service            = sg4::ExecTask::init("user_service", 5 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_3  = sg4::ExecTask::init("compose_post_service_3", 147 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_0          = sg4::ExecTask::init("text_service_0", 296 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_1          = sg4::ExecTask::init("text_service_1", 350 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_2          = sg4::ExecTask::init("text_service_2", 146 * pm0->get_speed() / 1e6, pm0);
+  auto user_mention_service    = sg4::ExecTask::init("user_mention_service", 934 * pm0->get_speed() / 1e6, pm0);
+  auto url_shorten_service     = sg4::ExecTask::init("url_shorten_service", 555 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_4  = sg4::ExecTask::init("compose_post_service_4", 138 * pm0->get_speed() / 1e6, pm0);
+  auto home_timeline_service_0 = sg4::ExecTask::init("home_timeline_service_0", 243 * pm0->get_speed() / 1e6, pm0);
+  auto social_graph_service    = sg4::ExecTask::init("social_graph_service", 707 * pm0->get_speed() / 1e6, pm0);
+  auto home_timeline_service_1 = sg4::ExecTask::init("home_timeline_service_1", 7 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_5  = sg4::ExecTask::init("compose_post_service_5", 192 * pm0->get_speed() / 1e6, pm0);
+  auto user_timeline_service   = sg4::ExecTask::init("user_timeline_service", 913 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_6  = sg4::ExecTask::init("compose_post_service_6", 508 * pm0->get_speed() / 1e6, pm0);
+  auto post_storage_service    = sg4::ExecTask::init("post_storage_service", 391 * pm1->get_speed() / 1e6, pm1);
+
+  // Create Comm Tasks (100 bytes each, from the host of the producing Task to the host of the consuming Task)
+  auto compose_post_service_1_to_media_service =
+      sg4::CommTask::init("compose_post_service_1_to_media_service", 100, pm0, pm1);
+  auto media_service_to_compose_post_service_2 =
+      sg4::CommTask::init("media_service_to_compose_post_service_2", 100, pm1, pm0);
+  auto compose_post_service_6_to_post_storage_service =
+      sg4::CommTask::init("compose_post_service_6_to_post_storage_service", 100, pm0, pm1);
+
+  // Create the graph
+  nginx_web_server->add_successor(compose_post_service_0);
+  compose_post_service_0->add_successor(unique_id_service);
+  unique_id_service->add_successor(compose_post_service_1);
+  compose_post_service_1->add_successor(compose_post_service_1_to_media_service);
+  compose_post_service_1_to_media_service->add_successor(media_service);
+  media_service->add_successor(media_service_to_compose_post_service_2);
+  media_service_to_compose_post_service_2->add_successor(compose_post_service_2);
+  compose_post_service_2->add_successor(user_service);
+  user_service->add_successor(compose_post_service_3);
+  compose_post_service_3->add_successor(text_service_0);
+  text_service_0->add_successor(text_service_1);
+  text_service_0->add_successor(text_service_2);
+  text_service_1->add_successor(user_mention_service);
+  text_service_2->add_successor(url_shorten_service);
+  user_mention_service->add_successor(compose_post_service_4);
+  compose_post_service_4->add_successor(home_timeline_service_0);
+  home_timeline_service_0->add_successor(social_graph_service);
+  social_graph_service->add_successor(home_timeline_service_1);
+  home_timeline_service_1->add_successor(compose_post_service_5);
+  compose_post_service_5->add_successor(user_timeline_service);
+  user_timeline_service->add_successor(compose_post_service_6);
+  compose_post_service_6->add_successor(compose_post_service_6_to_post_storage_service);
+  compose_post_service_6_to_post_storage_service->add_successor(post_storage_service);
+
+  // Raise the parallelism degree of every Task of the graph (Exec and Comm Tasks alike)
+  std::vector<sg4::TaskPtr> all_tasks = {nginx_web_server,
+                                         compose_post_service_0,
+                                         unique_id_service,
+                                         compose_post_service_1,
+                                         compose_post_service_1_to_media_service,
+                                         media_service,
+                                         media_service_to_compose_post_service_2,
+                                         compose_post_service_2,
+                                         user_service,
+                                         compose_post_service_3,
+                                         text_service_0,
+                                         text_service_1,
+                                         text_service_2,
+                                         user_mention_service,
+                                         url_shorten_service,
+                                         compose_post_service_4,
+                                         home_timeline_service_0,
+                                         social_graph_service,
+                                         home_timeline_service_1,
+                                         compose_post_service_5,
+                                         user_timeline_service,
+                                         compose_post_service_6,
+                                         compose_post_service_6_to_post_storage_service,
+                                         post_storage_service};
+  for (const auto& t : all_tasks)
+    t->set_parallelism_degree(10);
+
+  // Create the actor that will inject requests during the simulation
+  sg4::Actor::create("request_sender", pm0, request_sender, nginx_web_server, 500);
+
+  // Count the completions of the last Task of the graph that occur strictly between 2 and 7 simulated seconds
+  int requests_processed = 0;
+  sg4::Task::on_completion_cb([&e, &requests_processed](const sg4::Task* t) {
+    if (t->get_name() == "post_storage_service" and e.get_clock() < 7 and e.get_clock() > 2)
+      requests_processed++;
+  });
+
+  // Start the simulation; the counting window lasts 5 seconds, hence the division by 5.0
+  e.run();
+  XBT_INFO("Requests processed per second: %f", requests_processed / 5.0);
+  return 0;
+}
diff --git a/examples/cpp/task-microservice/s4u-task-microservice.tesh b/examples/cpp/task-microservice/s4u-task-microservice.tesh
new file mode 100644 (file)
index 0000000..9a95e44
--- /dev/null
@@ -0,0 +1,4 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-microservice ${platfdir}/three_multicore_hosts.xml
+> [7.008495] [task_microservice/INFO] Requests processed per second: 500.000000
\ No newline at end of file
diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.cpp b/examples/cpp/task-parallelism/s4u-task-parallelism.cpp
new file mode 100644 (file)
index 0000000..c9f70da
--- /dev/null
@@ -0,0 +1,51 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example tests increasing and decreasing the parallelism degree of Tasks.
+ * First we increase and decrease the parallelism degree while the Task is idle,
+ * then we increase and decrease the parallelism degree while the Task has queued firings.
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_parallelism, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+static void manager(sg4::ExecTaskPtr t)
+{
+  t->set_parallelism_degree(1);
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(300);
+  // Raise the degree to 2 before enqueuing 4 new firings
+  t->set_parallelism_degree(2);
+  t->enqueue_firings(4);
+  sg4::this_actor::sleep_for(300);
+  // Lower the degree back to 1 before enqueuing 2 new firings
+  t->set_parallelism_degree(1);
+  t->enqueue_firings(2);
+  sg4::this_actor::sleep_for(300);
+  // Last phase: enqueue 11 firings first, then change the degree (2, then 1, then 3) while firings are still queued
+  t->enqueue_firings(11);
+  t->set_parallelism_degree(2);
+  sg4::this_actor::sleep_for(150);
+  t->set_parallelism_degree(1);
+  sg4::this_actor::sleep_for(200);
+  t->set_parallelism_degree(3);
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  xbt_assert(argc > 1, "Usage: %s platform_file", argv[0]); // argv[1] is read right below
+  e.load_platform(argv[1]);
+  auto pm0 = e.host_by_name("PM0");
+  auto t   = sg4::ExecTask::init("exec_A", 100 * pm0->get_speed(), pm0);
+  sg4::Task::on_completion_cb( // lambda parameters are named `task` so that they do not shadow the local `t`
+      [](const sg4::Task* task) { XBT_INFO("Task %s finished (%d)", task->get_cname(), task->get_count()); });
+  sg4::Task::on_start_cb([](const sg4::Task* task) { XBT_INFO("Task %s start", task->get_cname()); });
+  sg4::Actor::create("sender", pm0, manager, t); // the manager actor drives the degree and the firings of `t`
+  e.run();
+  return 0;
+}
diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh
new file mode 100644 (file)
index 0000000..938ad96
--- /dev/null
@@ -0,0 +1,41 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
+> [0.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
+> [100.000000] [task_parallelism/INFO] Task exec_A start
+> [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
+> [300.000000] [task_parallelism/INFO] Task exec_A start
+> [300.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
+> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
+> [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
+> [600.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
+> [700.000000] [task_parallelism/INFO] Task exec_A start
+> [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
+> [900.000000] [task_parallelism/INFO] Task exec_A start
+> [900.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
+> [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
+> [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1250.000000] [task_parallelism/INFO] Task exec_A start
+> [1250.000000] [task_parallelism/INFO] Task exec_A start
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
+> [1300.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
+> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
+> [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
+> [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
index f854adb..36e6e0d 100644 (file)
@@ -29,9 +29,10 @@ class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
 class Task {
   std::string name_;
   double amount_;
-  int queued_firings_ = 0;
-  int count_          = 0;
-  bool working_       = false;
+  int queued_firings_     = 0;
+  int count_              = 0;
+  int running_instances_  = 0;
+  int parallelism_degree_ = 1;
 
   std::set<Task*> successors_                 = {};
   std::map<Task*, unsigned int> predecessors_ = {};
@@ -42,8 +43,7 @@ class Task {
 
   std::shared_ptr<Token> token_ = nullptr;
   std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
-  ActivityPtr previous_activity_;
-  ActivityPtr current_activity_;
+  std::deque<ActivityPtr> current_activities_;
 
   inline static xbt::signal<void(Task*)> on_start;
   xbt::signal<void(Task*)> on_this_start;
@@ -57,14 +57,17 @@ protected:
   virtual void fire();
   void complete();
 
-  void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+  void store_activity(ActivityPtr a) { current_activities_.push_back(a); }
 
 public:
+  void set_name(std::string name);
   const std::string& get_name() const { return name_; }
   const char* get_cname() const { return name_.c_str(); }
   void set_amount(double amount);
   double get_amount() const { return amount_; }
   int get_count() const { return count_; }
+  void set_parallelism_degree(int n);
+  int get_parallelism_degree() const { return parallelism_degree_; } // read-only getter, const like its siblings
 
   void set_token(std::shared_ptr<Token> token);
   std::shared_ptr<Token> get_next_token_from(TaskPtr t);
index ee75342..d899e1a 100644 (file)
@@ -33,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {}
  */
 bool Task::ready_to_run() const
 {
-  return not working_ && queued_firings_ > 0;
+  return running_instances_ < parallelism_degree_ && queued_firings_ > 0;
 }
 
 /**
@@ -75,18 +75,31 @@ void Task::receive(Task* source)
 void Task::complete()
 {
   xbt_assert(Actor::is_maestro());
-  working_ = false;
+  running_instances_--;
   count_++;
   on_this_completion(this);
   on_completion(this);
-  if (current_activity_)
-    previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
     t->receive(this);
   if (ready_to_run())
     fire();
 }
 
+/** @param n The new parallelism degree of the Task.
+ *  @brief Set the parallelism degree of the Task.
+ *  @note When increasing the degree the function starts new instances.
+ *        When decreasing the degree the function does NOT stop running instances.
+ */
+void Task::set_parallelism_degree(int n)
+{
+  xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0.");
+  simgrid::kernel::actor::simcall_answered([this, degree = n] {
+    this->parallelism_degree_ = degree;
+    while (this->ready_to_run()) // start as many queued firings as the new degree allows
+      this->fire();
+  });
+}
+
 /** @param n The number of firings to enqueue.
  *  @brief Enqueue firing.
  *  @note Immediatly fire an activity if possible.
@@ -95,11 +108,19 @@ void Task::enqueue_firings(int n)
 {
   simgrid::kernel::actor::simcall_answered([this, n] {
     queued_firings_ += n;
-    if (ready_to_run())
+    while (ready_to_run())
       fire();
   });
 }
 
+/** @param name The new name to set.
+ *  @brief Set the name of the Task.
+ */
+void Task::set_name(std::string name)
+{
+  name_ = std::move(name); // `name` is a by-value sink parameter: move it rather than copying it a second time
+}
+
 /** @param amount The amount to set.
  *  @brief Set the amout of work to do.
  *  @note Amount in flop for ExecTask and in bytes for CommTask.
@@ -128,9 +149,12 @@ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
 
 void Task::fire()
 {
+  if ((int)current_activities_.size() > parallelism_degree_) {
+    current_activities_.pop_front();
+  }
   on_this_start(this);
   on_start(this);
-  working_        = true;
+  running_instances_++;
   queued_firings_ = std::max(queued_firings_ - 1, 0);
   if (not tokens_received_.empty())
     tokens_received_.pop_front();
@@ -194,7 +218,7 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
 
 /**
  *  @brief Do one execution of the Task.
- *  @note Call the on_this_start() func. Set its working status as true.
+ *  @note Call the on_this_start() func.
  *  Init and start the underlying Activity.
  */
 void ExecTask::fire()
@@ -202,8 +226,8 @@ void ExecTask::fire()
   Task::fire();
   auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
   exec->start();
-  exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
-  set_current_activity(exec);
+  exec->on_this_completion_cb([this](Exec const&) { complete(); });
+  store_activity(exec);
 }
 
 /** @ingroup plugin_task
@@ -248,7 +272,7 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source,
 
 /**
  *  @brief Do one execution of the Task.
- *  @note Call the on_this_start() func. Set its working status as true.
+ *  @note Call the on_this_start() func.
  *  Init and start the underlying Activity.
  */
 void CommTask::fire()
@@ -256,8 +280,8 @@ void CommTask::fire()
   Task::fire();
   auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
   comm->start();
-  comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
-  set_current_activity(comm);
+  comm->on_this_completion_cb([this](Comm const&) { complete(); });
+  store_activity(comm);
 }
 
 /** @ingroup plugin_task
@@ -341,8 +365,8 @@ void IoTask::fire()
   Task::fire();
   auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
   io->start();
-  io->on_this_completion_cb([this](Io const&) { this->complete(); });
-  set_current_activity(io);
+  io->on_this_completion_cb([this](Io const&) { complete(); });
+  store_activity(io);
 }
 
 } // namespace simgrid::s4u