From: Martin Quinson Date: Thu, 22 Jun 2023 00:25:49 +0000 (+0200) Subject: Merge branch 'master' of framagit.org:simgrid/simgrid X-Git-Tag: v3.34~14 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/aaefeb18b3307df62ee762f3de31decf7ae3d442?hp=48b94f2d67a3249b5c1f4cfbe2b7a3c5bd8b7e71 Merge branch 'master' of framagit.org:simgrid/simgrid --- diff --git a/ChangeLog b/ChangeLog index 78392f19c1..9ae720e54c 100644 --- a/ChangeLog +++ b/ChangeLog @@ -17,13 +17,15 @@ S4U: Comm::set_payload_size() to change the size of the simulated data. - New function: Engine::flatify_platform(), to get a fully detailed vision of the configured platform. + - New Task abstraction: They are designed to represent dataflows, i.e, graphs of repeatable Activities. + See the examples under examples/cpp/task-* and the associated documentation. - Full simDAG integration: Activity::start() actually starts only when all dependencies are fullfiled. If it cannot be started right away, it will start as soon as it becomes possible. - Allow to set a concurrency limit on disks and hosts, as it was already the case for links. - Rename Link::get_usage() to Link::get_load() for consistency with Host:: - Every signal now come with a static version that is invoked for every object of that class, - and an instance version that is invoked for this specific object only. For example, + and an instance version that is invoked for this specific object only. For example, s4u::Actor::on_suspend_cb() adds a callback that is invoked for the suspend of any actor while s4u::Actor::on_this_suspend_cb() adds a callback for this specific actor only. - Activity::on_suspended_cb() is renamed to Activity::on_suspend_cb(), and fired right before the suspend. @@ -34,8 +36,6 @@ S4U: That is, callbacks registered in Exec::on_suspend_cb will not be fired for Comms nor Ios. New S4U plugins: - - Task: They are designed to represent dataflows, i.e, graphs of repeatable Activities. 
- See the examples under examples/cpp/task-* and the documentation in the Plugins page. - Battery: Enable the management of batteries on hosts. See the examples under examples/cpp/battery-* and the documentation in the Plugins page. - Photovoltaic: Enable the management of photovoltaic panels on hosts. diff --git a/MANIFEST.in b/MANIFEST.in index 296c687d9b..76abec2d71 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -398,6 +398,8 @@ include examples/cpp/task-io/s4u-task-io.cpp include examples/cpp/task-io/s4u-task-io.tesh include examples/cpp/task-simple/s4u-task-simple.cpp include examples/cpp/task-simple/s4u-task-simple.tesh +include examples/cpp/task-storm/s4u-task-storm.cpp +include examples/cpp/task-storm/s4u-task-storm.tesh include examples/cpp/task-switch-host/s4u-task-switch-host.cpp include examples/cpp/task-switch-host/s4u-task-switch-host.tesh include examples/cpp/task-variable-load/s4u-task-variable-load.cpp @@ -1945,7 +1947,6 @@ include include/simgrid/plugins/live_migration.h include include/simgrid/plugins/load.h include include/simgrid/plugins/ns3.hpp include include/simgrid/plugins/photovoltaic.hpp -include include/simgrid/plugins/task.hpp include include/simgrid/s4u.hpp include include/simgrid/s4u/Activity.hpp include include/simgrid/s4u/Actor.hpp @@ -1962,6 +1963,7 @@ include include/simgrid/s4u/Mailbox.hpp include include/simgrid/s4u/Mutex.hpp include include/simgrid/s4u/NetZone.hpp include include/simgrid/s4u/Semaphore.hpp +include include/simgrid/s4u/Task.hpp include include/simgrid/s4u/VirtualMachine.hpp include include/simgrid/semaphore.h include include/simgrid/simix.h @@ -2228,9 +2230,9 @@ include src/mc/explo/udpor/Configuration_test.cpp include src/mc/explo/udpor/EventSet.cpp include src/mc/explo/udpor/EventSet.hpp include src/mc/explo/udpor/EventSet_test.cpp -include src/mc/explo/udpor/ExtensionSet_test.cpp include src/mc/explo/udpor/ExtensionSetCalculator.cpp include src/mc/explo/udpor/ExtensionSetCalculator.hpp +include 
src/mc/explo/udpor/ExtensionSet_test.cpp include src/mc/explo/udpor/History.cpp include src/mc/explo/udpor/History.hpp include src/mc/explo/udpor/History_test.cpp @@ -2322,7 +2324,6 @@ include src/plugins/link_energy.cpp include src/plugins/link_energy_wifi.cpp include src/plugins/link_load.cpp include src/plugins/photovoltaic.cpp -include src/plugins/task.cpp include src/plugins/vm/VmLiveMigration.cpp include src/plugins/vm/VmLiveMigration.hpp include src/plugins/vm/dirty_page_tracking.cpp @@ -2341,6 +2342,7 @@ include src/s4u/s4u_Mailbox.cpp include src/s4u/s4u_Mutex.cpp include src/s4u/s4u_Netzone.cpp include src/s4u/s4u_Semaphore.cpp +include src/s4u/s4u_Task.cpp include src/s4u/s4u_VirtualMachine.cpp include src/simgrid/Exception.cpp include src/simgrid/math_utils.h diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index e5148affab..4a0f3fe0a5 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -96,6 +96,9 @@ provides many helper functions to simplify the code of actors. .. |API_s4u_Activities| replace:: **Activities** .. _API_s4u_Activities: #api-s4u-activity +.. |API_s4u_Tasks| replace:: **Tasks** +.. _API_s4u_Tasks: #api-s4u-task + .. |API_s4u_Hosts| replace:: **Hosts** .. _API_s4u_Hosts: #api-s4u-host @@ -187,13 +190,39 @@ with :cpp:func:`s4u::Comm::wait_all() `. :dedent: 2 ===================== -Activities Life cycle +Activities Life Cycle ===================== Sometimes, you want to change the setting of an activity before it even starts. .. todo:: write this section +===================== +Repeatable Activities +===================== + +In order to simulate the execution of Dataflow applications, we introduced the +concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow +is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens +can carry any user-defined data, using the same internal mechanisms as for the +other simulated objects. 
Each Task has to receive a token from each of its +predecessors to fire a new instance of a |API_s4u_Comm|, |API_s4u_Exec|, or +|API_s4u_Io| activity. On completion of this activity, the Task propagates tokens +to its successors, and waits for the next set of tokens to arrive. + +To initiate the execution of a Dataflow, it is possible to make some +|API_s4u_Tasks| fire one or more activities without waiting for any token with the +:cpp:func:`s4u::Task::enqueue_firings() ` +function. + +The parameters and successors of a Task can be redefined at runtime by attaching +callbacks to the +:cpp:func:`s4u::Task::on_this_start ` +and +:cpp:func:`s4u::Task::on_this_completion ` +signals. + + .. _s4u_mailbox: Mailboxes @@ -2116,8 +2145,8 @@ Querying info .. group-tab:: C++ - .. doxygenfunction:: simgrid::s4u::Activity::get_cname - .. doxygenfunction:: simgrid::s4u::Activity::get_name + .. doxygenfunction:: simgrid::s4u::Activity::get_cname() const + .. doxygenfunction:: simgrid::s4u::Activity::get_name() const .. doxygenfunction:: simgrid::s4u::Activity::get_remaining() const .. doxygenfunction:: simgrid::s4u::Activity::get_state() const .. doxygenfunction:: simgrid::s4u::Activity::set_remaining(double remains) @@ -2310,11 +2339,11 @@ Signals .. group-tab:: C++ .. doxygenfunction:: simgrid::s4u::Comm::on_start_cb + .. doxygenfunction:: simgrid::s4u::Comm::on_this_start_cb .. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb + .. doxygenfunction:: simgrid::s4u::Comm::on_this_completion_cb .. doxygenfunction:: simgrid::s4u::Comm::on_recv_cb .. doxygenfunction:: simgrid::s4u::Comm::on_send_cb - - .. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb .. doxygenfunction:: simgrid::s4u::Comm::on_suspended_cb .. doxygenfunction:: simgrid::s4u::Comm::on_resumed_cb .. doxygenfunction:: simgrid::s4u::Comm::on_veto_cb @@ -2453,9 +2482,10 @@ Signals .. group-tab:: C++ .. doxygenfunction:: simgrid::s4u::Exec::on_start_cb + .. 
doxygenfunction:: simgrid::s4u::Exec::on_this_start_cb .. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb + .. doxygenfunction:: simgrid::s4u::Exec::on_this_completion_cb - .. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb .. doxygenfunction:: simgrid::s4u::Exec::on_suspended_cb .. doxygenfunction:: simgrid::s4u::Exec::on_resumed_cb .. doxygenfunction:: simgrid::s4u::Exec::on_veto_cb @@ -2529,13 +2559,211 @@ Signals .. group-tab:: C++ .. doxygenfunction:: simgrid::s4u::Io::on_start_cb + .. doxygenfunction:: simgrid::s4u::Io::on_this_start_cb .. doxygenfunction:: simgrid::s4u::Io::on_completion_cb + .. doxygenfunction:: simgrid::s4u::Io::on_this_completion_cb - .. doxygenfunction:: simgrid::s4u::Io::on_completion_cb .. doxygenfunction:: simgrid::s4u::Io::on_suspended_cb .. doxygenfunction:: simgrid::s4u::Io::on_resumed_cb .. doxygenfunction:: simgrid::s4u::Io::on_veto_cb + +.. _API_s4u_Tasks: + +========== +Tasks +========== + +============== +class Task +============== + +.. doxygenclass:: simgrid::s4u::Task + +**Known subclasses:** +:ref:`Communication Tasks `, +:ref:`Executions Tasks `, +:ref:`I/O Tasks `. +See also the :ref:`section on activities ` above. + +Basic management +---------------- + +.. tabs:: + + .. group-tab:: C++ + + .. code-block:: C++ + + #include + + .. doxygentypedef:: TaskPtr + +Querying info +------------- + +.. tabs:: + + .. group-tab:: C++ + + .. doxygenfunction:: simgrid::s4u::Task::get_cname() const + .. doxygenfunction:: simgrid::s4u::Task::get_name() const + .. doxygenfunction:: simgrid::s4u::Task::get_count() const + .. doxygenfunction:: simgrid::s4u::Task::get_amount() const + .. doxygenfunction:: simgrid::s4u::Task::set_amount(double amount) + +Life cycle +---------- + +.. tabs:: + + .. group-tab:: C++ + .. doxygenfunction:: simgrid::s4u::Task::enqueue_firings(int n) + +Managing Dependencies +--------------------- +.. tabs:: + + .. group-tab:: C++ + .. 
doxygenfunction:: simgrid::s4u::Task::add_successor(TaskPtr t) + .. doxygenfunction:: simgrid::s4u::Task::remove_successor(TaskPtr t) + .. doxygenfunction:: simgrid::s4u::Task::remove_all_successors() + .. doxygenfunction:: simgrid::s4u::Task::get_successors() const + +Managing Tokens +--------------- +.. doxygenclass:: simgrid::s4u::Token + +.. tabs:: + + .. group-tab:: C++ + .. doxygenfunction:: simgrid::s4u::Task::set_token(std::shared_ptr token) + .. doxygenfunction:: simgrid::s4u::Task::get_next_token_from(TaskPtr t) + +Signals +------- + +.. tabs:: + + .. group-tab:: C++ + .. doxygenfunction:: simgrid::s4u::Task::on_start_cb + .. doxygenfunction:: simgrid::s4u::Task::on_this_start_cb + .. doxygenfunction:: simgrid::s4u::Task::on_completion_cb + .. doxygenfunction:: simgrid::s4u::Task::on_this_completion_cb + +.. _API_s4u_CommTask: + +================ +⁣  class CommTask +================ +.. tabs:: + + .. group-tab:: C++ + + .. doxygenclass:: simgrid::s4u::CommTask + +Basic management +---------------- + +.. tabs:: + + .. group-tab:: C++ + + .. code-block:: C++ + + #include + + .. doxygentypedef:: CommTaskPtr + +Querying info +------------- + +.. tabs:: + + .. group-tab:: C++ + + .. doxygenfunction:: simgrid::s4u::Task::get_source() const + .. doxygenfunction:: simgrid::s4u::Task::get_destination() const + .. doxygenfunction:: simgrid::s4u::Task::get_bytes() const + .. doxygenfunction:: simgrid::s4u::Task::set_source(simgrid::s4u::Host* source); + .. doxygenfunction:: simgrid::s4u::Task::set_destination(simgrid::s4u::Host* destination); + .. doxygenfunction:: simgrid::s4u::Task::set_bytes(double bytes) + + +.. _API_s4u_ExecTask: + +================ +⁣  class ExecTask +================ +.. tabs:: + + .. group-tab:: C++ + + .. doxygenclass:: simgrid::s4u::ExecTask + +Basic management +---------------- + +.. tabs:: + + .. group-tab:: C++ + + .. code-block:: C++ + + #include + + .. doxygentypedef:: ExecTaskPtr + +Querying info +------------- + +.. tabs:: + + .. 
group-tab:: C++ + + .. doxygenfunction:: simgrid::s4u::Task::get_host() const + .. doxygenfunction:: simgrid::s4u::Task::get_flops() const + .. doxygenfunction:: simgrid::s4u::Task::set_host(simgrid::s4u::Host* host); + .. doxygenfunction:: simgrid::s4u::Task::set_flops(double flops); + +.. _API_s4u_IoTask: + +================ +⁣  class IoTask +================ +.. tabs:: + + .. group-tab:: C++ + + .. doxygenclass:: simgrid::s4u::IoTask + +Basic management +---------------- + +.. tabs:: + + .. group-tab:: C++ + + .. code-block:: C++ + + #include + + .. doxygentypedef:: IoTaskPtr + +Querying info +------------- + +.. tabs:: + + .. group-tab:: C++ + + .. doxygenfunction:: simgrid::s4u::Task::get_disk() const + .. doxygenfunction:: simgrid::s4u::Task::get_bytes() const + .. doxygenfunction:: simgrid::s4u::Task::get_op_type() const + .. doxygenfunction:: simgrid::s4u::Task::set_disk(simgrid::s4u::Disk* disk); + .. doxygenfunction:: simgrid::s4u::Task::set_bytes(double bytes); + .. doxygenfunction:: simgrid::s4u::Task::set_op_type(simgrid::s4u::Io::OpType type); + .. 
_API_s4u_Synchronizations: ======================= diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index dc66fc99c2..66187a8b84 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -171,7 +171,7 @@ foreach (example activity-testany activity-waitany mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert network-ns3 network-ns3-wifi network-wifi io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent - task-io task-simple task-variable-load task-switch-host + task-io task-simple task-variable-load task-storm task-switch-host photovoltaic-simple platform-comm-serialize platform-failures platform-profile platform-properties plugin-host-load plugin-link-load plugin-prodcons diff --git a/examples/cpp/io-file-system/s4u-io-file-system.cpp b/examples/cpp/io-file-system/s4u-io-file-system.cpp index ef57282804..40e0048e09 100644 --- a/examples/cpp/io-file-system/s4u-io-file-system.cpp +++ b/examples/cpp/io-file-system/s4u-io-file-system.cpp @@ -58,9 +58,8 @@ public: // Test attaching some user data to the file file->set_data(new std::string("777")); - const auto* file_data = file->get_data(); + auto file_data = file->get_unique_data(); XBT_INFO("User data attached to the file: %s", file_data->c_str()); - delete file_data; // Close the file file->close(); diff --git a/examples/cpp/task-io/s4u-task-io.cpp b/examples/cpp/task-io/s4u-task-io.cpp index 6301657dad..f5d3c1560c 100644 --- a/examples/cpp/task-io/s4u-task-io.cpp +++ b/examples/cpp/task-io/s4u-task-io.cpp @@ -3,7 +3,7 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -/* This example demonstrate basic use of the task plugin. +/* This example demonstrate basic use of tasks. * * We model the following graph: * @@ -13,27 +13,27 @@ * comm is a communication task. 
*/ -#include "simgrid/plugins/task.hpp" +#include "simgrid/s4u/Task.hpp" #include "simgrid/s4u.hpp" #include XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example"); +namespace sg4 = simgrid::s4u; int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* bob = e.host_by_name("bob"); auto* carl = e.host_by_name("carl"); // Create tasks - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl); - auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE); - auto read = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl); + auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE); + auto read = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ); // Create the graph by defining dependencies between tasks exec1->add_successor(write); @@ -41,12 +41,12 @@ int main(int argc, char* argv[]) read->add_successor(exec2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Enqueue two executions for task exec1 - exec1->enqueue_execs(2); + // Enqueue two firings for task exec1 + exec1->enqueue_firings(2); // Start the simulation e.run(); diff --git a/examples/cpp/task-simple/s4u-task-simple.cpp b/examples/cpp/task-simple/s4u-task-simple.cpp index 3e9d14fa13..72ddf4c0d9 100644 --- a/examples/cpp/task-simple/s4u-task-simple.cpp +++ 
b/examples/cpp/task-simple/s4u-task-simple.cpp @@ -3,7 +3,7 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -/* This example demonstrate basic use of the task plugin. +/* This example demonstrate basic use of tasks. * * We model the following graph: * @@ -13,37 +13,37 @@ * comm is a communication task. */ -#include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example"); +namespace sg4 = simgrid::s4u; + int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* tremblay = e.host_by_name("Tremblay"); auto* jupiter = e.host_by_name("Jupiter"); // Create tasks - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter); - auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter); + auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter); // Create the graph by defining dependencies between tasks exec1->add_successor(comm); comm->add_successor(exec2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Enqueue two executions for task exec1 - exec1->enqueue_execs(2); + // Enqueue two firings for task exec1 + exec1->enqueue_firings(2); // Start the simulation e.run(); diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp new file mode 100644 index 
0000000000..2c4edb1cb4 --- /dev/null +++ b/examples/cpp/task-storm/s4u-task-storm.cpp @@ -0,0 +1,130 @@ +/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html + and uses them to build a simulation of a stream processing application + + Spout SA produces data every 100ms. The volume produced is alternately 1e3, 1e6 and 1e9 bytes. + Spout SB produces 1e6 bytes every 200ms. + + Bolts B1 and B2 process data from Spout SA alternately. The quantity of work to process this data is 10 flops per byte + Bolt B3 processes data from Spout SB. + Bolt B4 processes data from Bolt B3. + + Fafard + ┌────┐ + ┌──►│ B1 │ + Tremblay │ └────┘ + ┌────┐ │ + │ SA ├────┤ Ginette + └────┘ │ ┌────┐ + └──►│ B2 │ + └────┘ + + + Bourassa + Jupiter ┌──────────┐ + ┌────┐ │ │ + │ SB ├─────┤ B3 ──► B4│ + └────┘ │ │ + └──────────┘ + */ + +#include "simgrid/s4u.hpp" + +XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + e.load_platform(argv[1]); + + // Retrieve hosts + auto tremblay = e.host_by_name("Tremblay"); + auto jupiter = e.host_by_name("Jupiter"); + auto fafard = e.host_by_name("Fafard"); + auto ginette = e.host_by_name("Ginette"); + auto bourassa = e.host_by_name("Bourassa"); + + // Create execution tasks + auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay); + auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter); + auto B1 = sg4::ExecTask::init("B1", 1e8, fafard); + auto B2 = sg4::ExecTask::init("B2", 1e8, ginette); + auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa); + auto B4 = sg4::ExecTask::init("B4", 2e8, 
bourassa); + + // Create communication tasks + auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard); + auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette); + auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa); + + // Create the graph by defining dependencies between tasks + // Some dependencies are defined dynamically + SA_to_B1->add_successor(B1); + SA_to_B2->add_successor(B2); + SB->add_successor(SB_to_B3); + SB_to_B3->add_successor(B3); + B3->add_successor(B4); + + /* Dynamic modification of the graph and bytes sent + Alternatively we: remove/add the link between SA and SA_to_B2 + add/remove the link between SA and SA_to_B1 + */ + SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) { + int count = t->get_count(); + sg4::CommTaskPtr comm; + if (count % 2 == 0) { + t->remove_successor(SA_to_B2); + t->add_successor(SA_to_B1); + comm = SA_to_B1; + } + else { + t->remove_successor(SA_to_B1); + t->add_successor(SA_to_B2); + comm = SA_to_B2; + } + std::vector amount = {1e3,1e6,1e9}; + comm->set_amount(amount[count % 3]); + auto token = std::make_shared(); + token->set_data(new double(amount[count % 3])); + t->set_token(token); + }); + + // The token sent by SA is forwarded by both communication tasks + SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { + t->set_token(t->get_next_token_from(SA)); + }); + SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { + t->set_token(t->get_next_token_from(SA)); + }); + + /* B1 and B2 read the value of the token received by their predecessors + and use it to adapt their amount of work to do. 
+ */ + B1->on_this_start_cb([SA_to_B1](sg4::Task* t) { + auto data = t->get_next_token_from(SA_to_B1)->get_unique_data(); + t->set_amount(*data * 10); + }); + B2->on_this_start_cb([SA_to_B2](sg4::Task* t) { + auto data = t->get_next_token_from(SA_to_B2)->get_unique_data(); + t->set_amount(*data * 10); + }); + + // Enqueue firings for tasks without predecessors + SA->enqueue_firings(5); + SB->enqueue_firings(5); + + // Add a function to be called when tasks end for log purpose + sg4::Task::on_completion_cb([] + (const sg4::Task* t) { + XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); + }); + + // Start the simulation + e.run(); + return 0; +} diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh new file mode 100644 index 0000000000..d7c364a837 --- /dev/null +++ b/examples/cpp/task-storm/s4u-task-storm.tesh @@ -0,0 +1,38 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml +> [0.100000] [task_storm/INFO] Task SA finished (1) +> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1) +> [0.125972] [task_storm/INFO] Task B1 finished (1) +> [0.200000] [task_storm/INFO] Task SB finished (1) +> [0.200000] [task_storm/INFO] Task SA finished (2) +> [0.300000] [task_storm/INFO] Task SA finished (3) +> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1) +> [0.400000] [task_storm/INFO] Task SB finished (2) +> [0.400000] [task_storm/INFO] Task SA finished (4) +> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2) +> [0.500000] [task_storm/INFO] Task SA finished (5) +> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1) +> [0.570648] [task_storm/INFO] Task B2 finished (1) +> [0.570855] [task_storm/INFO] Task B2 finished (2) +> [0.600000] [task_storm/INFO] Task SB finished (3) +> [0.800000] [task_storm/INFO] Task SB finished (4) +> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2) +> [1.000000] [task_storm/INFO] Task SB finished (5) +> [1.177739] 
[task_storm/INFO] Task SB_to_B3 finished (3) +> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4) +> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5) +> [2.619232] [task_storm/INFO] Task B3 finished (1) +> [6.743624] [task_storm/INFO] Task B3 finished (2) +> [10.868015] [task_storm/INFO] Task B3 finished (3) +> [10.868015] [task_storm/INFO] Task B4 finished (1) +> [14.992407] [task_storm/INFO] Task B3 finished (4) +> [19.116799] [task_storm/INFO] Task B3 finished (5) +> [19.116799] [task_storm/INFO] Task B4 finished (2) +> [23.241190] [task_storm/INFO] Task B4 finished (3) +> [27.365582] [task_storm/INFO] Task B4 finished (4) +> [31.489974] [task_storm/INFO] Task B4 finished (5) +> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2) +> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3) +> [264.435791] [task_storm/INFO] Task B1 finished (2) +> [264.566859] [task_storm/INFO] Task B1 finished (3) diff --git a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp index 6ebca129da..7023f68b48 100644 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@ -16,16 +16,15 @@ * With exec1 and exec2 on different hosts. 
*/ -#include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example"); +namespace sg4 = simgrid::s4u; int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retrieve hosts auto* tremblay = e.host_by_name("Tremblay"); @@ -33,13 +32,13 @@ int main(int argc, char* argv[]) auto* fafard = e.host_by_name("Fafard"); // Create tasks - auto comm0 = simgrid::plugins::CommTask::init("comm0"); + auto comm0 = sg4::CommTask::init("comm0"); comm0->set_bytes(1e7); comm0->set_source(tremblay); - auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter); - auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard); - auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay); - auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay); + auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter); + auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard); + auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay); + auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay); // Create the initial graph by defining dependencies between tasks comm0->add_successor(exec2); @@ -47,15 +46,14 @@ int main(int argc, char* argv[]) exec2->add_successor(comm2); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); - // Add a function to be called before each executions of comm0 + // Add a function to be called before each firing of comm0 // This function modifies the graph of tasks by adding or removing // successors to comm0 - comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) { - auto* 
comm0 = dynamic_cast(t); + comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) { static int count = 0; if (count % 2 == 0) { comm0->set_destination(jupiter); @@ -69,8 +67,8 @@ int main(int argc, char* argv[]) count++; }); - // Enqueue four executions for task comm0 - comm0->enqueue_execs(4); + // Enqueue four firings for task comm0 + comm0->enqueue_firings(4); // Start the simulation e.run(); diff --git a/examples/cpp/task-variable-load/s4u-task-variable-load.cpp b/examples/cpp/task-variable-load/s4u-task-variable-load.cpp index 960fb660a6..0955c2e2a4 100644 --- a/examples/cpp/task-variable-load/s4u-task-variable-load.cpp +++ b/examples/cpp/task-variable-load/s4u-task-variable-load.cpp @@ -13,50 +13,49 @@ * With a heavy load there is a burst of comm before the exec task can even finish once. */ -#include "simgrid/plugins/task.hpp" #include "simgrid/s4u.hpp" XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example"); +namespace sg4 = simgrid::s4u; -static void variable_load(simgrid::plugins::TaskPtr t) +static void variable_load(sg4::TaskPtr t) { XBT_INFO("--- Small load ---"); for (int i = 0; i < 3; i++) { - t->enqueue_execs(1); - simgrid::s4u::this_actor::sleep_for(100); + t->enqueue_firings(1); + sg4::this_actor::sleep_for(100); } - simgrid::s4u::this_actor::sleep_until(1000); + sg4::this_actor::sleep_until(1000); XBT_INFO("--- Heavy load ---"); for (int i = 0; i < 3; i++) { - t->enqueue_execs(1); - simgrid::s4u::this_actor::sleep_for(1); + t->enqueue_firings(1); + sg4::this_actor::sleep_for(1); } } int main(int argc, char* argv[]) { - simgrid::s4u::Engine e(&argc, argv); + sg4::Engine e(&argc, argv); e.load_platform(argv[1]); - simgrid::plugins::Task::init(); // Retreive hosts auto* tremblay = e.host_by_name("Tremblay"); auto* jupiter = e.host_by_name("Jupiter"); // Create tasks - auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter); - auto exec = 
simgrid::plugins::ExecTask::init("exec", 1e9, jupiter); + auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter); + auto exec = sg4::ExecTask::init("exec", 1e9, jupiter); // Create the graph by defining dependencies between tasks comm->add_successor(exec); // Add a function to be called when tasks end for log purpose - simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) { + sg4::Task::on_completion_cb([](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); }); // Create the actor that will inject load during the simulation - simgrid::s4u::Actor::create("input", tremblay, variable_load, comm); + sg4::Actor::create("input", tremblay, variable_load, comm); // Start the simulation e.run(); diff --git a/examples/python/task-io/task-io.py b/examples/python/task-io/task-io.py index b74c41169c..e75215a0f7 100644 --- a/examples/python/task-io/task-io.py +++ b/examples/python/task-io/task-io.py @@ -24,7 +24,6 @@ if __name__ == '__main__': args = parse() e = Engine(sys.argv) e.load_platform(args.platform) - Task.init() # Retrieve hosts bob = e.host_by_name('bob') @@ -42,10 +41,10 @@ if __name__ == '__main__': read.add_successor(exec2) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) - # Enqueue two executions for task exec1 - exec1.enqueue_execs(2) + # Enqueue two firings for task exec1 + exec1.enqueue_firings(2) # runs the simulation e.run() diff --git a/examples/python/task-simple/task-simple.py b/examples/python/task-simple/task-simple.py index 1219906ce9..23e9fc0c8a 100644 --- a/examples/python/task-simple/task-simple.py +++ b/examples/python/task-simple/task-simple.py @@ -34,7 +34,6 @@ if __name__ == '__main__': args = parse() e = Engine(sys.argv) e.load_platform(args.platform) - Task.init() # Retrieve hosts tremblay = e.host_by_name('Tremblay') @@ -50,11 +49,10 @@ if __name__ == '__main__': comm.add_successor(exec2) # Add a 
function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) - # Enqueue two executions for task exec1 - exec1.enqueue_execs(2) + # Enqueue two firings for task exec1 + exec1.enqueue_firings(2) # runs the simulation e.run() - diff --git a/examples/python/task-switch-host/task-switch-host.py b/examples/python/task-switch-host/task-switch-host.py index a95db1645f..b2904c3f56 100644 --- a/examples/python/task-switch-host/task-switch-host.py +++ b/examples/python/task-switch-host/task-switch-host.py @@ -55,7 +55,6 @@ if __name__ == '__main__': args = parse() e = Engine(sys.argv) e.load_platform(args.platform) - Task.init() # Retrieve hosts tremblay = e.host_by_name('Tremblay') @@ -76,15 +75,15 @@ if __name__ == '__main__': exec2.add_successor(comm2) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) - # Add a function to be called before each executions of comm0 + # Add a function to be called before each firing of comm0 # This function modifies the graph of tasks by adding or removing # successors to comm0 comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2])) - # Enqueue two executions for task exec1 - comm0.enqueue_execs(4) + # Enqueue two firings for task exec1 + comm0.enqueue_firings(4) # runs the simulation e.run() diff --git a/examples/python/task-variable-load/task-variable-load.py b/examples/python/task-variable-load/task-variable-load.py index 73105847ce..51dbc1a6c6 100644 --- a/examples/python/task-variable-load/task-variable-load.py +++ b/examples/python/task-variable-load/task-variable-load.py @@ -33,19 +33,18 @@ def callback(t): def variable_load(t): print('--- Small load ---') for _ in range(3): - t.enqueue_execs(1) + t.enqueue_firings(1) this_actor.sleep_for(100) this_actor.sleep_for(1000) print('--- Heavy load ---') for _ in range(3): - t.enqueue_execs(1) + t.enqueue_firings(1) this_actor.sleep_for(1) if 
__name__ == '__main__': args = parse() e = Engine(sys.argv) e.load_platform(args.platform) - Task.init() # Retrieve hosts tremblay = e.host_by_name('Tremblay') @@ -59,7 +58,7 @@ if __name__ == '__main__': comm.add_successor(exec) # Add a function to be called when tasks end for log purpose - Task.on_end_cb(callback) + Task.on_completion_cb(callback) # Create the actor that will inject load during the simulation Actor.create("input", tremblay, variable_load, comm) diff --git a/include/simgrid/s4u.hpp b/include/simgrid/s4u.hpp index f694696d23..77a71df2c4 100644 --- a/include/simgrid/s4u.hpp +++ b/include/simgrid/s4u.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index a92f87f542..a6a7f083d0 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -39,7 +39,6 @@ class XBT_PUBLIC Comm : public Activity_T { Comm() = default; Comm* do_start() override; -protected: static xbt::signal on_send; xbt::signal on_this_send; static xbt::signal on_recv; @@ -47,6 +46,7 @@ protected: inline static xbt::signal on_start; xbt::signal on_this_start; +protected: void fire_on_completion() const override { /* The completion signal of a Comm has to be thrown only once and not by the sender AND the receiver. then Comm::on_completion is thrown in the kernel in CommImpl::finish. 
diff --git a/include/simgrid/s4u/Exec.hpp b/include/simgrid/s4u/Exec.hpp index 5cd7857552..7de8e8e6a8 100644 --- a/include/simgrid/s4u/Exec.hpp +++ b/include/simgrid/s4u/Exec.hpp @@ -36,14 +36,15 @@ class XBT_PUBLIC Exec : public Activity_T { bool parallel_ = false; + inline static xbt::signal on_start; + xbt::signal on_this_start; + protected: explicit Exec(kernel::activity::ExecImplPtr pimpl); Exec* do_start() override; void reset() const; - inline static xbt::signal on_start; - xbt::signal on_this_start; void fire_on_completion() const override { on_completion(*this); } void fire_on_this_completion() const override { on_this_completion(*this); } void fire_on_suspend() const override { on_suspend(*this); } diff --git a/include/simgrid/plugins/task.hpp b/include/simgrid/s4u/Task.hpp similarity index 55% rename from include/simgrid/plugins/task.hpp rename to include/simgrid/s4u/Task.hpp index 8143333d86..c0677da108 100644 --- a/include/simgrid/plugins/task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -1,16 +1,17 @@ -#ifndef SIMGRID_PLUGINS_TASK_H_ -#define SIMGRID_PLUGINS_TASK_H_ +#ifndef SIMGRID_S4U_TASK_H_ +#define SIMGRID_S4U_TASK_H_ #include #include #include #include +#include #include #include #include -namespace simgrid::plugins { +namespace simgrid::s4u { class Task; using TaskPtr = boost::intrusive_ptr; @@ -23,59 +24,68 @@ using CommTaskPtr = boost::intrusive_ptr; class IoTask; using IoTaskPtr = boost::intrusive_ptr; -struct ExtendedAttributeActivity { - static simgrid::xbt::Extension EXTENSION_ID; - Task* task_; -}; +class XBT_PUBLIC Token : public xbt::Extendable {}; class Task { + std::string name_; + double amount_; + int queued_firings_ = 0; + int count_ = 0; + bool working_ = false; + std::set successors_ = {}; std::map predecessors_ = {}; + std::atomic_int_fast32_t refcount_{0}; bool ready_to_run() const; void receive(Task* source); - void complete(); + + std::shared_ptr token_ = nullptr; + std::deque>> tokens_received_; + ActivityPtr 
previous_activity_; + ActivityPtr current_activity_; protected: - std::string name_; - double amount_; - int queued_execs_ = 0; - int count_ = 0; - bool working_ = false; - s4u::ActivityPtr previous_activity_; - s4u::ActivityPtr current_activity_; - xbt::signal on_this_start_; - xbt::signal on_this_end_; explicit Task(const std::string& name); virtual ~Task() = default; - virtual void fire() = 0; - static xbt::signal on_start; - static xbt::signal on_end; - std::atomic_int_fast32_t refcount_{0}; + virtual void fire(); + void complete(); + + void set_current_activity (ActivityPtr a) { current_activity_ = a; } + + inline static xbt::signal on_start; + xbt::signal on_this_start; + inline static xbt::signal on_completion; + xbt::signal on_this_completion; public: - static void init(); const std::string& get_name() const { return name_; } const char* get_cname() const { return name_.c_str(); } - void enqueue_execs(int n); void set_amount(double amount); double get_amount() const { return amount_; } + int get_count() const { return count_; } + + void set_token(std::shared_ptr token); + std::shared_ptr get_next_token_from(TaskPtr t); + void add_successor(TaskPtr t); void remove_successor(TaskPtr t); void remove_all_successors(); const std::set& get_successors() const { return successors_; } - void on_this_start_cb(const std::function& func); - void on_this_end_cb(const std::function& func); - int get_count() const; - /** Add a callback fired before a task activity start. + void enqueue_firings(int n); + + /** Add a callback fired before this task activity starts */ + void on_this_start_cb(const std::function& func){ on_this_start.connect(func); } + /** Add a callback fired before a task activity starts. * Triggered after the on_this_start function**/ static void on_start_cb(const std::function& cb) { on_start.connect(cb); } - /** Add a callback fired after a task activity end. 
- * Triggered after the on_this_end function, but before - * sending tokens to successors.**/ - static void on_end_cb(const std::function& cb) { on_end.connect(cb); } + /** Add a callback fired before this task activity ends */ + void on_this_completion_cb(const std::function& func) { on_this_completion.connect(func); }; + /** Add a callback fired after a task activity ends. + * Triggered after the on_this_end function, but before sending tokens to successors.**/ + static void on_completion_cb(const std::function& cb) { on_completion.connect(cb); } #ifndef DOXYGEN friend void intrusive_ptr_release(Task* o) @@ -89,54 +99,57 @@ public: #endif }; -class ExecTask : public Task { - s4u::Host* host_; +class CommTask : public Task { + Host* source_; + Host* destination_; - explicit ExecTask(const std::string& name); + explicit CommTask(const std::string& name); void fire() override; public: - static ExecTaskPtr init(const std::string& name); - static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host); - ExecTaskPtr set_host(s4u::Host* host); - s4u::Host* get_host() const { return host_; } - ExecTaskPtr set_flops(double flops); - double get_flops() const { return get_amount(); } + static CommTaskPtr init(const std::string& name); + static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination); + + CommTaskPtr set_source(Host* source); + Host* get_source() const { return source_; } + CommTaskPtr set_destination(Host* destination); + Host* get_destination() const { return destination_; } + CommTaskPtr set_bytes(double bytes); + double get_bytes() const { return get_amount(); } }; -class CommTask : public Task { - s4u::Host* source_; - s4u::Host* destination_; +class ExecTask : public Task { + Host* host_; - explicit CommTask(const std::string& name); + explicit ExecTask(const std::string& name); void fire() override; public: - static CommTaskPtr init(const std::string& name); - static CommTaskPtr init(const std::string& 
name, double bytes, s4u::Host* source, s4u::Host* destination); - CommTaskPtr set_source(s4u::Host* source); - s4u::Host* get_source() const { return source_; } - CommTaskPtr set_destination(s4u::Host* destination); - s4u::Host* get_destination() const { return destination_; } - CommTaskPtr set_bytes(double bytes); - double get_bytes() const { return get_amount(); } + static ExecTaskPtr init(const std::string& name); + static ExecTaskPtr init(const std::string& name, double flops, Host* host); + + ExecTaskPtr set_host(Host* host); + Host* get_host() const { return host_; } + ExecTaskPtr set_flops(double flops); + double get_flops() const { return get_amount(); } }; class IoTask : public Task { - s4u::Disk* disk_; - s4u::Io::OpType type_; + Disk* disk_; + Io::OpType type_; explicit IoTask(const std::string& name); void fire() override; public: static IoTaskPtr init(const std::string& name); - static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type); - IoTaskPtr set_disk(s4u::Disk* disk); - s4u::Disk* get_disk() const { return disk_; } + static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type); + + IoTaskPtr set_disk(Disk* disk); + Disk* get_disk() const { return disk_; } IoTaskPtr set_bytes(double bytes); double get_bytes() const { return get_amount(); } - IoTaskPtr set_op_type(s4u::Io::OpType type); - s4u::Io::OpType get_op_type() const { return type_; } + IoTaskPtr set_op_type(Io::OpType type); + Io::OpType get_op_type() const { return type_; } }; -} // namespace simgrid::plugins +} // namespace simgrid::s4u #endif diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index 5f0f33599c..f71d652224 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -11,7 +11,6 @@ #include "simgrid/kernel/ProfileBuilder.hpp" #include "simgrid/kernel/routing/NetPoint.hpp" #include -#include #include #include #include @@ 
-25,6 +24,7 @@ #include #include #include +#include #include #include @@ -33,14 +33,14 @@ #include namespace py = pybind11; -using simgrid::plugins::CommTask; -using simgrid::plugins::CommTaskPtr; -using simgrid::plugins::ExecTask; -using simgrid::plugins::ExecTaskPtr; -using simgrid::plugins::IoTask; -using simgrid::plugins::IoTaskPtr; -using simgrid::plugins::Task; -using simgrid::plugins::TaskPtr; +using simgrid::s4u::CommTask; +using simgrid::s4u::CommTaskPtr; +using simgrid::s4u::ExecTask; +using simgrid::s4u::ExecTaskPtr; +using simgrid::s4u::IoTask; +using simgrid::s4u::IoTaskPtr; +using simgrid::s4u::Task; +using simgrid::s4u::TaskPtr; using simgrid::s4u::Actor; using simgrid::s4u::ActorPtr; using simgrid::s4u::Barrier; @@ -921,7 +921,6 @@ PYBIND11_MODULE(simgrid, m) /* Class Task */ py::class_(m, "Task", "Task. See the C++ documentation for details.") - .def_static("init", &Task::init) .def_static( "on_start_cb", [](py::object cb) { @@ -934,11 +933,11 @@ PYBIND11_MODULE(simgrid, m) }, "Add a callback called when each task starts.") .def_static( - "on_end_cb", + "on_completion_cb", [](py::object cb) { cb.inc_ref(); // keep alive after return const py::gil_scoped_release gil_release; - Task::on_end_cb([cb_p = cb.ptr()](Task* op) { + Task::on_completion_cb([cb_p = cb.ptr()](Task* op) { const py::gil_scoped_acquire py_context; // need a new context for callback py::reinterpret_borrow(cb_p)(op); }); @@ -948,8 +947,8 @@ PYBIND11_MODULE(simgrid, m) .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).") .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).") .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.") - .def("enqueue_execs", py::overload_cast(&Task::enqueue_execs), py::call_guard(), - py::arg("n"), "Enqueue executions for this task.") + .def("enqueue_firings", py::overload_cast(&Task::enqueue_firings), 
py::call_guard(), + py::arg("n"), "Enqueue firings for this task.") .def("add_successor", py::overload_cast(&Task::add_successor), py::call_guard(), py::arg("op"), "Add a successor to this task.") .def("remove_successor", py::overload_cast(&Task::remove_successor), @@ -958,7 +957,7 @@ PYBIND11_MODULE(simgrid, m) "Remove all successors of this task.") .def("on_this_start_cb", py::overload_cast&>(&Task::on_this_start_cb), py::arg("func"), "Add a callback called when this task starts.") - .def("on_this_end_cb", py::overload_cast&>(&Task::on_this_end_cb), + .def("on_this_completion_cb", py::overload_cast&>(&Task::on_this_completion_cb), py::arg("func"), "Add a callback called when this task ends.") .def( "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; }, diff --git a/src/mc/explo/odpor/WakeupTreeIterator.cpp b/src/mc/explo/odpor/WakeupTreeIterator.cpp index 6c81203193..9017fd8223 100644 --- a/src/mc/explo/odpor/WakeupTreeIterator.cpp +++ b/src/mc/explo/odpor/WakeupTreeIterator.cpp @@ -5,6 +5,7 @@ #include "src/mc/explo/odpor/WakeupTreeIterator.hpp" #include "src/mc/explo/odpor/WakeupTree.hpp" +#include "xbt/asserts.h" namespace simgrid::mc::odpor { @@ -20,19 +21,19 @@ void WakeupTreeIterator::push_until_left_most_found() // there are no cycles. 
This means that at least // one node in the tree won't have any children, // so the loop will eventually terminate - auto* cur_top_node = *post_order_iteration.top(); + WakeupTreeNode* cur_top_node = *post_order_iteration.top(); while (not cur_top_node->is_leaf()) { // INVARIANT: Since we push children in // reverse order (right-most to left-most), // we ensure that we'll always process left-most // children first auto& children = cur_top_node->children_; - for (auto iter = children.rbegin(); iter != children.rend(); ++iter) { - // iter.base() points one element past where we seek; hence, - // we move it over one position + // iter.base() points one element past where we seek; that is, + // we want the value one position forward post_order_iteration.push(std::prev(iter.base())); } + has_added_children.push(cur_top_node); cur_top_node = *post_order_iteration.top(); } } @@ -46,7 +47,6 @@ void WakeupTreeIterator::increment() return; } - auto prev_top_handle = post_order_iteration.top(); post_order_iteration.pop(); // If there are now no longer any nodes left, @@ -57,15 +57,28 @@ void WakeupTreeIterator::increment() return; } - // Otherwise, look at the next top node. If - // `prev_top` is that node's right-most child, - // then we don't attempt to re-add `next_top`'s - // children again for we would have already seen them. - // To actually determine "right-most", we check if - // moving over to the right one spot brings us to the - // end of the candidate parent's list - const auto* next_top_node = *post_order_iteration.top(); - if ((++prev_top_handle) != next_top_node->get_ordered_children().end()) { + xbt_assert(not has_added_children.empty(), "Invariant violated: There are more " + "nodes in the iteration that we must search " + "yet nobody has claimed to have added these nodes. 
" + "This implies that the algorithm is not iterating over " + "the wakeup tree is not following the post-fix order " + "correctly"); + + // Otherwise, look at what is the new, current top node. + // We're traversing the tree in + // + // If we've already added our children, we want + // to be sure not to add them again; but we ALSO + // want to be sure that we now start checking against + // the the node that's next in line as "finished" + // + // INVARIANT: Since we're searching in post-fix order, + // it always suffices to compare the current node + // with the top of the stack of nodes which have added their + // children + if (*post_order_iteration.top() == has_added_children.top()) { + has_added_children.pop(); + } else { push_until_left_most_found(); } } diff --git a/src/mc/explo/odpor/WakeupTreeIterator.hpp b/src/mc/explo/odpor/WakeupTreeIterator.hpp index e42184cbf4..c33d483dfa 100644 --- a/src/mc/explo/odpor/WakeupTreeIterator.hpp +++ b/src/mc/explo/odpor/WakeupTreeIterator.hpp @@ -56,6 +56,18 @@ private: */ std::stack post_order_iteration; + /** + * @brief The nodes in the current ordering that have already + * added their own children + * + * We need to be able to determine whether to add the children + * of a given node. Eventually, we want to search that node itself, + * but we have to first search its children. Later, when we + * reach each node in this stack again, we'll remember not to add + * its children and will search the node in the stack instead. 
+ */ + std::stack has_added_children; + /** * @brief Search the wakeup tree until a leaf node appears at the front * of the iteration, pushing all children towards the top of the stack diff --git a/src/plugins/task.cpp b/src/s4u/s4u_Task.cpp similarity index 57% rename from src/plugins/task.cpp rename to src/s4u/s4u_Task.cpp index dca0c9013d..bda7ec170e 100644 --- a/src/plugins/task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -1,5 +1,6 @@ +#include #include -#include +#include #include #include #include @@ -8,16 +9,13 @@ #include "src/simgrid/module.hpp" SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr) -/** @defgroup plugin_task plugin_task Plugin Task - +/** @beginrst -This is the task plugin, enabling management of Tasks. -To activate this plugin, first call :cpp:func:`Task::init`. Tasks are designed to represent dataflows, i.e, graphs of Tasks. Tasks can only be instancied using either -:cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init` +:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init` An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec `. A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm `. @@ -25,12 +23,7 @@ A CommTask is a Communication Task. 
Its underlying Activity is a :ref:`Comm ExtendedAttributeActivity::EXTENSION_ID; - -xbt::signal Task::on_start; -xbt::signal Task::on_end; +namespace simgrid::s4u { Task::Task(const std::string& name) : name_(name) {} @@ -40,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {} */ bool Task::ready_to_run() const { - return not working_ && queued_execs_ > 0; + return not working_ && queued_firings_ > 0; } /** @@ -52,7 +45,10 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - predecessors_[source]++; + auto source_count = predecessors_[source]++; + if (tokens_received_.size() <= queued_firings_ + source_count) + tokens_received_.push_back({}); + tokens_received_[queued_firings_ + source_count][source] = source->token_; bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { @@ -62,7 +58,7 @@ void Task::receive(Task* source) if (enough_tokens) { for (auto& [key, val] : predecessors_) val--; - enqueue_execs(1); + enqueue_firings(1); } } @@ -77,11 +73,11 @@ void Task::receive(Task* source) */ void Task::complete() { - xbt_assert(s4u::Actor::is_maestro()); + xbt_assert(Actor::is_maestro()); working_ = false; count_++; - on_this_end_(this); - Task::on_end(this); + on_this_completion(this); + on_completion(this); if (current_activity_) previous_activity_ = std::move(current_activity_); for (auto const& t : successors_) @@ -90,42 +86,20 @@ void Task::complete() fire(); } -/** @ingroup plugin_task - * @brief Init the Task plugin. - * @note Add a completion callback to all Activities to call Task::complete(). 
- */ -void Task::init() -{ - static bool inited = false; - if (inited) - return; - - inited = true; - ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); - simgrid::s4u::Exec::on_completion_cb( - [](simgrid::s4u::Exec const& exec) { exec.extension()->task_->complete(); }); - simgrid::s4u::Comm::on_completion_cb( - [](simgrid::s4u::Comm const& comm) { comm.extension()->task_->complete(); }); - simgrid::s4u::Io::on_completion_cb( - [](simgrid::s4u::Io const& io) { io.extension()->task_->complete(); }); -} - -/** @ingroup plugin_task - * @param n The number of executions to enqueue. - * @brief Enqueue executions. - * @note Immediatly starts an execution if possible. +/** @param n The number of firings to enqueue. + * @brief Enqueue firing. + * @note Immediatly fire an activity if possible. */ -void Task::enqueue_execs(int n) +void Task::enqueue_firings(int n) { simgrid::kernel::actor::simcall_answered([this, n] { - queued_execs_ += n; + queued_firings_ += n; if (ready_to_run()) fire(); }); } -/** @ingroup plugin_task - * @param amount The amount to set. +/** @param amount The amount to set. * @brief Set the amout of work to do. * @note Amount in flop for ExecTask and in bytes for CommTask. */ @@ -134,8 +108,33 @@ void Task::set_amount(double amount) simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; }); } -/** @ingroup plugin_task - * @param successor The Task to add. +/** @param token The token to set. + * @brief Set the token to send to successors. + * @note The token is passed to each successor after the task end, i.e., after the on_end callback. + */ +void Task::set_token(std::shared_ptr token) +{ + simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); +} + +/** @return Map of tokens received for the next execution. + * @note If there is no queued execution for this task the map might not exist or be partially empty. 
+ */ +std::shared_ptr Task::get_next_token_from(TaskPtr t) +{ + return tokens_received_.front()[t]; +} + +void Task::fire() { + on_this_start(this); + on_start(this); + working_ = true; + queued_firings_ = std::max(queued_firings_ - 1, 0); + if (tokens_received_.size() > 0) + tokens_received_.pop_front(); +} + +/** @param successor The Task to add. * @brief Add a successor to this Task. * @note It also adds this as a predecessor of successor. */ @@ -147,8 +146,7 @@ void Task::add_successor(TaskPtr successor) }); } -/** @ingroup plugin_task - * @param successor The Task to remove. +/** @param successor The Task to remove. * @brief Remove a successor from this Task. * @note It also remove this from the predecessors of successor. */ @@ -171,34 +169,6 @@ void Task::remove_all_successors() }); } -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called before each execution. - * @note The function is called before the underlying Activity starts. - */ -void Task::on_this_start_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); -} - -/** @ingroup plugin_task - * @param func The function to set. - * @brief Set a function to be called after each execution. - * @note The function is called after the underlying Activity ends, but before sending tokens to successors. - */ -void Task::on_this_end_cb(const std::function& func) -{ - simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); -} - -/** @ingroup plugin_task - * @brief Return the number of completed executions. - */ -int Task::get_count() const -{ - return count_; -} - /** * @brief Default constructor. */ @@ -215,7 +185,7 @@ ExecTaskPtr ExecTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. 
*/ -ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host) +ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) { return init(name)->set_flops(flops)->set_host(host); } @@ -227,25 +197,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::ExecPtr exec = s4u::Exec::init(); - exec->set_name(name_); - exec->set_flops_amount(amount_); - exec->set_host(host_); + Task::fire(); + auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_); exec->start(); - exec->extension_set(new ExtendedAttributeActivity()); - exec->extension()->task_ = this; - current_activity_ = exec; + exec->on_this_completion_cb([this](Exec const&) { this->complete(); }); + set_current_activity(exec); } /** @ingroup plugin_task * @param host The host to set. * @brief Set a new host. */ -ExecTaskPtr ExecTask::set_host(s4u::Host* host) +ExecTaskPtr ExecTask::set_host(Host* host) { kernel::actor::simcall_answered([this, host] { host_ = host; }); return this; @@ -256,7 +219,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host) */ ExecTaskPtr ExecTask::set_flops(double flops) { - kernel::actor::simcall_answered([this, flops] { amount_ = flops; }); + kernel::actor::simcall_answered([this, flops] { set_amount(flops); }); return this; } @@ -276,7 +239,7 @@ CommTaskPtr CommTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart constructor. 
*/ -CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination) +CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination) { return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } @@ -288,24 +251,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); - comm->set_name(name_); - comm->set_payload_size(amount_); + Task::fire(); + auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount()); comm->start(); - comm->extension_set(new ExtendedAttributeActivity()); - comm->extension()->task_ = this; - current_activity_ = comm; + comm->on_this_completion_cb([this](Comm const&) { this->complete(); }); + set_current_activity(comm); } /** @ingroup plugin_task * @param source The host to set. * @brief Set a new source host. */ -CommTaskPtr CommTask::set_source(s4u::Host* source) +CommTaskPtr CommTask::set_source(Host* source) { kernel::actor::simcall_answered([this, source] { source_ = source; }); return this; @@ -315,7 +272,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source) * @param destination The host to set. * @brief Set a new destination host. 
*/ -CommTaskPtr CommTask::set_destination(s4u::Host* destination) +CommTaskPtr CommTask::set_destination(Host* destination) { kernel::actor::simcall_answered([this, destination] { destination_ = destination; }); return this; @@ -326,7 +283,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination) */ CommTaskPtr CommTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } @@ -346,7 +303,7 @@ IoTaskPtr IoTask::init(const std::string& name) /** @ingroup plugin_task * @brief Smart Constructor. */ -IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type) +IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type) { return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type); } @@ -355,7 +312,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s * @param disk The disk to set. * @brief Set a new disk. 
*/ -IoTaskPtr IoTask::set_disk(s4u::Disk* disk) +IoTaskPtr IoTask::set_disk(Disk* disk) { kernel::actor::simcall_answered([this, disk] { disk_ = disk; }); return this; @@ -366,12 +323,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk) */ IoTaskPtr IoTask::set_bytes(double bytes) { - kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; }); + kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); }); return this; } /** @ingroup plugin_task */ -IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) +IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; @@ -379,19 +336,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - on_this_start_(this); - Task::on_start(this); - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - s4u::IoPtr io = s4u::Io::init(); - io->set_name(name_); - io->set_size(amount_); - io->set_disk(disk_); - io->set_op_type(type_); + Task::fire(); + auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_); io->start(); - io->extension_set(new ExtendedAttributeActivity()); - io->extension()->task_ = this; - current_activity_ = io; + io->on_this_completion_cb([this](Io const&) { this->complete(); }); + set_current_activity(io); } -} // namespace simgrid::plugins +} // namespace simgrid::s4u diff --git a/teshsuite/s4u/storage_client_server/storage_client_server.cpp b/teshsuite/s4u/storage_client_server/storage_client_server.cpp index 351c4e4b75..8170ff0f83 100644 --- a/teshsuite/s4u/storage_client_server/storage_client_server.cpp +++ b/teshsuite/s4u/storage_client_server/storage_client_server.cpp @@ -76,12 +76,11 @@ static void get_set_disk_data(simgrid::s4u::Disk* disk) { XBT_INFO("*** GET/SET DATA for disk: %s ***", disk->get_cname()); - const std::string* data = disk->get_data(); + auto data = disk->get_unique_data(); XBT_INFO("Get data: '%s'", data ? 
data->c_str() : "No User Data"); disk->set_data(new std::string("Some data")); - data = disk->get_data(); + data = disk->get_unique_data(); XBT_INFO(" Set and get data: '%s'", data->c_str()); - delete data; } static void dump_platform_disks() diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 5fa39d64f4..9b9aaa8e06 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -25,7 +25,7 @@ set(EXTRA_DIST src/kernel/resource/models/network_ns3.hpp src/kernel/resource/models/ns3/ns3_simulator.hpp src/kernel/resource/models/ptask_L07.hpp - + src/mc/datatypes.h src/mc/mc.h src/mc/mc_mmu.hpp @@ -56,7 +56,7 @@ set(EXTRA_DIST src/xbt/log_private.hpp src/xbt/mallocator_private.h src/xbt/parmap.hpp - + src/xbt/mmalloc/mmalloc.h src/xbt/mmalloc/mfree.c src/xbt/mmalloc/mm_legacy.c @@ -342,7 +342,7 @@ set(KERNEL_SRC src/kernel/actor/SimcallObserver.hpp src/kernel/actor/SynchroObserver.cpp src/kernel/actor/SynchroObserver.hpp - + src/kernel/context/Context.cpp src/kernel/context/Context.hpp src/kernel/context/ContextRaw.cpp @@ -454,7 +454,6 @@ set(PLUGINS_SRC src/plugins/vm/VmLiveMigration.hpp src/plugins/vm/dirty_page_tracking.cpp src/plugins/battery.cpp - src/plugins/task.cpp src/plugins/photovoltaic.cpp ) @@ -475,6 +474,7 @@ set(S4U_SRC src/s4u/s4u_Mutex.cpp src/s4u/s4u_Netzone.cpp src/s4u/s4u_Semaphore.cpp + src/s4u/s4u_Task.cpp src/s4u/s4u_VirtualMachine.cpp ) @@ -521,7 +521,7 @@ set(MC_SRC_BASE src/mc/mc_replay.hpp src/mc/transition/Transition.cpp ) - + set(MC_SRC_STATELESS src/mc/api/ActorState.hpp src/mc/api/ClockVector.cpp @@ -530,7 +530,7 @@ set(MC_SRC_STATELESS src/mc/api/State.hpp src/mc/api/RemoteApp.cpp src/mc/api/RemoteApp.hpp - + src/mc/explo/DFSExplorer.cpp src/mc/explo/DFSExplorer.hpp src/mc/explo/Exploration.cpp @@ -577,7 +577,7 @@ set(MC_SRC_STATEFUL src/mc/explo/LivenessChecker.hpp src/mc/explo/UdporChecker.cpp src/mc/explo/UdporChecker.hpp - + src/mc/explo/udpor/Comb.hpp 
src/mc/explo/udpor/Configuration.hpp src/mc/explo/udpor/Configuration.cpp @@ -595,7 +595,7 @@ set(MC_SRC_STATEFUL src/mc/explo/udpor/Unfolding.hpp src/mc/explo/udpor/udpor_forward.hpp src/mc/explo/udpor/udpor_tests_private.hpp - + src/mc/inspect/DwarfExpression.cpp src/mc/inspect/DwarfExpression.hpp src/mc/inspect/Frame.cpp @@ -614,7 +614,7 @@ set(MC_SRC_STATEFUL src/mc/inspect/mc_unw.cpp src/mc/inspect/mc_unw.hpp src/mc/inspect/mc_unw_vmread.cpp - + src/mc/sosp/ChunkedData.cpp src/mc/sosp/ChunkedData.hpp src/mc/sosp/PageStore.cpp @@ -641,7 +641,7 @@ set(MC_SRC_STATEFUL src/mc/api/strategy/MinMatchComm.hpp src/mc/api/strategy/Strategy.hpp src/mc/api/strategy/UniformStrategy.hpp - + src/xbt/mmalloc/mm_interface.c ) @@ -661,7 +661,6 @@ set(headers_to_install include/simgrid/plugins/file_system.h include/simgrid/plugins/live_migration.h include/simgrid/plugins/load.h - include/simgrid/plugins/task.hpp include/simgrid/plugins/photovoltaic.hpp include/simgrid/plugins/ProducerConsumer.hpp include/simgrid/instr.h @@ -695,6 +694,7 @@ set(headers_to_install include/simgrid/s4u/Mutex.hpp include/simgrid/s4u/NetZone.hpp include/simgrid/s4u/Semaphore.hpp + include/simgrid/s4u/Task.hpp include/simgrid/s4u/VirtualMachine.hpp include/simgrid/s4u.hpp