+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example takes the main concepts of Apache Storm, presented at https://storm.apache.org/releases/2.4.0/Concepts.html,
+ and uses them to build a simulation of a stream processing application.
+
+ Spout SA produces data every 100ms. The volume produced cycles through 1e3, 1e6 and 1e9 bytes.
+ Spout SB produces 1e6 bytes every 200ms.
+
+ Bolts B1 and B2 alternately process the data from Spout SA. Processing this data costs 10 flops per byte.
+ Bolt B3 processes data from Spout SB.
+ Bolt B4 processes data from Bolt B3.
+
+ Fafard
+ ┌────┐
+ ┌──►│ B1 │
+ Tremblay │ └────┘
+ ┌────┐ │
+ │ SA ├────┤ Ginette
+ └────┘ │ ┌────┐
+ └──►│ B2 │
+ └────┘
+
+
+ Bourassa
+ Jupiter ┌──────────┐
+ ┌────┐ │ │
+ │ SB ├─────┤ B3 ──► B4│
+ └────┘ │ │
+ └──────────┘
+ */
+
+#include "simgrid/plugins/task.hpp"
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example"); // log category used by XBT_INFO
+
+/** Payload attached to the tasks of this example: the volume of data (in bytes) produced by one spout execution. */
+struct Token {
+  double data_ = 0;
+  // explicit: a bare double must never be silently converted into a Token
+  explicit Token(double data) : data_(data) {}
+};
+
+/**
+ * Build the Storm-like task graph described at the top of this file and simulate it.
+ *
+ * Usage: <program> platform_file
+ * The hosts Tremblay, Jupiter, Fafard, Ginette and Bourassa are looked up by name in that platform.
+ *
+ * @return 0 once the simulation has run to completion.
+ */
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  // argv[1] is dereferenced just below: stop with a usage message rather than crash when it is missing
+  xbt_assert(argc > 1, "Usage: %s platform_file\n\tExample: %s platform.xml\n", argv[0], argv[0]);
+  e.load_platform(argv[1]);
+  simgrid::plugins::Task::init();
+
+  // Retrieve hosts
+  auto tremblay = e.host_by_name("Tremblay");
+  auto jupiter  = e.host_by_name("Jupiter");
+  auto fafard   = e.host_by_name("Fafard");
+  auto ginette  = e.host_by_name("Ginette");
+  auto bourassa = e.host_by_name("Bourassa");
+
+  // Create execution tasks.
+  // The spouts get an amount of work proportional to the speed of their host (0.1x and 0.2x),
+  // which gives the 100ms and 200ms production periods announced in the header comment.
+  auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
+
+  // Create communication tasks.
+  // The amounts of SA_to_B1 and SA_to_B2 start at 0: they are set each time SA starts (see below).
+  auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+  // Create the graph by defining dependencies between tasks.
+  // The successor of SA is not set here: it is chosen dynamically each time SA starts.
+  SA_to_B1->add_successor(B1);
+  SA_to_B2->add_successor(B2);
+  SB->add_successor(SB_to_B3);
+  SB_to_B3->add_successor(B3);
+  B3->add_successor(B4);
+
+  /* Dynamic modification of the graph and of the bytes sent.
+     Each time SA starts, its successor alternates between SA_to_B1 (even count) and SA_to_B2 (odd count),
+     and the volume to send cycles through 1e3, 1e6 and 1e9 bytes.
+   */
+  SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    // Built once, instead of allocating a container at every execution of SA
+    static constexpr double volumes[] = {1e3, 1e6, 1e9};
+
+    const int count     = t->get_count();
+    const double volume = volumes[count % 3];
+    const bool to_b1    = (count % 2 == 0);
+
+    const auto& next     = to_b1 ? SA_to_B1 : SA_to_B2;
+    const auto& previous = to_b1 ? SA_to_B2 : SA_to_B1;
+    t->remove_successor(previous);
+    t->add_successor(next);
+
+    next->set_amount(volume);
+    // The token carries the volume so that the bolts can size their work from it
+    t->set_token(std::make_shared<Token>(volume));
+  });
+
+  // The token sent by SA is forwarded unchanged by both communication tasks
+  auto forward_sa_token = [&SA](simgrid::plugins::Task* t) { t->set_token(t->get_tokens()[SA]); };
+  SA_to_B1->on_this_start_cb(forward_sa_token);
+  SA_to_B2->on_this_start_cb(forward_sa_token);
+
+  /* B1 and B2 read the value of the token received from their predecessor
+     and use it to adapt their amount of work: 10 flops per byte.
+   */
+  B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B1].get());
+    t->set_amount(tok->data_ * 10);
+  });
+  B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
+    auto tokens_map = t->get_tokens();
+    const auto* tok = static_cast<const Token*>(tokens_map[SA_to_B2].get());
+    t->set_amount(tok->data_ * 10);
+  });
+
+  // Enqueue executions for tasks without predecessors
+  SA->enqueue_execs(5);
+  SB->enqueue_execs(5);
+
+  // Log the name and the count of every task when it ends
+  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
+  });
+
+  // Start the simulation
+  e.run();
+  return 0;
+}