From: Adrien Gougeon Date: Mon, 12 Jun 2023 15:42:42 +0000 (+0200) Subject: task can now pass tokens (void*) to other tasks. add example using tokens and apache... X-Git-Tag: v3.34~14^2~7^2~15 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/efee1893d1b905f1181db807b66965f1828dcea2 task can now pass tokens (void*) to other tasks. add example using tokens and apache storm terminology --- diff --git a/MANIFEST.in b/MANIFEST.in index 296c687d9b..13eeb1e84b 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -398,6 +398,8 @@ include examples/cpp/task-io/s4u-task-io.cpp include examples/cpp/task-io/s4u-task-io.tesh include examples/cpp/task-simple/s4u-task-simple.cpp include examples/cpp/task-simple/s4u-task-simple.tesh +include examples/cpp/task-storm/s4u-task-storm.cpp +include examples/cpp/task-storm/s4u-task-storm.thesh include examples/cpp/task-switch-host/s4u-task-switch-host.cpp include examples/cpp/task-switch-host/s4u-task-switch-host.tesh include examples/cpp/task-variable-load/s4u-task-variable-load.cpp diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index dc66fc99c2..66187a8b84 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -171,7 +171,7 @@ foreach (example activity-testany activity-waitany mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert network-ns3 network-ns3-wifi network-wifi io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent - task-io task-simple task-variable-load task-switch-host + task-io task-simple task-variable-load task-storm task-switch-host photovoltaic-simple platform-comm-serialize platform-failures platform-profile platform-properties plugin-host-load plugin-link-load plugin-prodcons diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp new file mode 100644 index 0000000000..0319d17434 --- /dev/null +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -0,0 +1,137 @@ +/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html + and use them to build a simulation of a stream processing application + + Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes. + Spout SB produces 1e6 bytes every 200ms. + + Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes + Bolt B3 processes data from Spout SB. + Bolt B4 processes data from Bolt B3. + + Fafard + ┌────┐ + ┌──►│ B1 │ + Tremblay │ └────┘ + ┌────┐ │ + │ SA ├────┤ Ginette + └────┘ │ ┌────┐ + └──►│ B2 │ + └────┘ + + + Bourassa + Jupiter ┌──────────┐ + ┌────┐ │ │ + │ SB ├─────┤ B3 ──► B4│ + └────┘ │ │ + └──────────┘ + */ + +#include "simgrid/plugins/task.hpp" +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example"); + +struct Token { + double data_ = 0; + Token(double data) : data_(data) {} +}; + +int main(int argc, char* argv[]) +{ + simgrid::s4u::Engine e(&argc, argv); + e.load_platform(argv[1]); + simgrid::plugins::Task::init(); + + // Retrieve hosts + auto tremblay = e.host_by_name("Tremblay"); + auto jupiter = e.host_by_name("Jupiter"); + auto fafard = e.host_by_name("Fafard"); + auto ginette = e.host_by_name("Ginette"); + auto bourassa = e.host_by_name("Bourassa"); + + // Create execution tasks + auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay); + auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter); + auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard); + auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette); + auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa); + auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa); + + // Create communication tasks + auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard); + auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette); + auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa); + + // Create the graph by defining dependencies between tasks + // Some dependencies are defined dynamically + SA_to_B1->add_successor(B1); + SA_to_B2->add_successor(B2); + SB->add_successor(SB_to_B3); + SB_to_B3->add_successor(B3); + B3->add_successor(B4); + + /* Dynamic modification of the graph and bytes sent + Alternatively we: remove/add the link between SA and SA_to_B2 + add/remove the link between SA and SA_to_B1 + */ + SA->on_this_start_cb([&](simgrid::plugins::Task* t) { + int count = t->get_count(); + simgrid::plugins::CommTaskPtr comm; + if (count % 2 == 0) { + t->remove_successor(SA_to_B2); + t->add_successor(SA_to_B1); + comm = SA_to_B1; + } + else { + t->remove_successor(SA_to_B1); + t->add_successor(SA_to_B2); + comm = SA_to_B2; + } + std::vector amount = {1e3,1e6,1e9}; + comm->set_amount(amount[count % 3]); + auto token = std::make_shared(amount[count % 3]); + t->set_token(token); + }); + + // The token sent by SA is forwarded by both communication tasks + SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) { + t->set_token(t->get_tokens()[SA]); + }); + SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) { + t->set_token(t->get_tokens()[SA]); + }); + + /* B1 and B2 read the value of the token received by their predecessors + and use it to adapt their amount of work to do. + */ + B1->on_this_start_cb([&](simgrid::plugins::Task* t) { + auto tokens_map = t->get_tokens(); + Token* tok = (Token*)(tokens_map[SA_to_B1].get()); + t->set_amount(tok->data_ * 10); + }); + B2->on_this_start_cb([&](simgrid::plugins::Task* t) { + auto tokens_map = t->get_tokens(); + Token* tok = (Token*)(tokens_map[SA_to_B2].get()); + t->set_amount(tok->data_ * 10); + }); + + // Enqueue executions for tasks without predecessors + SA->enqueue_execs(5); + SB->enqueue_execs(5); + + // Add a function to be called when tasks end for log purpose + simgrid::plugins::Task::on_end_cb([] + (const simgrid::plugins::Task* t) { + XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); + }); + + // Start the simulation + e.run(); + return 0; +} diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh new file mode 100644 index 0000000000..d7c364a837 --- /dev/null +++ b/examples/cpp/task-storm/s4u-task-storm.tesh @@ -0,0 +1,38 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml +> [0.100000] [task_storm/INFO] Task SA finished (1) +> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1) +> [0.125972] [task_storm/INFO] Task B1 finished (1) +> [0.200000] [task_storm/INFO] Task SB finished (1) +> [0.200000] [task_storm/INFO] Task SA finished (2) +> [0.300000] [task_storm/INFO] Task SA finished (3) +> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1) +> [0.400000] [task_storm/INFO] Task SB finished (2) +> [0.400000] [task_storm/INFO] Task SA finished (4) +> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2) +> [0.500000] [task_storm/INFO] Task SA finished (5) +> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1) +> [0.570648] [task_storm/INFO] Task B2 finished (1) +> [0.570855] [task_storm/INFO] Task B2 finished (2) +> [0.600000] [task_storm/INFO] Task SB finished (3) +> [0.800000] [task_storm/INFO] Task SB finished (4) +> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2) +> [1.000000] [task_storm/INFO] Task SB finished (5) +> [1.177739] [task_storm/INFO] Task SB_to_B3 finished (3) +> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4) +> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5) +> [2.619232] [task_storm/INFO] Task B3 finished (1) +> [6.743624] [task_storm/INFO] Task B3 finished (2) +> [10.868015] [task_storm/INFO] Task B3 finished (3) +> [10.868015] [task_storm/INFO] Task B4 finished (1) +> [14.992407] [task_storm/INFO] Task B3 finished (4) +> [19.116799] [task_storm/INFO] Task B3 finished (5) +> [19.116799] [task_storm/INFO] Task B4 finished (2) +> [23.241190] [task_storm/INFO] Task B4 finished (3) +> [27.365582] [task_storm/INFO] Task B4 finished (4) +> [31.489974] [task_storm/INFO] Task B4 finished (5) +> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2) +> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3) +> [264.435791] [task_storm/INFO] Task B1 finished (2) +> [264.566859] [task_storm/INFO] Task B1 finished (3) diff --git a/include/simgrid/plugins/task.hpp b/include/simgrid/plugins/task.hpp index 8143333d86..a652cb65d7 100644 --- a/include/simgrid/plugins/task.hpp +++ b/include/simgrid/plugins/task.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -39,6 +40,8 @@ class Task { protected: std::string name_; double amount_; + std::shared_ptr token_ = NULL; + std::deque>> tokens_received_; int queued_execs_ = 0; int count_ = 0; bool working_ = false; @@ -61,6 +64,8 @@ public: void enqueue_execs(int n); void set_amount(double amount); double get_amount() const { return amount_; } + void set_token(std::shared_ptr token); + std::map> get_tokens() const; void add_successor(TaskPtr t); void remove_successor(TaskPtr t); void remove_all_successors(); diff --git a/src/plugins/task.cpp b/src/plugins/task.cpp index dca0c9013d..4edfc3fb1a 100644 --- a/src/plugins/task.cpp +++ b/src/plugins/task.cpp @@ -52,7 +52,10 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - predecessors_[source]++; + auto source_count = predecessors_[source]++; + if (tokens_received_.size() <= queued_execs_ + source_count) + tokens_received_.push_back({}); + tokens_received_[queued_execs_ + source_count][source] = source->token_; bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { @@ -134,6 +137,25 @@ void Task::set_amount(double amount) simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); } +/** @ingroup plugin_task + * @param token The token to set. + * @brief Set the token to send to successors. + * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + */ +void Task::set_token(std::shared_ptr token) +{ + simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); +} + +/** @ingroup plugin_task + * @return Map of tokens received for the next execution. + * @note If there is no queued execution for this task the map might not exist or be partially empty. + */ +std::map> Task::get_tokens() const +{ + return tokens_received_.front(); +} + /** @ingroup plugin_task * @param successor The Task to add. * @brief Add a successor to this Task. @@ -231,6 +253,8 @@ void ExecTask::fire() Task::on_start(this); working_ = true; queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::ExecPtr exec = s4u::Exec::init(); exec->set_name(name_); exec->set_flops_amount(amount_); @@ -292,6 +316,8 @@ void CommTask::fire() Task::on_start(this); working_ = true; queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); comm->set_name(name_); comm->set_payload_size(amount_); @@ -383,6 +409,8 @@ void IoTask::fire() Task::on_start(this); working_ = true; queued_execs_ = std::max(queued_execs_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); s4u::IoPtr io = s4u::Io::init(); io->set_name(name_); io->set_size(amount_);