X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/efee1893d1b905f1181db807b66965f1828dcea2..4af8aca04c6ee72db752866796034d67ca2e900d:/examples/cpp/task-storm/s4u-task-storm.cpp diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp index 0319d17434..ca0f61ea5c 100644 --- a/examples/cpp/task-storm/s4u-task-storm.cpp +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -3,16 +3,16 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html - and use them to build a simulation of a stream processing application +/* This example takes the main concepts of Apache Storm presented here + https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing + application Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes. - Spout SB produces 1e6 bytes every 200ms. - - Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes - Bolt B3 processes data from Spout SB. - Bolt B4 processes data from Bolt B3. - + Spout SB produces 1e6 bytes every 200ms. + + Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per + bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3. + Fafard ┌────┐ ┌──►│ B1 │ @@ -29,44 +29,38 @@ ┌────┐ │ │ │ SB ├─────┤ B3 ──► B4│ └────┘ │ │ - └──────────┘ + └──────────┘ */ -#include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example"); - -struct Token { - double data_ = 0; - Token(double data) : data_(data) {} -}; +namespace sg4 = simgrid::s4u; int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto tremblay = e.host_by_name("Tremblay"); auto jupiter = e.host_by_name("Jupiter"); - auto fafard = e.host_by_name("Fafard"); + auto fafard = e.host_by_name("Fafard"); auto ginette = e.host_by_name("Ginette"); auto bourassa = e.host_by_name("Bourassa"); // Create execution tasks - auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay); - auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter); - auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard); - auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette); - auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa); - auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa); + auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay); + auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter); + auto B1 = sg4::ExecTask::init("B1", 1e8, fafard); + auto B2 = sg4::ExecTask::init("B2", 1e8, ginette); + auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa); + auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa); // Create communication tasks - auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard); - auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette); - auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa); + auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard); + auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette); + auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa); // Create the graph by defining dependencies between tasks // Some dependencies are defined dynamically @@ -80,56 +74,59 @@ int main(int argc, char* argv[]) Alternatively we: remove/add the link between SA and SA_to_B2 add/remove the link between SA and SA_to_B1 */ - SA->on_this_start_cb([&](simgrid::plugins::Task* t) { + SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) { int count = t->get_count(); - simgrid::plugins::CommTaskPtr comm; - if (count % 2 == 0) { + sg4::CommTaskPtr comm; + if (count % 2 == 1) { t->remove_successor(SA_to_B2); t->add_successor(SA_to_B1); comm = SA_to_B1; - } - else { + } else { t->remove_successor(SA_to_B1); t->add_successor(SA_to_B2); comm = SA_to_B2; } - std::vector amount = {1e3,1e6,1e9}; + std::vector amount = {1e9, 1e3, 1e6}; + // XBT_INFO("Comm %f", amount[count % 3]); comm->set_amount(amount[count % 3]); - auto token = std::make_shared(amount[count % 3]); + auto token = std::make_shared(); + token->set_data(new double(amount[count % 3])); t->set_token(token); }); // The token sent by SA is forwarded by both communication tasks - SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) { - t->set_token(t->get_tokens()[SA]); + SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); }); - SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) { - t->set_token(t->get_tokens()[SA]); + SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) { + t->set_token(t->get_token_from(SA)); + t->deque_token_from(SA); }); /* B1 and B2 read the value of the token received by their predecessors and use it to adapt their amount of work to do. - */ - B1->on_this_start_cb([&](simgrid::plugins::Task* t) { - auto tokens_map = t->get_tokens(); - Token* tok = (Token*)(tokens_map[SA_to_B1].get()); - t->set_amount(tok->data_ * 10); + */ + B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B1)->get_data(); + t->deque_token_from(SA_to_B1); + t->set_amount(*data * 10); + delete data; }); - B2->on_this_start_cb([&](simgrid::plugins::Task* t) { - auto tokens_map = t->get_tokens(); - Token* tok = (Token*)(tokens_map[SA_to_B2].get()); - t->set_amount(tok->data_ * 10); + B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) { + auto data = t->get_token_from(SA_to_B2)->get_data(); + t->deque_token_from(SA_to_B2); + t->set_amount(*data * 10); + delete data; }); - // Enqueue executions for tasks without predecessors - SA->enqueue_execs(5); - SB->enqueue_execs(5); + // Enqueue firings for tasks without predecessors + SA->enqueue_firings(5); + SB->enqueue_firings(5); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([] - (const simgrid::plugins::Task* t) { - XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); - }); + sg4::Task::on_completion_cb( + [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); // Start the simulation e.run();