include examples/cpp/synchro-semaphore/s4u-synchro-semaphore.tesh
include examples/cpp/task-io/s4u-task-io.cpp
include examples/cpp/task-io/s4u-task-io.tesh
+include examples/cpp/task-microservice/s4u-task-microservice.cpp
+include examples/cpp/task-microservice/s4u-task-microservice.tesh
+include examples/cpp/task-parallelism/s4u-task-parallelism.cpp
+include examples/cpp/task-parallelism/s4u-task-parallelism.tesh
include examples/cpp/task-simple/s4u-task-simple.cpp
include examples/cpp/task-simple/s4u-task-simple.tesh
include examples/cpp/task-storm/s4u-task-storm.cpp
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
network-ns3 network-ns3-wifi network-wifi
io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
- task-io task-simple task-variable-load task-storm task-switch-host
+ task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
photovoltaic-simple
platform-comm-serialize platform-failures platform-profile platform-properties
plugin-host-load plugin-jbod plugin-link-load plugin-prodcons
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example illustrates how to use SimGrid Tasks to reproduce the workflow 2.a of the article
+ * "Automated performance prediction of microservice applications using simulation" by Clément Courageux-Sudan et al.
+ *
+ * To build this workflow we create:
+ * - each Execution Task
+ * - each Communication Task
+ * - the links between the Tasks, i.e. the graph
+ *
+ * We also increase the parallelism degree of each Task to 10.
+ *
+ * In this scenario we send 500 requests per second (RPS) to the entry point of the graph for 7 seconds,
+ * and we count the number of processed requests between 2 and 7 seconds to evaluate the number of requests processed
+ * per second.
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_microservice, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+/* Actor body: inject requests into Task t at a steady rate of requests_per_second,
+ * one firing at a time, for the equivalent of 7 seconds of traffic. */
+static void request_sender(sg4::TaskPtr t, int requests_per_second)
+{
+  const int total_requests = requests_per_second * 7;
+  const double period      = 1.0 / requests_per_second;
+
+  for (int sent = 0; sent < total_requests; sent++) {
+    t->enqueue_firings(1);
+    sg4::this_actor::sleep_for(period);
+  }
+}
+
+/* Build the microservice graph on hosts PM0 and PM1, inject 500 requests per second at its entry point,
+ * and report how many requests per second reached the last Task between t=2 and t=7.
+ *
+ * Expects the platform file as first command-line argument.
+ */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  xbt_assert(argc > 1, "Usage: %s platform_file\n\tExample: %s three_multicore_hosts.xml\n", argv[0], argv[0]);
+  e.load_platform(argv[1]);
+
+  // Retrieve Hosts
+  auto pm0 = e.host_by_name("PM0");
+  auto pm1 = e.host_by_name("PM1");
+
+  // Set concurrency limit
+  pm0->set_concurrency_limit(10);
+  pm1->set_concurrency_limit(10);
+
+  // Create Exec Tasks. Each amount is expressed as <N> * host speed / 1e6.
+  auto nginx_web_server        = sg4::ExecTask::init("nginx_web_server", 783 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_0  = sg4::ExecTask::init("compose_post_service_0", 682 * pm0->get_speed() / 1e6, pm0);
+  auto unique_id_service       = sg4::ExecTask::init("unique_id_service", 12 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_1  = sg4::ExecTask::init("compose_post_service_1", 140 * pm0->get_speed() / 1e6, pm0);
+  auto media_service           = sg4::ExecTask::init("media_service", 6 * pm1->get_speed() / 1e6, pm1);
+  auto compose_post_service_2  = sg4::ExecTask::init("compose_post_service_2", 135 * pm0->get_speed() / 1e6, pm0);
+  auto user_service            = sg4::ExecTask::init("user_service", 5 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_3  = sg4::ExecTask::init("compose_post_service_3", 147 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_0          = sg4::ExecTask::init("text_service_0", 296 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_1          = sg4::ExecTask::init("text_service_1", 350 * pm0->get_speed() / 1e6, pm0);
+  auto text_service_2          = sg4::ExecTask::init("text_service_2", 146 * pm0->get_speed() / 1e6, pm0);
+  auto user_mention_service    = sg4::ExecTask::init("user_mention_service", 934 * pm0->get_speed() / 1e6, pm0);
+  auto url_shorten_service     = sg4::ExecTask::init("url_shorten_service", 555 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_4  = sg4::ExecTask::init("compose_post_service_4", 138 * pm0->get_speed() / 1e6, pm0);
+  auto home_timeline_service_0 = sg4::ExecTask::init("home_timeline_service_0", 243 * pm0->get_speed() / 1e6, pm0);
+  auto social_graph_service    = sg4::ExecTask::init("social_graph_service", 707 * pm0->get_speed() / 1e6, pm0);
+  auto home_timeline_service_1 = sg4::ExecTask::init("home_timeline_service_1", 7 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_5  = sg4::ExecTask::init("compose_post_service_5", 192 * pm0->get_speed() / 1e6, pm0);
+  auto user_timeline_service   = sg4::ExecTask::init("user_timeline_service", 913 * pm0->get_speed() / 1e6, pm0);
+  auto compose_post_service_6  = sg4::ExecTask::init("compose_post_service_6", 508 * pm0->get_speed() / 1e6, pm0);
+  auto post_storage_service    = sg4::ExecTask::init("post_storage_service", 391 * pm1->get_speed() / 1e6, pm1);
+
+  // Create Comm Tasks
+  auto compose_post_service_1_to_media_service =
+      sg4::CommTask::init("compose_post_service_1_to_media_service", 100, pm0, pm1);
+  auto media_service_to_compose_post_service_2 =
+      sg4::CommTask::init("media_service_to_compose_post_service_2", 100, pm1, pm0);
+  // NOTE(review): compose_post_service_6 runs on pm0 and post_storage_service on pm1, so this transfer
+  // presumably should go pm0 -> pm1. Endpoints are left untouched here because the expected timestamp in the
+  // tesh file may depend on them -- confirm and regenerate the tesh output if they get swapped.
+  auto compose_post_service_6_to_post_storage_service =
+      sg4::CommTask::init("compose_post_service_6_to_post_storage_service", 100, pm1, pm0);
+
+  // Create the graph
+  nginx_web_server->add_successor(compose_post_service_0);
+  compose_post_service_0->add_successor(unique_id_service);
+  unique_id_service->add_successor(compose_post_service_1);
+  compose_post_service_1->add_successor(compose_post_service_1_to_media_service);
+  compose_post_service_1_to_media_service->add_successor(media_service);
+  media_service->add_successor(media_service_to_compose_post_service_2);
+  media_service_to_compose_post_service_2->add_successor(compose_post_service_2);
+  compose_post_service_2->add_successor(user_service);
+  user_service->add_successor(compose_post_service_3);
+  compose_post_service_3->add_successor(text_service_0);
+  // text_service_0 forks into two branches; url_shorten_service is given no successor in this graph
+  text_service_0->add_successor(text_service_1);
+  text_service_0->add_successor(text_service_2);
+  text_service_1->add_successor(user_mention_service);
+  text_service_2->add_successor(url_shorten_service);
+  user_mention_service->add_successor(compose_post_service_4);
+  compose_post_service_4->add_successor(home_timeline_service_0);
+  home_timeline_service_0->add_successor(social_graph_service);
+  social_graph_service->add_successor(home_timeline_service_1);
+  home_timeline_service_1->add_successor(compose_post_service_5);
+  compose_post_service_5->add_successor(user_timeline_service);
+  user_timeline_service->add_successor(compose_post_service_6);
+  compose_post_service_6->add_successor(compose_post_service_6_to_post_storage_service);
+  compose_post_service_6_to_post_storage_service->add_successor(post_storage_service);
+
+  // Raise the parallelism degree of every Task of the graph (Exec and Comm Tasks alike)
+  const std::vector<sg4::TaskPtr> all_tasks = {nginx_web_server,
+                                               compose_post_service_0,
+                                               unique_id_service,
+                                               compose_post_service_1,
+                                               compose_post_service_1_to_media_service,
+                                               media_service,
+                                               media_service_to_compose_post_service_2,
+                                               compose_post_service_2,
+                                               user_service,
+                                               compose_post_service_3,
+                                               text_service_0,
+                                               text_service_1,
+                                               text_service_2,
+                                               user_mention_service,
+                                               url_shorten_service,
+                                               compose_post_service_4,
+                                               home_timeline_service_0,
+                                               social_graph_service,
+                                               home_timeline_service_1,
+                                               compose_post_service_5,
+                                               user_timeline_service,
+                                               compose_post_service_6,
+                                               compose_post_service_6_to_post_storage_service,
+                                               post_storage_service};
+  for (const auto& task : all_tasks)
+    task->set_parallelism_degree(10);
+
+  // Create the actor that will inject requests during the simulation
+  sg4::Actor::create("request_sender", pm0, request_sender, nginx_web_server, 500);
+
+  // Count the completions of the last Task of the graph that happen strictly between t=2 and t=7
+  int requests_processed = 0;
+  sg4::Task::on_completion_cb([&e, &requests_processed](const sg4::Task* t) {
+    if (t->get_name() == "post_storage_service" and e.get_clock() < 7 and e.get_clock() > 2)
+      requests_processed++;
+  });
+
+  // Start the simulation
+  e.run();
+  // The measurement window is 5 seconds long (from t=2 to t=7)
+  XBT_INFO("Requests processed per second: %f", requests_processed / 5.0);
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-microservice ${platfdir}/three_multicore_hosts.xml
+> [7.008495] [task_microservice/INFO] Requests processed per second: 500.000000
\ No newline at end of file
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example tests increasing and decreasing parallelism degree of Tasks.
+ * First we increase and decrease parallelism degree while the Task is idle,
+ * then we increase and decrease parallelism degree while the Task has queued firings.
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_parallelism, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+/* Actor body driving the scenario: change the parallelism degree of t and enqueue firings,
+ * first in three separate phases, then while a batch of firings is being processed. */
+static void manager(sg4::ExecTaskPtr t)
+{
+  // First part: pick a degree, enqueue some firings, then sleep 300s before the next change.
+  const int phases[][2] = {{1, 2}, {2, 4}, {1, 2}}; // {parallelism degree, firings to enqueue}
+  for (const auto& phase : phases) {
+    t->set_parallelism_degree(phase[0]);
+    t->enqueue_firings(phase[1]);
+    sg4::this_actor::sleep_for(300);
+  }
+
+  // Second part: enqueue 11 firings at once, then move the degree 2 -> 1 -> 3 along the way.
+  t->enqueue_firings(11);
+  t->set_parallelism_degree(2);
+  sg4::this_actor::sleep_for(150);
+  t->set_parallelism_degree(1);
+  sg4::this_actor::sleep_for(200);
+  t->set_parallelism_degree(3);
+}
+
+/* Create a single ExecTask on host PM0, log each of its starts and completions,
+ * and let the manager actor change its parallelism degree during the simulation.
+ *
+ * Expects the platform file as first command-line argument.
+ */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  xbt_assert(argc > 1, "Usage: %s platform_file\n\tExample: %s three_multicore_hosts.xml\n", argv[0], argv[0]);
+  e.load_platform(argv[1]);
+
+  auto pm0 = e.host_by_name("PM0");
+  // The amount of each firing is 100 times the speed of PM0
+  auto exec_task = sg4::ExecTask::init("exec_A", 100 * pm0->get_speed(), pm0);
+
+  // Trace every completion (with the completion count so far) and every start
+  sg4::Task::on_completion_cb(
+      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_cname(), t->get_count()); });
+  sg4::Task::on_start_cb([](const sg4::Task* t) { XBT_INFO("Task %s start", t->get_cname()); });
+
+  sg4::Actor::create("sender", pm0, manager, exec_task);
+
+  e.run();
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
+> [0.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
+> [100.000000] [task_parallelism/INFO] Task exec_A start
+> [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
+> [300.000000] [task_parallelism/INFO] Task exec_A start
+> [300.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
+> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
+> [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
+> [600.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
+> [700.000000] [task_parallelism/INFO] Task exec_A start
+> [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
+> [900.000000] [task_parallelism/INFO] Task exec_A start
+> [900.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
+> [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
+> [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1250.000000] [task_parallelism/INFO] Task exec_A start
+> [1250.000000] [task_parallelism/INFO] Task exec_A start
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
+> [1300.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
+> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
+> [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
+> [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
class Task {
std::string name_;
double amount_;
- int queued_firings_ = 0;
- int count_ = 0;
- bool working_ = false;
+ int queued_firings_ = 0;
+ int count_ = 0;
+ int running_instances_ = 0;
+ int parallelism_degree_ = 1;
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
std::shared_ptr<Token> token_ = nullptr;
std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
- ActivityPtr previous_activity_;
- ActivityPtr current_activity_;
+ std::deque<ActivityPtr> current_activities_;
inline static xbt::signal<void(Task*)> on_start;
xbt::signal<void(Task*)> on_this_start;
virtual void fire();
void complete();
- void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+ void store_activity(ActivityPtr a) { current_activities_.push_back(a); }
public:
+ void set_name(std::string name);
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
void set_amount(double amount);
double get_amount() const { return amount_; }
int get_count() const { return count_; }
+ void set_parallelism_degree(int n);
+ int get_parallelism_degree() const { return parallelism_degree_; }
void set_token(std::shared_ptr<Token> token);
std::shared_ptr<Token> get_next_token_from(TaskPtr t);
*/
bool Task::ready_to_run() const
{
- return not working_ && queued_firings_ > 0;
+ return running_instances_ < parallelism_degree_ && queued_firings_ > 0; // a free instance slot and a queued firing
}
/**
void Task::complete()
{
xbt_assert(Actor::is_maestro());
- working_ = false;
+ running_instances_--; // one instance is over: ready_to_run() below may now allow another firing
count_++;
on_this_completion(this);
on_completion(this);
- if (current_activity_)
- previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
t->receive(this);
if (ready_to_run())
fire();
}
+/** @brief Set the parallelism degree of the Task, i.e. the maximum number of instances running at the same time.
+ * @param n The new parallelism degree of the Task. Must be strictly positive (asserted).
+ * @note When increasing the degree the function starts new instances.
+ * When decreasing the degree the function does NOT stop running instances.
+ */
+void Task::set_parallelism_degree(int n)
+{
+ xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0.");
+ simgrid::kernel::actor::simcall_answered([this, n] {
+ parallelism_degree_ = n;
+ while (ready_to_run()) // keep firing until the new degree is reached or no firing is queued anymore
+ fire();
+ });
+}
+
/** @param n The number of firings to enqueue.
* @brief Enqueue firing.
* @note Immediatly fire an activity if possible.
{
simgrid::kernel::actor::simcall_answered([this, n] {
queued_firings_ += n;
- if (ready_to_run())
+ while (ready_to_run())
fire();
});
}
+/** @brief Set the name of the Task.
+ * @param name The new name to set. Taken by value and moved into the Task.
+ */
+void Task::set_name(std::string name)
+{
+ name_ = std::move(name);
+}
+
/** @param amount The amount to set.
* @brief Set the amout of work to do.
* @note Amount in flop for ExecTask and in bytes for CommTask.
void Task::fire()
{
+ if ((int)current_activities_.size() > parallelism_degree_) {
+ current_activities_.pop_front();
+ }
on_this_start(this);
on_start(this);
- working_ = true;
+ running_instances_++;
queued_firings_ = std::max(queued_firings_ - 1, 0);
if (not tokens_received_.empty())
tokens_received_.pop_front();
/**
* @brief Do one execution of the Task.
- * @note Call the on_this_start() func. Set its working status as true.
+ * @note Call the on_this_start() func.
* Init and start the underlying Activity.
*/
void ExecTask::fire()
Task::fire();
auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
exec->start();
- exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
- set_current_activity(exec);
+ exec->on_this_completion_cb([this](Exec const&) { complete(); });
+ store_activity(exec);
}
/** @ingroup plugin_task
/**
* @brief Do one execution of the Task.
- * @note Call the on_this_start() func. Set its working status as true.
+ * @note Call the on_this_start() func.
* Init and start the underlying Activity.
*/
void CommTask::fire()
Task::fire();
auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
comm->start();
- comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
- set_current_activity(comm);
+ comm->on_this_completion_cb([this](Comm const&) { complete(); });
+ store_activity(comm);
}
/** @ingroup plugin_task
Task::fire();
auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
io->start();
- io->on_this_completion_cb([this](Io const&) { this->complete(); });
- set_current_activity(io);
+ io->on_this_completion_cb([this](Io const&) { complete(); });
+ store_activity(io);
}
} // namespace simgrid::s4u