From d50d059ea12035662120f56d1ed5bd9289474193 Mon Sep 17 00:00:00 2001 From: Adrien Gougeon Date: Thu, 14 Sep 2023 13:48:15 +0200 Subject: [PATCH] add the possibility to increase the parallelism degree of Tasks --- MANIFEST.in | 4 + examples/cpp/CMakeLists.txt | 2 +- .../s4u-task-microservice.cpp | 145 ++++++++++++++++++ .../s4u-task-microservice.tesh | 4 + .../task-parallelism/s4u-task-parallelism.cpp | 51 ++++++ .../s4u-task-parallelism.tesh | 41 +++++ include/simgrid/s4u/Task.hpp | 15 +- src/s4u/s4u_Task.cpp | 52 +++++-- 8 files changed, 293 insertions(+), 21 deletions(-) create mode 100644 examples/cpp/task-microservice/s4u-task-microservice.cpp create mode 100644 examples/cpp/task-microservice/s4u-task-microservice.tesh create mode 100644 examples/cpp/task-parallelism/s4u-task-parallelism.cpp create mode 100644 examples/cpp/task-parallelism/s4u-task-parallelism.tesh diff --git a/MANIFEST.in b/MANIFEST.in index a3e6cb48c5..57876c43f0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -394,6 +394,10 @@ include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.cpp include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh include examples/cpp/task-io/s4u-task-io.cpp include examples/cpp/task-io/s4u-task-io.tesh +include examples/cpp/task-microservice/s4u-task-microservice.cpp +include examples/cpp/task-microservice/s4u-task-microservice.tesh +include examples/cpp/task-parallelism/s4u-task-parallelism.cpp +include examples/cpp/task-parallelism/s4u-task-parallelism.tesh include examples/cpp/task-simple/s4u-task-simple.cpp include examples/cpp/task-simple/s4u-task-simple.tesh include examples/cpp/task-storm/s4u-task-storm.cpp diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 72e060f429..8d35473c16 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -171,7 +171,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert network-ns3 network-ns3-wifi network-wifi io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent - task-io task-simple task-variable-load task-storm task-switch-host + task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load photovoltaic-simple platform-comm-serialize platform-failures platform-profile platform-properties plugin-host-load plugin-jbod plugin-link-load plugin-prodcons diff --git a/examples/cpp/task-microservice/s4u-task-microservice.cpp b/examples/cpp/task-microservice/s4u-task-microservice.cpp new file mode 100644 index 0000000000..5c449e8d78 --- /dev/null +++ b/examples/cpp/task-microservice/s4u-task-microservice.cpp @@ -0,0 +1,145 @@ +/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example illustrates how to use Simgrid Tasks to reproduce the workflow 2.a of the article + * "Automated performance prediction of microservice applications using simulation" by Clément Courageux-Sudan et al. + * + * To build this workflow we create; + * - each Execution Task + * - each Communication Task + * - the links between the Tasks, i.e. the graph + * + * We also increase the parallelism degree of each Task to 10. + * + * In this scenario we send 500 requests per second (RPS) to the entry point of the graph for 7 seconds, + * and we count the number of processed requests between 2 and 7 seconds to evaluate the number of requests processed + * per second. + */ + +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(task_microservice, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +static void request_sender(sg4::TaskPtr t, int requests_per_second) +{ + for (int i = 0; i < requests_per_second * 7; i++) { + t->enqueue_firings(1); + sg4::this_actor::sleep_for(1.0 / requests_per_second); + } +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + + // Retrieve Hosts + auto pm0 = e.host_by_name("PM0"); + auto pm1 = e.host_by_name("PM1"); + + // Set concurrency limit + pm0->set_concurrency_limit(10); + pm1->set_concurrency_limit(10); + + // Create Exec Tasks + auto nginx_web_server = sg4::ExecTask::init("nginx_web_server", 783 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_0 = sg4::ExecTask::init("compose_post_service_0", 682 * pm0->get_speed() / 1e6, pm0); + auto unique_id_service = sg4::ExecTask::init("unique_id_service", 12 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_1 = sg4::ExecTask::init("compose_post_service_1", 140 * pm0->get_speed() / 1e6, pm0); + auto media_service = sg4::ExecTask::init("media_service", 6 * pm1->get_speed() / 1e6, pm1); + auto compose_post_service_2 = sg4::ExecTask::init("compose_post_service_2", 135 * pm0->get_speed() / 1e6, pm0); + auto user_service = sg4::ExecTask::init("user_service", 5 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_3 = sg4::ExecTask::init("compose_post_service_3", 147 * pm0->get_speed() / 1e6, pm0); + auto text_service_0 = sg4::ExecTask::init("text_service_0", 296 * pm0->get_speed() / 1e6, pm0); + auto text_service_1 = sg4::ExecTask::init("text_service_1", 350 * pm0->get_speed() / 1e6, pm0); + auto text_service_2 = sg4::ExecTask::init("text_service_2", 146 * pm0->get_speed() / 1e6, pm0); + auto user_mention_service = sg4::ExecTask::init("user_mention_service", 934 * pm0->get_speed() / 1e6, pm0); + auto url_shorten_service = sg4::ExecTask::init("url_shorten_service", 555 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_4 = sg4::ExecTask::init("compose_post_service_4", 138 * pm0->get_speed() / 1e6, pm0); + auto home_timeline_service_0 = sg4::ExecTask::init("home_timeline_service_0", 243 * pm0->get_speed() / 1e6, pm0); + auto social_graph_service = sg4::ExecTask::init("home_timeline_service_0", 707 * pm0->get_speed() / 1e6, pm0); + auto home_timeline_service_1 = sg4::ExecTask::init("home_timeline_service_0", 7 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_5 = sg4::ExecTask::init("compose_post_service_5", 192 * pm0->get_speed() / 1e6, pm0); + auto user_timeline_service = sg4::ExecTask::init("user_timeline_service", 913 * pm0->get_speed() / 1e6, pm0); + auto compose_post_service_6 = sg4::ExecTask::init("compose_post_service_6", 508 * pm0->get_speed() / 1e6, pm0); + auto post_storage_service = sg4::ExecTask::init("post_storage_service", 391 * pm1->get_speed() / 1e6, pm1); + + // Create Comm Tasks + auto compose_post_service_1_to_media_service = + sg4::CommTask::init("compose_post_service_1_to_media_service", 100, pm0, pm1); + auto media_service_to_compose_post_service_2 = + sg4::CommTask::init("media_service_to_compose_post_service_2", 100, pm1, pm0); + auto compose_post_service_6_to_post_storage_service = + sg4::CommTask::init("media_service_to_compose_post_service_2", 100, pm1, pm0); + + // Create the graph + nginx_web_server->add_successor(compose_post_service_0); + compose_post_service_0->add_successor(unique_id_service); + unique_id_service->add_successor(compose_post_service_1); + compose_post_service_1->add_successor(compose_post_service_1_to_media_service); + compose_post_service_1_to_media_service->add_successor(media_service); + media_service->add_successor(media_service_to_compose_post_service_2); + media_service_to_compose_post_service_2->add_successor(compose_post_service_2); + compose_post_service_2->add_successor(user_service); + user_service->add_successor(compose_post_service_3); + compose_post_service_3->add_successor(text_service_0); + text_service_0->add_successor(text_service_1); + text_service_0->add_successor(text_service_2); + text_service_1->add_successor(user_mention_service); + text_service_2->add_successor(url_shorten_service); + user_mention_service->add_successor(compose_post_service_4); + compose_post_service_4->add_successor(home_timeline_service_0); + home_timeline_service_0->add_successor(social_graph_service); + social_graph_service->add_successor(home_timeline_service_1); + home_timeline_service_1->add_successor(compose_post_service_5); + compose_post_service_5->add_successor(user_timeline_service); + user_timeline_service->add_successor(compose_post_service_6); + compose_post_service_6->add_successor(compose_post_service_6_to_post_storage_service); + compose_post_service_6_to_post_storage_service->add_successor(post_storage_service); + + // Dispatch Exec Tasks + std::vector exec_tasks = {nginx_web_server, + compose_post_service_0, + unique_id_service, + compose_post_service_1, + compose_post_service_1_to_media_service, + media_service, + media_service_to_compose_post_service_2, + compose_post_service_2, + user_service, + compose_post_service_3, + text_service_0, + text_service_1, + text_service_2, + user_mention_service, + url_shorten_service, + compose_post_service_4, + home_timeline_service_0, + social_graph_service, + home_timeline_service_1, + compose_post_service_5, + user_timeline_service, + compose_post_service_6, + compose_post_service_6_to_post_storage_service, + post_storage_service}; + for (auto t : exec_tasks) + t->set_parallelism_degree(10); + + // Create the actor that will inject requests during the simulation + sg4::Actor::create("request_sender", pm0, request_sender, nginx_web_server, 500); + + // Add a function to be called when tasks end for log purpose + int requests_processed = 0; + sg4::Task::on_completion_cb([&e, &requests_processed](const sg4::Task* t) { + if (t->get_name() == "post_storage_service" and e.get_clock() < 7 and e.get_clock() > 2) + requests_processed++; + }); + + // Start the simulation + e.run(); + XBT_INFO("Requests processed per second: %f", requests_processed / 5.0); + return 0; +} diff --git a/examples/cpp/task-microservice/s4u-task-microservice.tesh b/examples/cpp/task-microservice/s4u-task-microservice.tesh new file mode 100644 index 0000000000..9a95e4433e --- /dev/null +++ b/examples/cpp/task-microservice/s4u-task-microservice.tesh @@ -0,0 +1,4 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-task-microservice ${platfdir}/three_multicore_hosts.xml +> [7.008495] [task_microservice/INFO] Requests processed per second: 500.000000 \ No newline at end of file diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.cpp b/examples/cpp/task-parallelism/s4u-task-parallelism.cpp new file mode 100644 index 0000000000..c9f70dafb8 --- /dev/null +++ b/examples/cpp/task-parallelism/s4u-task-parallelism.cpp @@ -0,0 +1,51 @@ +/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example tests increasing and decreasing parallelism degree of Tasks. + * First we increase and decrease parallelism degree while the Task is idle, + * then we increase and decrease parallelism degree while the Task has queued firings. + */ + +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(task_parallelism, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +static void manager(sg4::ExecTaskPtr t) +{ + t->set_parallelism_degree(1); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(300); + + t->set_parallelism_degree(2); + t->enqueue_firings(4); + sg4::this_actor::sleep_for(300); + + t->set_parallelism_degree(1); + t->enqueue_firings(2); + sg4::this_actor::sleep_for(300); + + t->enqueue_firings(11); + t->set_parallelism_degree(2); + sg4::this_actor::sleep_for(150); + t->set_parallelism_degree(1); + sg4::this_actor::sleep_for(200); + t->set_parallelism_degree(3); +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + auto pm0 = e.host_by_name("PM0"); + auto t = sg4::ExecTask::init("exec_A", 100 * pm0->get_speed(), pm0); + sg4::Task::on_completion_cb( + [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_cname(), t->get_count()); }); + sg4::Task::on_start_cb([](const sg4::Task* t) { XBT_INFO("Task %s start", t->get_cname()); }); + sg4::Actor::create("sender", pm0, manager, t); + + e.run(); + return 0; +} diff --git a/examples/cpp/task-parallelism/s4u-task-parallelism.tesh b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh new file mode 100644 index 0000000000..938ad9677f --- /dev/null +++ b/examples/cpp/task-parallelism/s4u-task-parallelism.tesh @@ -0,0 +1,41 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml +> [0.000000] [task_parallelism/INFO] Task exec_A start +> [100.000000] [task_parallelism/INFO] Task exec_A finished (1) +> [100.000000] [task_parallelism/INFO] Task exec_A start +> [200.000000] [task_parallelism/INFO] Task exec_A finished (2) +> [300.000000] [task_parallelism/INFO] Task exec_A start +> [300.000000] [task_parallelism/INFO] Task exec_A start +> [400.000000] [task_parallelism/INFO] Task exec_A finished (3) +> [400.000000] [task_parallelism/INFO] Task exec_A start +> [400.000000] [task_parallelism/INFO] Task exec_A finished (4) +> [400.000000] [task_parallelism/INFO] Task exec_A start +> [500.000000] [task_parallelism/INFO] Task exec_A finished (5) +> [500.000000] [task_parallelism/INFO] Task exec_A finished (6) +> [600.000000] [task_parallelism/INFO] Task exec_A start +> [700.000000] [task_parallelism/INFO] Task exec_A finished (7) +> [700.000000] [task_parallelism/INFO] Task exec_A start +> [800.000000] [task_parallelism/INFO] Task exec_A finished (8) +> [900.000000] [task_parallelism/INFO] Task exec_A start +> [900.000000] [task_parallelism/INFO] Task exec_A start +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9) +> [1000.000000] [task_parallelism/INFO] Task exec_A start +> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10) +> [1000.000000] [task_parallelism/INFO] Task exec_A start +> [1100.000000] [task_parallelism/INFO] Task exec_A finished (11) +> [1100.000000] [task_parallelism/INFO] Task exec_A finished (12) +> [1100.000000] [task_parallelism/INFO] Task exec_A start +> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13) +> [1200.000000] [task_parallelism/INFO] Task exec_A start +> [1250.000000] [task_parallelism/INFO] Task exec_A start +> [1250.000000] [task_parallelism/INFO] Task exec_A start +> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14) +> [1300.000000] [task_parallelism/INFO] Task exec_A start +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15) +> [1350.000000] [task_parallelism/INFO] Task exec_A start +> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16) +> [1350.000000] [task_parallelism/INFO] Task exec_A start +> [1400.000000] [task_parallelism/INFO] Task exec_A finished (17) +> [1450.000000] [task_parallelism/INFO] Task exec_A finished (18) +> [1450.000000] [task_parallelism/INFO] Task exec_A finished (19) \ No newline at end of file diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index f854adb86d..36e6e0dbd2 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -29,9 +29,10 @@ class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { std::string name_; double amount_; - int queued_firings_ = 0; - int count_ = 0; - bool working_ = false; + int queued_firings_ = 0; + int count_ = 0; + int running_instances_ = 0; + int parallelism_degree_ = 1; std::set successors_ = {}; std::map predecessors_ = {}; @@ -42,8 +43,7 @@ class Task { std::shared_ptr token_ = nullptr; std::deque>> tokens_received_; - ActivityPtr previous_activity_; - ActivityPtr current_activity_; + std::deque current_activities_; inline static xbt::signal on_start; xbt::signal on_this_start; @@ -57,14 +57,17 @@ protected: virtual void fire(); void complete(); - void set_current_activity(ActivityPtr a) { current_activity_ = a; } + void store_activity(ActivityPtr a) { current_activities_.push_back(a); } public: + void set_name(std::string name); const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } void set_amount(double amount); double get_amount() const { return amount_; } int get_count() const { return count_; } + void set_parallelism_degree(int n); + int get_parallelism_degree() { return parallelism_degree_; } void set_token(std::shared_ptr token); std::shared_ptr get_next_token_from(TaskPtr t); diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index ee75342eae..d899e1acca 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -33,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {} */ bool Task::ready_to_run() const { - return not working_ && queued_firings_ > 0; + return running_instances_ < parallelism_degree_ && queued_firings_ > 0; } /** @@ -75,18 +75,31 @@ void Task::receive(Task* source) void Task::complete() { xbt_assert(Actor::is_maestro()); - working_ = false; + running_instances_--; count_++; on_this_completion(this); on_completion(this); - if (current_activity_) - previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) t->receive(this); if (ready_to_run()) fire(); } +/** @param n The new parallelism degree of the Task. + * @brief Set the parallelism degree of the Task. + * @note When increasing the degree the function starts new instances. + * When decreasing the degree the function does NOT stop running instances. + */ +void Task::set_parallelism_degree(int n) +{ + xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0."); + simgrid::kernel::actor::simcall_answered([this, n] { + parallelism_degree_ = n; + while (ready_to_run()) + fire(); + }); +} + /** @param n The number of firings to enqueue. * @brief Enqueue firing. * @note Immediatly fire an activity if possible. @@ -95,11 +108,19 @@ void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { queued_firings_ += n; - if (ready_to_run()) + while (ready_to_run()) fire(); }); } +/** @param name The new name to set. + * @brief Set the name of the Task. + */ +void Task::set_name(std::string name) +{ + name_ = name; +} + /** @param amount The amount to set. * @brief Set the amout of work to do. * @note Amount in flop for ExecTask and in bytes for CommTask. @@ -128,9 +149,12 @@ std::shared_ptr Task::get_next_token_from(TaskPtr t) void Task::fire() { + if ((int)current_activities_.size() > parallelism_degree_) { + current_activities_.pop_front(); + } on_this_start(this); on_start(this); - working_ = true; + running_instances_++; queued_firings_ = std::max(queued_firings_ - 1, 0); if (not tokens_received_.empty()) tokens_received_.pop_front(); @@ -194,7 +218,7 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) /** * @brief Do one execution of the Task. - * @note Call the on_this_start() func. Set its working status as true. + * @note Call the on_this_start() func. * Init and start the underlying Activity. */ void ExecTask::fire() @@ -202,8 +226,8 @@ void ExecTask::fire() Task::fire(); auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); - set_current_activity(exec); + exec->on_this_completion_cb([this](Exec const&) { complete(); }); + store_activity(exec); } /** @ingroup plugin_task @@ -248,7 +272,7 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, /** * @brief Do one execution of the Task. - * @note Call the on_this_start() func. Set its working status as true. + * @note Call the on_this_start() func. * Init and start the underlying Activity. */ void CommTask::fire() @@ -256,8 +280,8 @@ void CommTask::fire() Task::fire(); auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); - set_current_activity(comm); + comm->on_this_completion_cb([this](Comm const&) { complete(); }); + store_activity(comm); } /** @ingroup plugin_task @@ -341,8 +365,8 @@ void IoTask::fire() Task::fire(); auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->on_this_completion_cb([this](Io const&) { this->complete(); }); - set_current_activity(io); + io->on_this_completion_cb([this](Io const&) { complete(); }); + store_activity(io); } } // namespace simgrid::s4u -- 2.20.1