include examples/cpp/task-io/s4u-task-io.tesh
include examples/cpp/task-simple/s4u-task-simple.cpp
include examples/cpp/task-simple/s4u-task-simple.tesh
+include examples/cpp/task-storm/s4u-task-storm.cpp
+include examples/cpp/task-storm/s4u-task-storm.tesh
include examples/cpp/task-switch-host/s4u-task-switch-host.cpp
include examples/cpp/task-switch-host/s4u-task-switch-host.tesh
include examples/cpp/task-variable-load/s4u-task-variable-load.cpp
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
network-ns3 network-ns3-wifi network-wifi
io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
- task-io task-simple task-variable-load task-switch-host
+ task-io task-simple task-variable-load task-storm task-switch-host
photovoltaic-simple
platform-comm-serialize platform-failures platform-profile platform-properties
plugin-host-load plugin-link-load plugin-prodcons
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
+ and uses them to build a simulation of a stream processing application.
+
+ Spout SA produces data every 100ms. The volume produced cycles through 1e3, 1e6 and 1e9 bytes.
+ Spout SB produces 1e6 bytes every 200ms.
+
+ Bolts B1 and B2 take turns processing data from Spout SA. The quantity of work to process this data is 10 flops per byte.
+ Bolt B3 processes data from Spout SB.
+ Bolt B4 processes data from Bolt B3.
+
+ Fafard
+ ┌────┐
+ ┌──►│ B1 │
+ Tremblay │ └────┘
+ ┌────┐ │
+ │ SA ├────┤ Ginette
+ └────┘ │ ┌────┐
+ └──►│ B2 │
+ └────┘
+
+
+ Bourassa
+ Jupiter ┌──────────┐
+ ┌────┐ │ │
+ │ SB ├─────┤ B3 ──► B4│
+ └────┘ │ │
+ └──────────┘
+ */
+
+#include "simgrid/plugins/task.hpp"
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+
+/** Payload attached to a task and handed to its successors; here it carries a volume of data in bytes. */
+struct Token {
+  double data_ = 0;
+  // explicit: a bare double must never be silently converted into a Token
+  explicit Token(double data) : data_(data) {}
+};
+
+/**
+ * Entry point: builds the task graph sketched in the header comment of this file and runs the simulation.
+ *
+ * @param argc Number of command-line arguments.
+ * @param argv Command-line arguments; argv[1] is the platform file given to Engine::load_platform().
+ * @return 0 once Engine::run() has returned.
+ */
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  // Fail with a usage message rather than handing a missing argument to load_platform()
+  xbt_assert(argc > 1, "Usage: %s platform_file", argv[0]);
+  e.load_platform(argv[1]);
+  simgrid::plugins::Task::init();
+
+  // Retrieve hosts
+  auto tremblay = e.host_by_name("Tremblay");
+  auto jupiter  = e.host_by_name("Jupiter");
+  auto fafard   = e.host_by_name("Fafard");
+  auto ginette  = e.host_by_name("Ginette");
+  auto bourassa = e.host_by_name("Bourassa");
+
+  // Create execution tasks
+  auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
+
+  // Create communication tasks
+  auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+  // Create the graph by defining dependencies between tasks
+  // Some dependencies are defined dynamically
+  SA_to_B1->add_successor(B1);
+  SA_to_B2->add_successor(B2);
+  SB->add_successor(SB_to_B3);
+  SB_to_B3->add_successor(B3);
+  B3->add_successor(B4);
+
+  /* Dynamic modification of the graph and bytes sent.
+     On every start of SA we alternate between:
+       - removing the link SA -> SA_to_B2 and adding the link SA -> SA_to_B1 (even counts)
+       - removing the link SA -> SA_to_B1 and adding the link SA -> SA_to_B2 (odd counts)
+   */
+  SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    // Volumes (in bytes) cycled through by SA
+    static constexpr double amounts[] = {1e3, 1e6, 1e9};
+    const int count = t->get_count();
+    simgrid::plugins::CommTaskPtr comm;
+    if (count % 2 == 0) {
+      t->remove_successor(SA_to_B2);
+      t->add_successor(SA_to_B1);
+      comm = SA_to_B1;
+    } else {
+      t->remove_successor(SA_to_B1);
+      t->add_successor(SA_to_B2);
+      comm = SA_to_B2;
+    }
+    const double bytes = amounts[count % 3];
+    comm->set_amount(bytes);
+    // The token carries the volume so that downstream tasks can size their own work
+    t->set_token(std::make_shared<Token>(bytes));
+  });
+
+  // The token sent by SA is forwarded by both communication tasks
+  SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) { t->set_token(t->get_tokens()[SA]); });
+  SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) { t->set_token(t->get_tokens()[SA]); });
+
+  /* B1 and B2 read the value of the token received from their predecessor
+     and use it to adapt their amount of work to do (10 flops per byte).
+   */
+  B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B1].get());
+    t->set_amount(tok->data_ * 10);
+  });
+  B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B2].get());
+    t->set_amount(tok->data_ * 10);
+  });
+
+  // Enqueue executions for tasks without predecessors
+  SA->enqueue_execs(5);
+  SB->enqueue_execs(5);
+
+  // Add a function to be called when tasks end for log purpose
+  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
+  });
+
+  // Start the simulation
+  e.run();
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
+> [0.100000] [task_storm/INFO] Task SA finished (1)
+> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1)
+> [0.125972] [task_storm/INFO] Task B1 finished (1)
+> [0.200000] [task_storm/INFO] Task SB finished (1)
+> [0.200000] [task_storm/INFO] Task SA finished (2)
+> [0.300000] [task_storm/INFO] Task SA finished (3)
+> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1)
+> [0.400000] [task_storm/INFO] Task SB finished (2)
+> [0.400000] [task_storm/INFO] Task SA finished (4)
+> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2)
+> [0.500000] [task_storm/INFO] Task SA finished (5)
+> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1)
+> [0.570648] [task_storm/INFO] Task B2 finished (1)
+> [0.570855] [task_storm/INFO] Task B2 finished (2)
+> [0.600000] [task_storm/INFO] Task SB finished (3)
+> [0.800000] [task_storm/INFO] Task SB finished (4)
+> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2)
+> [1.000000] [task_storm/INFO] Task SB finished (5)
+> [1.177739] [task_storm/INFO] Task SB_to_B3 finished (3)
+> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4)
+> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
+> [2.619232] [task_storm/INFO] Task B3 finished (1)
+> [6.743624] [task_storm/INFO] Task B3 finished (2)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
+> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [14.992407] [task_storm/INFO] Task B3 finished (4)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
+> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [23.241190] [task_storm/INFO] Task B4 finished (3)
+> [27.365582] [task_storm/INFO] Task B4 finished (4)
+> [31.489974] [task_storm/INFO] Task B4 finished (5)
+> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2)
+> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3)
+> [264.435791] [task_storm/INFO] Task B1 finished (2)
+> [264.566859] [task_storm/INFO] Task B1 finished (3)
#include <xbt/Extendable.hpp>
#include <atomic>
+#include <deque>
#include <map>
#include <memory>
#include <set>
protected:
std::string name_;
double amount_;
+ /// Type-erased token stored by set_token(); null until a token is set.
+ std::shared_ptr<void> token_ = nullptr;
+ std::deque<std::map<TaskPtr, std::shared_ptr<void>>> tokens_received_;
int queued_execs_ = 0;
int count_ = 0;
bool working_ = false;
void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ void set_token(std::shared_ptr<void> token);
+ std::map<TaskPtr, std::shared_ptr<void>> get_tokens() const;
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- predecessors_[source]++;
+ auto source_count = predecessors_[source]++;
+ if (tokens_received_.size() <= queued_execs_ + source_count)
+ tokens_received_.push_back({});
+ tokens_received_[queued_execs_ + source_count][source] = source->token_;
bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
}
+/** @ingroup plugin_task
+ *  @brief Set the token to send to successors.
+ *  @param token The token to set.
+ *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<void> token)
+{
+  // The assignment itself is performed inside the simcall
+  auto store_token = [this, token] { token_ = token; };
+  simgrid::kernel::actor::simcall_answered(store_token);
+}
+
+/** @ingroup plugin_task
+ *  @brief Get the tokens received for the next execution.
+ *  @return A copy of the map at the front of the received-token queue, or an empty map when that queue is empty.
+ *  @note If there is no queued execution for this task the returned map might be empty or only partially filled.
+ */
+std::map<TaskPtr, std::shared_ptr<void>> Task::get_tokens() const
+{
+  // front() on an empty deque is undefined behavior: hand back an empty map instead
+  if (tokens_received_.empty())
+    return {};
+  return tokens_received_.front();
+}
+
/** @ingroup plugin_task
* @param successor The Task to add.
* @brief Add a successor to this Task.
Task::on_start(this);
working_ = true;
queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::ExecPtr exec = s4u::Exec::init();
exec->set_name(name_);
exec->set_flops_amount(amount_);
Task::on_start(this);
working_ = true;
queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
comm->set_name(name_);
comm->set_payload_size(amount_);
Task::on_start(this);
working_ = true;
queued_execs_ = std::max(queued_execs_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
s4u::IoPtr io = s4u::Io::init();
io->set_name(name_);
io->set_size(amount_);