Comm::set_payload_size() to change the size of the simulated data.
- New function: Engine::flatify_platform(), to get a fully detailed vision of the
configured platform.
+ - New Task abstraction: They are designed to represent dataflows, i.e, graphs of repeatable Activities.
+ See the examples under examples/cpp/task-* and the associated documentation.
- Full simDAG integration: Activity::start() actually starts only when all dependencies
are fullfiled. If it cannot be started right away, it will start as soon as it becomes
possible.
- Allow to set a concurrency limit on disks and hosts, as it was already the case for links.
- Rename Link::get_usage() to Link::get_load() for consistency with Host::
- Every signal now come with a static version that is invoked for every object of that class,
- and an instance version that is invoked for this specific object only. For example,
+ and an instance version that is invoked for this specific object only. For example,
s4u::Actor::on_suspend_cb() adds a callback that is invoked for the suspend of any actor while
s4u::Actor::on_this_suspend_cb() adds a callback for this specific actor only.
- Activity::on_suspended_cb() is renamed to Activity::on_suspend_cb(), and fired right before the suspend.
That is, callbacks registered in Exec::on_suspend_cb will not be fired for Comms nor Ios.
New S4U plugins:
- - Task: They are designed to represent dataflows, i.e, graphs of repeatable Activities.
- See the examples under examples/cpp/task-* and the documentation in the Plugins page.
- Battery: Enable the management of batteries on hosts.
See the examples under examples/cpp/battery-* and the documentation in the Plugins page.
- Photovoltaic: Enable the management of photovoltaic panels on hosts.
include examples/cpp/task-io/s4u-task-io.tesh
include examples/cpp/task-simple/s4u-task-simple.cpp
include examples/cpp/task-simple/s4u-task-simple.tesh
+include examples/cpp/task-storm/s4u-task-storm.cpp
+include examples/cpp/task-storm/s4u-task-storm.tesh
include examples/cpp/task-switch-host/s4u-task-switch-host.cpp
include examples/cpp/task-switch-host/s4u-task-switch-host.tesh
include examples/cpp/task-variable-load/s4u-task-variable-load.cpp
include include/simgrid/plugins/load.h
include include/simgrid/plugins/ns3.hpp
include include/simgrid/plugins/photovoltaic.hpp
-include include/simgrid/plugins/task.hpp
include include/simgrid/s4u.hpp
include include/simgrid/s4u/Activity.hpp
include include/simgrid/s4u/Actor.hpp
include include/simgrid/s4u/Mutex.hpp
include include/simgrid/s4u/NetZone.hpp
include include/simgrid/s4u/Semaphore.hpp
+include include/simgrid/s4u/Task.hpp
include include/simgrid/s4u/VirtualMachine.hpp
include include/simgrid/semaphore.h
include include/simgrid/simix.h
include src/mc/explo/udpor/EventSet.cpp
include src/mc/explo/udpor/EventSet.hpp
include src/mc/explo/udpor/EventSet_test.cpp
-include src/mc/explo/udpor/ExtensionSet_test.cpp
include src/mc/explo/udpor/ExtensionSetCalculator.cpp
include src/mc/explo/udpor/ExtensionSetCalculator.hpp
+include src/mc/explo/udpor/ExtensionSet_test.cpp
include src/mc/explo/udpor/History.cpp
include src/mc/explo/udpor/History.hpp
include src/mc/explo/udpor/History_test.cpp
include src/plugins/link_energy_wifi.cpp
include src/plugins/link_load.cpp
include src/plugins/photovoltaic.cpp
-include src/plugins/task.cpp
include src/plugins/vm/VmLiveMigration.cpp
include src/plugins/vm/VmLiveMigration.hpp
include src/plugins/vm/dirty_page_tracking.cpp
include src/s4u/s4u_Mutex.cpp
include src/s4u/s4u_Netzone.cpp
include src/s4u/s4u_Semaphore.cpp
+include src/s4u/s4u_Task.cpp
include src/s4u/s4u_VirtualMachine.cpp
include src/simgrid/Exception.cpp
include src/simgrid/math_utils.h
.. |API_s4u_Activities| replace:: **Activities**
.. _API_s4u_Activities: #api-s4u-activity
+.. |API_s4u_Tasks| replace:: **Tasks**
+.. _API_s4u_Tasks: #api-s4u-task
+
.. |API_s4u_Hosts| replace:: **Hosts**
.. _API_s4u_Hosts: #api-s4u-host
:dedent: 2
=====================
-Activities Life cycle
+Activities Life Cycle
=====================
Sometimes, you want to change the setting of an activity before it even starts.
.. todo:: write this section
+=====================
+Repeatable Activities
+=====================
+
+In order to simulate the execution of Dataflow applications, we introduced the
+concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow
+is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens
+can carry any user-defined data, using the same internal mechanisms as for the
+other simulated objects. Each Task has to receive a token from each of its
+predecessor to fire a new instance of a |API_s4u_Comm|, |API_s4u_Exec|, or
+|API_s4u_Io| activity. On completion of this activity, the Task propagates tokens
+to its successors, and waits for the next set of tokens to arrive.
+
+To initiate the execution of a Dataflow, it is possible to some make
+|API_s4u_Tasks| fire one or more activities without waiting for any token with the
+:cpp:func:`s4u::Task::enqueue_firings() <simgrid::s4u::Task::enqueue_firings>`
+function.
+
+The parameters and successors of a Task can be redefined at runtime by attaching
+callbacks to the
+:cpp:func:`s4u::Task::on_this_start <simgrid::s4u::Task::on_this_start>`
+and
+:cpp:func:`s4u::Task::on_this_completion <simgrid::s4u::Task::on_this_completion>`
+signals.
+
+
.. _s4u_mailbox:
Mailboxes
.. group-tab:: C++
- .. doxygenfunction:: simgrid::s4u::Activity::get_cname
- .. doxygenfunction:: simgrid::s4u::Activity::get_name
+ .. doxygenfunction:: simgrid::s4u::Activity::get_cname() const
+ .. doxygenfunction:: simgrid::s4u::Activity::get_name() const
.. doxygenfunction:: simgrid::s4u::Activity::get_remaining() const
.. doxygenfunction:: simgrid::s4u::Activity::get_state() const
.. doxygenfunction:: simgrid::s4u::Activity::set_remaining(double remains)
.. group-tab:: C++
.. doxygenfunction:: simgrid::s4u::Comm::on_start_cb
+ .. doxygenfunction:: simgrid::s4u::Comm::on_this_start_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb
+ .. doxygenfunction:: simgrid::s4u::Comm::on_this_completion_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_recv_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_send_cb
-
- .. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_suspended_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_resumed_cb
.. doxygenfunction:: simgrid::s4u::Comm::on_veto_cb
.. group-tab:: C++
.. doxygenfunction:: simgrid::s4u::Exec::on_start_cb
+ .. doxygenfunction:: simgrid::s4u::Exec::on_this_start_cb
.. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb
+ .. doxygenfunction:: simgrid::s4u::Exec::on_this_completion_cb
- .. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb
.. doxygenfunction:: simgrid::s4u::Exec::on_suspended_cb
.. doxygenfunction:: simgrid::s4u::Exec::on_resumed_cb
.. doxygenfunction:: simgrid::s4u::Exec::on_veto_cb
.. group-tab:: C++
.. doxygenfunction:: simgrid::s4u::Io::on_start_cb
+ .. doxygenfunction:: simgrid::s4u::Io::on_this_start_cb
.. doxygenfunction:: simgrid::s4u::Io::on_completion_cb
+ .. doxygenfunction:: simgrid::s4u::Io::on_this_completion_cb
- .. doxygenfunction:: simgrid::s4u::Io::on_completion_cb
.. doxygenfunction:: simgrid::s4u::Io::on_suspended_cb
.. doxygenfunction:: simgrid::s4u::Io::on_resumed_cb
.. doxygenfunction:: simgrid::s4u::Io::on_veto_cb
+
+.. _API_s4u_Tasks:
+
+==========
+Tasks
+==========
+
+==============
+class Task
+==============
+
+.. doxygenclass:: simgrid::s4u::Task
+
+**Known subclasses:**
+:ref:`Communication Tasks <API_s4u_CommTask>`,
+:ref:`Executions Tasks <API_s4u_ExecTask>`,
+:ref:`I/O Tasks <API_s4u_Task>`.
+See also the :ref:`section on activities <s4u_Tasks>` above.
+
+Basic management
+----------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. code-block:: C++
+
+ #include <simgrid/s4u/Task.hpp>
+
+ .. doxygentypedef:: TaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenfunction:: simgrid::s4u::Task::get_cname() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_name() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_count() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_amount() const
+ .. doxygenfunction:: simgrid::s4u::Task::set_amount(double amount)
+
+Life cycle
+----------
+
+.. tabs::
+
+ .. group-tab:: C++
+ .. doxygenfunction:: simgrid::s4u::Task::enqueue_firings(int n)
+
+Managing Dependencies
+---------------------
+.. tabs::
+
+ .. group-tab:: C++
+ .. doxygenfunction:: simgrid::s4u::Task::add_successor(TaskPtr t)
+ .. doxygenfunction:: simgrid::s4u::Task::remove_successor(TaskPtr t)
+ .. doxygenfunction:: simgrid::s4u::Task::remove_all_successors()
+ .. doxygenfunction:: simgrid::s4u::Task::get_successors() const
+
+Managing Tokens
+---------------
+.. doxygenclass:: simgrid::s4u::Token
+
+.. tabs::
+
+ .. group-tab:: C++
+ .. doxygenfunction:: simgrid::s4u::Task::set_token(std::shared_ptr<Token> token)
+ .. doxygenfunction:: simgrid::s4u::Task::get_next_token_from(TaskPtr t)
+
+Signals
+-------
+
+.. tabs::
+
+ .. group-tab:: C++
+ .. doxygenfunction:: simgrid::s4u::Task::on_start_cb
+ .. doxygenfunction:: simgrid::s4u::Task::on_this_start_cb
+ .. doxygenfunction:: simgrid::s4u::Task::on_completion_cb
+ .. doxygenfunction:: simgrid::s4u::Task::on_this_completion_cb
+
+.. _API_s4u_CommTask:
+
+================
+ class CommTask
+================
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenclass:: simgrid::s4u::CommTask
+
+Basic management
+----------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. code-block:: C++
+
+ #include <simgrid/s4u/Task.hpp>
+
+ .. doxygentypedef:: CommTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenfunction:: simgrid::s4u::Task::get_source() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_destination() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_bytes() const
+ .. doxygenfunction:: simgrid::s4u::Task::set_source(simgrid::s4u::Host* source);
+ .. doxygenfunction:: simgrid::s4u::Task::set_destination(simgrid::s4u::Host* destination);
+ .. doxygenfunction:: simgrid::s4u::Task::set_bytes(double bytes)
+
+
+.. _API_s4u_ExecTask:
+
+================
+ class ExecTask
+================
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenclass:: simgrid::s4u::ExecTask
+
+Basic management
+----------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. code-block:: C++
+
+ #include <simgrid/s4u/Task.hpp>
+
+ .. doxygentypedef:: ExecTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenfunction:: simgrid::s4u::Task::get_host() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_flops() const
+ .. doxygenfunction:: simgrid::s4u::Task::set_host(simgrid::s4u::Host* host);
+ .. doxygenfunction:: simgrid::s4u::Task::set_flops(double flops);
+
+.. _API_s4u_IoTask:
+
+================
+ class IoTask
+================
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenclass:: simgrid::s4u::IoTask
+
+Basic management
+----------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. code-block:: C++
+
+ #include <simgrid/s4u/Task.hpp>
+
+ .. doxygentypedef:: IoTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+ .. group-tab:: C++
+
+ .. doxygenfunction:: simgrid::s4u::Task::get_disk() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_bytes() const
+ .. doxygenfunction:: simgrid::s4u::Task::get_op_type() const
+ .. doxygenfunction:: simgrid::s4u::Task::set_disk(simgrid::s4u::Disk* disk);
+ .. doxygenfunction:: simgrid::s4u::Task::set_bytes(simgrid::double bytes);
+ .. doxygenfunction:: simgrid::s4u::Task::set_op_type(simgrid::s4u::Io::OpType type);
+
.. _API_s4u_Synchronizations:
=======================
mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
network-ns3 network-ns3-wifi network-wifi
io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
- task-io task-simple task-variable-load task-switch-host
+ task-io task-simple task-variable-load task-storm task-switch-host
photovoltaic-simple
platform-comm-serialize platform-failures platform-profile platform-properties
plugin-host-load plugin-link-load plugin-prodcons
// Test attaching some user data to the file
file->set_data(new std::string("777"));
- const auto* file_data = file->get_data<std::string>();
+ auto file_data = file->get_unique_data<std::string>();
XBT_INFO("User data attached to the file: %s", file_data->c_str());
- delete file_data;
// Close the file
file->close();
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-/* This example demonstrate basic use of the task plugin.
+/* This example demonstrate basic use of tasks.
*
* We model the following graph:
*
* comm is a communication task.
*/
-#include "simgrid/plugins/task.hpp"
+#include "simgrid/s4u/Task.hpp"
#include "simgrid/s4u.hpp"
#include <simgrid/plugins/file_system.h>
XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+namespace sg4 = simgrid::s4u;
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* bob = e.host_by_name("bob");
auto* carl = e.host_by_name("carl");
// Create tasks
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl);
- auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE);
- auto read = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl);
+ auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE);
+ auto read = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ);
// Create the graph by defining dependencies between tasks
exec1->add_successor(write);
read->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-/* This example demonstrate basic use of the task plugin.
+/* This example demonstrate basic use of tasks.
*
* We model the following graph:
*
* comm is a communication task.
*/
-#include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+namespace sg4 = simgrid::s4u;
+
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* jupiter = e.host_by_name("Jupiter");
// Create tasks
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter);
- auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter);
+ auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
// Create the graph by defining dependencies between tasks
exec1->add_successor(comm);
comm->add_successor(exec2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
- // Enqueue two executions for task exec1
- exec1->enqueue_execs(2);
+ // Enqueue two firings for task exec1
+ exec1->enqueue_firings(2);
// Start the simulation
e.run();
--- /dev/null
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
+ and use them to build a simulation of a stream processing application
+
+ Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
+ Spout SB produces 1e6 bytes every 200ms.
+
+ Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
+ Bolt B3 processes data from Spout SB.
+ Bolt B4 processes data from Bolt B3.
+
+ Fafard
+ ┌────┐
+ ┌──►│ B1 │
+ Tremblay │ └────┘
+ ┌────┐ │
+ │ SA ├────┤ Ginette
+ └────┘ │ ┌────┐
+ └──►│ B2 │
+ └────┘
+
+
+ Bourassa
+ Jupiter ┌──────────┐
+ ┌────┐ │ │
+ │ SB ├─────┤ B3 ──► B4│
+ └────┘ │ │
+ └──────────┘
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+int main(int argc, char* argv[])
+{
+ sg4::Engine e(&argc, argv);
+ e.load_platform(argv[1]);
+
+ // Retrieve hosts
+ auto tremblay = e.host_by_name("Tremblay");
+ auto jupiter = e.host_by_name("Jupiter");
+ auto fafard = e.host_by_name("Fafard");
+ auto ginette = e.host_by_name("Ginette");
+ auto bourassa = e.host_by_name("Bourassa");
+
+ // Create execution tasks
+ auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+ auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+ auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+ auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+ auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+ auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+
+ // Create communication tasks
+ auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+ auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+ auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+ // Create the graph by defining dependencies between tasks
+ // Some dependencies are defined dynamically
+ SA_to_B1->add_successor(B1);
+ SA_to_B2->add_successor(B2);
+ SB->add_successor(SB_to_B3);
+ SB_to_B3->add_successor(B3);
+ B3->add_successor(B4);
+
+ /* Dynamic modification of the graph and bytes sent
+ Alternatively we: remove/add the link between SA and SA_to_B2
+ add/remove the link between SA and SA_to_B1
+ */
+ SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
+ int count = t->get_count();
+ sg4::CommTaskPtr comm;
+ if (count % 2 == 0) {
+ t->remove_successor(SA_to_B2);
+ t->add_successor(SA_to_B1);
+ comm = SA_to_B1;
+ }
+ else {
+ t->remove_successor(SA_to_B1);
+ t->add_successor(SA_to_B2);
+ comm = SA_to_B2;
+ }
+ std::vector<double> amount = {1e3,1e6,1e9};
+ comm->set_amount(amount[count % 3]);
+ auto token = std::make_shared<sg4::Token>();
+ token->set_data(new double(amount[count % 3]));
+ t->set_token(token);
+ });
+
+ // The token sent by SA is forwarded by both communication tasks
+ SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
+ t->set_token(t->get_next_token_from(SA));
+ });
+ SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
+ t->set_token(t->get_next_token_from(SA));
+ });
+
+ /* B1 and B2 read the value of the token received by their predecessors
+ and use it to adapt their amount of work to do.
+ */
+ B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+ B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+ auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+ t->set_amount(*data * 10);
+ });
+
+ // Enqueue firings for tasks without predecessors
+ SA->enqueue_firings(5);
+ SB->enqueue_firings(5);
+
+ // Add a function to be called when tasks end for log purpose
+ sg4::Task::on_completion_cb([]
+ (const sg4::Task* t) {
+ XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
+ });
+
+ // Start the simulation
+ e.run();
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
+> [0.100000] [task_storm/INFO] Task SA finished (1)
+> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1)
+> [0.125972] [task_storm/INFO] Task B1 finished (1)
+> [0.200000] [task_storm/INFO] Task SB finished (1)
+> [0.200000] [task_storm/INFO] Task SA finished (2)
+> [0.300000] [task_storm/INFO] Task SA finished (3)
+> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1)
+> [0.400000] [task_storm/INFO] Task SB finished (2)
+> [0.400000] [task_storm/INFO] Task SA finished (4)
+> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2)
+> [0.500000] [task_storm/INFO] Task SA finished (5)
+> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1)
+> [0.570648] [task_storm/INFO] Task B2 finished (1)
+> [0.570855] [task_storm/INFO] Task B2 finished (2)
+> [0.600000] [task_storm/INFO] Task SB finished (3)
+> [0.800000] [task_storm/INFO] Task SB finished (4)
+> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2)
+> [1.000000] [task_storm/INFO] Task SB finished (5)
+> [1.177739] [task_storm/INFO] Task SB_to_B3 finished (3)
+> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4)
+> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
+> [2.619232] [task_storm/INFO] Task B3 finished (1)
+> [6.743624] [task_storm/INFO] Task B3 finished (2)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
+> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [14.992407] [task_storm/INFO] Task B3 finished (4)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
+> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [23.241190] [task_storm/INFO] Task B4 finished (3)
+> [27.365582] [task_storm/INFO] Task B4 finished (4)
+> [31.489974] [task_storm/INFO] Task B4 finished (5)
+> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2)
+> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3)
+> [264.435791] [task_storm/INFO] Task B1 finished (2)
+> [264.566859] [task_storm/INFO] Task B1 finished (3)
* With exec1 and exec2 on different hosts.
*/
-#include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example");
+namespace sg4 = simgrid::s4u;
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retrieve hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* fafard = e.host_by_name("Fafard");
// Create tasks
- auto comm0 = simgrid::plugins::CommTask::init("comm0");
+ auto comm0 = sg4::CommTask::init("comm0");
comm0->set_bytes(1e7);
comm0->set_source(tremblay);
- auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter);
- auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard);
- auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay);
- auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay);
+ auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter);
+ auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard);
+ auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay);
+ auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay);
// Create the initial graph by defining dependencies between tasks
comm0->add_successor(exec2);
exec2->add_successor(comm2);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
- // Add a function to be called before each executions of comm0
+ // Add a function to be called before each firing of comm0
// This function modifies the graph of tasks by adding or removing
// successors to comm0
- comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
- auto* comm0 = dynamic_cast<simgrid::plugins::CommTask*>(t);
+ comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
static int count = 0;
if (count % 2 == 0) {
comm0->set_destination(jupiter);
count++;
});
- // Enqueue four executions for task comm0
- comm0->enqueue_execs(4);
+ // Enqueue four firings for task comm0
+ comm0->enqueue_firings(4);
// Start the simulation
e.run();
* With a heavy load there is a burst of comm before the exec task can even finish once.
*/
-#include "simgrid/plugins/task.hpp"
#include "simgrid/s4u.hpp"
XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
-static void variable_load(simgrid::plugins::TaskPtr t)
+static void variable_load(sg4::TaskPtr t)
{
XBT_INFO("--- Small load ---");
for (int i = 0; i < 3; i++) {
- t->enqueue_execs(1);
- simgrid::s4u::this_actor::sleep_for(100);
+ t->enqueue_firings(1);
+ sg4::this_actor::sleep_for(100);
}
- simgrid::s4u::this_actor::sleep_until(1000);
+ sg4::this_actor::sleep_until(1000);
XBT_INFO("--- Heavy load ---");
for (int i = 0; i < 3; i++) {
- t->enqueue_execs(1);
- simgrid::s4u::this_actor::sleep_for(1);
+ t->enqueue_firings(1);
+ sg4::this_actor::sleep_for(1);
}
}
int main(int argc, char* argv[])
{
- simgrid::s4u::Engine e(&argc, argv);
+ sg4::Engine e(&argc, argv);
e.load_platform(argv[1]);
- simgrid::plugins::Task::init();
// Retreive hosts
auto* tremblay = e.host_by_name("Tremblay");
auto* jupiter = e.host_by_name("Jupiter");
// Create tasks
- auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
- auto exec = simgrid::plugins::ExecTask::init("exec", 1e9, jupiter);
+ auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
+ auto exec = sg4::ExecTask::init("exec", 1e9, jupiter);
// Create the graph by defining dependencies between tasks
comm->add_successor(exec);
// Add a function to be called when tasks end for log purpose
- simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+ sg4::Task::on_completion_cb([](const sg4::Task* t) {
XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
});
// Create the actor that will inject load during the simulation
- simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+ sg4::Actor::create("input", tremblay, variable_load, comm);
// Start the simulation
e.run();
args = parse()
e = Engine(sys.argv)
e.load_platform(args.platform)
- Task.init()
# Retrieve hosts
bob = e.host_by_name('bob')
read.add_successor(exec2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
- # Enqueue two executions for task exec1
- exec1.enqueue_execs(2)
+ # Enqueue two firings for task exec1
+ exec1.enqueue_firings(2)
# runs the simulation
e.run()
args = parse()
e = Engine(sys.argv)
e.load_platform(args.platform)
- Task.init()
# Retrieve hosts
tremblay = e.host_by_name('Tremblay')
comm.add_successor(exec2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
- # Enqueue two executions for task exec1
- exec1.enqueue_execs(2)
+ # Enqueue two firings for task exec1
+ exec1.enqueue_firings(2)
# runs the simulation
e.run()
-
args = parse()
e = Engine(sys.argv)
e.load_platform(args.platform)
- Task.init()
# Retrieve hosts
tremblay = e.host_by_name('Tremblay')
exec2.add_successor(comm2)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
- # Add a function to be called before each executions of comm0
+ # Add a function to be called before each firing of comm0
# This function modifies the graph of tasks by adding or removing
# successors to comm0
comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
- # Enqueue two executions for task exec1
- comm0.enqueue_execs(4)
+ # Enqueue two firings for task exec1
+ comm0.enqueue_firings(4)
# runs the simulation
e.run()
def variable_load(t):
print('--- Small load ---')
for _ in range(3):
- t.enqueue_execs(1)
+ t.enqueue_firings(1)
this_actor.sleep_for(100)
this_actor.sleep_for(1000)
print('--- Heavy load ---')
for _ in range(3):
- t.enqueue_execs(1)
+ t.enqueue_firings(1)
this_actor.sleep_for(1)
if __name__ == '__main__':
args = parse()
e = Engine(sys.argv)
e.load_platform(args.platform)
- Task.init()
# Retrieve hosts
tremblay = e.host_by_name('Tremblay')
comm.add_successor(exec)
# Add a function to be called when tasks end for log purpose
- Task.on_end_cb(callback)
+ Task.on_completion_cb(callback)
# Create the actor that will inject load during the simulation
Actor.create("input", tremblay, variable_load, comm)
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
+#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/VirtualMachine.hpp>
#include <simgrid/Exception.hpp>
Comm() = default;
Comm* do_start() override;
-protected:
static xbt::signal<void(Comm const&)> on_send;
xbt::signal<void(Comm const&)> on_this_send;
static xbt::signal<void(Comm const&)> on_recv;
inline static xbt::signal<void(Comm const&)> on_start;
xbt::signal<void(Comm const&)> on_this_start;
+protected:
void fire_on_completion() const override {
/* The completion signal of a Comm has to be thrown only once and not by the sender AND the receiver.
then Comm::on_completion is thrown in the kernel in CommImpl::finish.
bool parallel_ = false;
+ inline static xbt::signal<void(Exec const&)> on_start;
+ xbt::signal<void(Exec const&)> on_this_start;
+
protected:
explicit Exec(kernel::activity::ExecImplPtr pimpl);
Exec* do_start() override;
void reset() const;
- inline static xbt::signal<void(Exec const&)> on_start;
- xbt::signal<void(Exec const&)> on_this_start;
void fire_on_completion() const override { on_completion(*this); }
void fire_on_this_completion() const override { on_this_completion(*this); }
void fire_on_suspend() const override { on_suspend(*this); }
-#ifndef SIMGRID_PLUGINS_TASK_H_
-#define SIMGRID_PLUGINS_TASK_H_
+#ifndef SIMGRID_S4U_TASK_H_
+#define SIMGRID_S4U_TASK_H_
#include <simgrid/s4u/Activity.hpp>
#include <simgrid/s4u/Io.hpp>
#include <xbt/Extendable.hpp>
#include <atomic>
+#include <deque>
#include <map>
#include <memory>
#include <set>
-namespace simgrid::plugins {
+namespace simgrid::s4u {
class Task;
using TaskPtr = boost::intrusive_ptr<Task>;
class IoTask;
using IoTaskPtr = boost::intrusive_ptr<IoTask>;
-struct ExtendedAttributeActivity {
- static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
- Task* task_;
-};
+class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
class Task {
+ std::string name_;
+ double amount_;
+ int queued_firings_ = 0;
+ int count_ = 0;
+ bool working_ = false;
+
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
+ std::atomic_int_fast32_t refcount_{0};
bool ready_to_run() const;
void receive(Task* source);
- void complete();
+
+ std::shared_ptr<Token> token_ = nullptr;
+ std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+ ActivityPtr previous_activity_;
+ ActivityPtr current_activity_;
protected:
- std::string name_;
- double amount_;
- int queued_execs_ = 0;
- int count_ = 0;
- bool working_ = false;
- s4u::ActivityPtr previous_activity_;
- s4u::ActivityPtr current_activity_;
- xbt::signal<void(Task*)> on_this_start_;
- xbt::signal<void(Task*)> on_this_end_;
explicit Task(const std::string& name);
virtual ~Task() = default;
- virtual void fire() = 0;
- static xbt::signal<void(Task*)> on_start;
- static xbt::signal<void(Task*)> on_end;
- std::atomic_int_fast32_t refcount_{0};
+ virtual void fire();
+ void complete();
+
+ void set_current_activity (ActivityPtr a) { current_activity_ = a; }
+
+ inline static xbt::signal<void(Task*)> on_start;
+ xbt::signal<void(Task*)> on_this_start;
+ inline static xbt::signal<void(Task*)> on_completion;
+ xbt::signal<void(Task*)> on_this_completion;
public:
- static void init();
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
- void enqueue_execs(int n);
void set_amount(double amount);
double get_amount() const { return amount_; }
+ int get_count() const { return count_; }
+
+ void set_token(std::shared_ptr<Token> token);
+ std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start_cb(const std::function<void(Task*)>& func);
- void on_this_end_cb(const std::function<void(Task*)>& func);
- int get_count() const;
- /** Add a callback fired before a task activity start.
+ void enqueue_firings(int n);
+
+ /** Add a callback fired before this task activity starts */
+ void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
+ /** Add a callback fired before a task activity starts.
* Triggered after the on_this_start function**/
static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
- /** Add a callback fired after a task activity end.
- * Triggered after the on_this_end function, but before
- * sending tokens to successors.**/
- static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+ /** Add a callback fired before this task activity ends */
+ void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+ /** Add a callback fired after a task activity ends.
+ * Triggered after the on_this_end function, but before sending tokens to successors.**/
+ static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
#ifndef DOXYGEN
friend void intrusive_ptr_release(Task* o)
#endif
};
-class ExecTask : public Task {
- s4u::Host* host_;
+class CommTask : public Task {
+ Host* source_;
+ Host* destination_;
- explicit ExecTask(const std::string& name);
+ explicit CommTask(const std::string& name);
void fire() override;
public:
- static ExecTaskPtr init(const std::string& name);
- static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
- ExecTaskPtr set_host(s4u::Host* host);
- s4u::Host* get_host() const { return host_; }
- ExecTaskPtr set_flops(double flops);
- double get_flops() const { return get_amount(); }
+ static CommTaskPtr init(const std::string& name);
+ static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+
+ CommTaskPtr set_source(Host* source);
+ Host* get_source() const { return source_; }
+ CommTaskPtr set_destination(Host* destination);
+ Host* get_destination() const { return destination_; }
+ CommTaskPtr set_bytes(double bytes);
+ double get_bytes() const { return get_amount(); }
};
-class CommTask : public Task {
- s4u::Host* source_;
- s4u::Host* destination_;
+class ExecTask : public Task {
+ Host* host_;
- explicit CommTask(const std::string& name);
+ explicit ExecTask(const std::string& name);
void fire() override;
public:
- static CommTaskPtr init(const std::string& name);
- static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
- CommTaskPtr set_source(s4u::Host* source);
- s4u::Host* get_source() const { return source_; }
- CommTaskPtr set_destination(s4u::Host* destination);
- s4u::Host* get_destination() const { return destination_; }
- CommTaskPtr set_bytes(double bytes);
- double get_bytes() const { return get_amount(); }
+ static ExecTaskPtr init(const std::string& name);
+ static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+
+ ExecTaskPtr set_host(Host* host);
+ Host* get_host() const { return host_; }
+ ExecTaskPtr set_flops(double flops);
+ double get_flops() const { return get_amount(); }
};
class IoTask : public Task {
- s4u::Disk* disk_;
- s4u::Io::OpType type_;
+ Disk* disk_;
+ Io::OpType type_;
explicit IoTask(const std::string& name);
void fire() override;
public:
static IoTaskPtr init(const std::string& name);
- static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
- IoTaskPtr set_disk(s4u::Disk* disk);
- s4u::Disk* get_disk() const { return disk_; }
+ static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+
+ IoTaskPtr set_disk(Disk* disk);
+ Disk* get_disk() const { return disk_; }
IoTaskPtr set_bytes(double bytes);
double get_bytes() const { return get_amount(); }
- IoTaskPtr set_op_type(s4u::Io::OpType type);
- s4u::Io::OpType get_op_type() const { return type_; }
+ IoTaskPtr set_op_type(Io::OpType type);
+ Io::OpType get_op_type() const { return type_; }
};
-} // namespace simgrid::plugins
+} // namespace simgrid::s4u
#endif
#include "simgrid/kernel/ProfileBuilder.hpp"
#include "simgrid/kernel/routing/NetPoint.hpp"
#include <simgrid/Exception.hpp>
-#include <simgrid/plugins/task.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Barrier.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Mutex.hpp>
#include <simgrid/s4u/NetZone.hpp>
#include <simgrid/s4u/Semaphore.hpp>
+#include <simgrid/s4u/Task.hpp>
#include <simgrid/version.h>
#include <algorithm>
#include <vector>
namespace py = pybind11;
-using simgrid::plugins::CommTask;
-using simgrid::plugins::CommTaskPtr;
-using simgrid::plugins::ExecTask;
-using simgrid::plugins::ExecTaskPtr;
-using simgrid::plugins::IoTask;
-using simgrid::plugins::IoTaskPtr;
-using simgrid::plugins::Task;
-using simgrid::plugins::TaskPtr;
+using simgrid::s4u::CommTask;
+using simgrid::s4u::CommTaskPtr;
+using simgrid::s4u::ExecTask;
+using simgrid::s4u::ExecTaskPtr;
+using simgrid::s4u::IoTask;
+using simgrid::s4u::IoTaskPtr;
+using simgrid::s4u::Task;
+using simgrid::s4u::TaskPtr;
using simgrid::s4u::Actor;
using simgrid::s4u::ActorPtr;
using simgrid::s4u::Barrier;
/* Class Task */
py::class_<Task, TaskPtr>(m, "Task", "Task. See the C++ documentation for details.")
- .def_static("init", &Task::init)
.def_static(
"on_start_cb",
[](py::object cb) {
},
"Add a callback called when each task starts.")
.def_static(
- "on_end_cb",
+ "on_completion_cb",
[](py::object cb) {
cb.inc_ref(); // keep alive after return
const py::gil_scoped_release gil_release;
- Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+ Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
const py::gil_scoped_acquire py_context; // need a new context for callback
py::reinterpret_borrow<py::function>(cb_p)(op);
});
.def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
.def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
.def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
- .def("enqueue_execs", py::overload_cast<int>(&Task::enqueue_execs), py::call_guard<py::gil_scoped_release>(),
- py::arg("n"), "Enqueue executions for this task.")
+ .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
+ py::arg("n"), "Enqueue firings for this task.")
.def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
py::arg("op"), "Add a successor to this task.")
.def("remove_successor", py::overload_cast<TaskPtr>(&Task::remove_successor),
"Remove all successors of this task.")
.def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
py::arg("func"), "Add a callback called when this task starts.")
- .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+ .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
py::arg("func"), "Add a callback called when this task ends.")
.def(
"__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
#include "src/mc/explo/odpor/WakeupTreeIterator.hpp"
#include "src/mc/explo/odpor/WakeupTree.hpp"
+#include "xbt/asserts.h"
namespace simgrid::mc::odpor {
// there are no cycles. This means that at least
// one node in the tree won't have any children,
// so the loop will eventually terminate
- auto* cur_top_node = *post_order_iteration.top();
+ WakeupTreeNode* cur_top_node = *post_order_iteration.top();
while (not cur_top_node->is_leaf()) {
// INVARIANT: Since we push children in
// reverse order (right-most to left-most),
// we ensure that we'll always process left-most
// children first
auto& children = cur_top_node->children_;
-
for (auto iter = children.rbegin(); iter != children.rend(); ++iter) {
- // iter.base() points one element past where we seek; hence,
- // we move it over one position
+ // iter.base() points one element past where we seek; that is,
+ // we want the value one position forward
post_order_iteration.push(std::prev(iter.base()));
}
+ has_added_children.push(cur_top_node);
cur_top_node = *post_order_iteration.top();
}
}
return;
}
- auto prev_top_handle = post_order_iteration.top();
post_order_iteration.pop();
// If there are now no longer any nodes left,
return;
}
- // Otherwise, look at the next top node. If
- // `prev_top` is that node's right-most child,
- // then we don't attempt to re-add `next_top`'s
- // children again for we would have already seen them.
- // To actually determine "right-most", we check if
- // moving over to the right one spot brings us to the
- // end of the candidate parent's list
- const auto* next_top_node = *post_order_iteration.top();
- if ((++prev_top_handle) != next_top_node->get_ordered_children().end()) {
+ xbt_assert(not has_added_children.empty(), "Invariant violated: There are more "
+ "nodes in the iteration that we must search "
+ "yet nobody has claimed to have added these nodes. "
+ "This implies that the algorithm is not iterating over "
+ "the wakeup tree is not following the post-fix order "
+ "correctly");
+
+ // Otherwise, look at what is the new, current top node.
+ // We're traversing the tree in
+ //
+ // If we've already added our children, we want
+ // to be sure not to add them again; but we ALSO
+ // want to be sure that we now start checking against
+ // the the node that's next in line as "finished"
+ //
+ // INVARIANT: Since we're searching in post-fix order,
+ // it always suffices to compare the current node
+ // with the top of the stack of nodes which have added their
+ // children
+ if (*post_order_iteration.top() == has_added_children.top()) {
+ has_added_children.pop();
+ } else {
push_until_left_most_found();
}
}
*/
std::stack<node_handle> post_order_iteration;
+ /**
+ * @brief The nodes in the current ordering that have already
+ * added their own children
+ *
+ * We need to be able to determine whether to add the children
+ * of a given node. Eventually, we want to search that node itself,
+ * but we have to first search its children. Later, when we
+ * reach each node in this stack again, we'll remember not to add
+ * its children and will search the node in the stack instead.
+ */
+ std::stack<WakeupTreeNode*> has_added_children;
+
/**
* @brief Search the wakeup tree until a leaf node appears at the front
* of the iteration, pushing all children towards the top of the stack
+#include <memory>
#include <simgrid/Exception.hpp>
-#include <simgrid/plugins/task.hpp>
+#include <simgrid/s4u/Task.hpp>
#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/Exec.hpp>
#include <simgrid/s4u/Io.hpp>
#include "src/simgrid/module.hpp"
SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
-/** @defgroup plugin_task plugin_task Plugin Task
-
+/**
@beginrst
-This is the task plugin, enabling management of Tasks.
-To activate this plugin, first call :cpp:func:`Task::init`.
Tasks are designed to represent dataflows, i.e, graphs of Tasks.
Tasks can only be instancied using either
-:cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
+:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
*/
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
-namespace simgrid::plugins {
-
-xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
-
-xbt::signal<void(Task*)> Task::on_start;
-xbt::signal<void(Task*)> Task::on_end;
+namespace simgrid::s4u {
Task::Task(const std::string& name) : name_(name) {}
*/
bool Task::ready_to_run() const
{
- return not working_ && queued_execs_ > 0;
+ return not working_ && queued_firings_ > 0;
}
/**
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- predecessors_[source]++;
+ auto source_count = predecessors_[source]++;
+ if (tokens_received_.size() <= queued_firings_ + source_count)
+ tokens_received_.push_back({});
+ tokens_received_[queued_firings_ + source_count][source] = source->token_;
bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
if (enough_tokens) {
for (auto& [key, val] : predecessors_)
val--;
- enqueue_execs(1);
+ enqueue_firings(1);
}
}
*/
void Task::complete()
{
- xbt_assert(s4u::Actor::is_maestro());
+ xbt_assert(Actor::is_maestro());
working_ = false;
count_++;
- on_this_end_(this);
- Task::on_end(this);
+ on_this_completion(this);
+ on_completion(this);
if (current_activity_)
previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
fire();
}
-/** @ingroup plugin_task
- * @brief Init the Task plugin.
- * @note Add a completion callback to all Activities to call Task::complete().
- */
-void Task::init()
-{
- static bool inited = false;
- if (inited)
- return;
-
- inited = true;
- ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
- simgrid::s4u::Exec::on_completion_cb(
- [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
- simgrid::s4u::Comm::on_completion_cb(
- [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
- simgrid::s4u::Io::on_completion_cb(
- [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
-}
-
-/** @ingroup plugin_task
- * @param n The number of executions to enqueue.
- * @brief Enqueue executions.
- * @note Immediatly starts an execution if possible.
+/** @param n The number of firings to enqueue.
+ * @brief Enqueue firing.
+ * @note Immediatly fire an activity if possible.
*/
-void Task::enqueue_execs(int n)
+void Task::enqueue_firings(int n)
{
simgrid::kernel::actor::simcall_answered([this, n] {
- queued_execs_ += n;
+ queued_firings_ += n;
if (ready_to_run())
fire();
});
}
-/** @ingroup plugin_task
- * @param amount The amount to set.
+/** @param amount The amount to set.
* @brief Set the amout of work to do.
* @note Amount in flop for ExecTask and in bytes for CommTask.
*/
simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
}
-/** @ingroup plugin_task
- * @param successor The Task to add.
+/** @param token The token to set.
+ * @brief Set the token to send to successors.
+ * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ */
+void Task::set_token(std::shared_ptr<Token> token)
+{
+ simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+}
+
+/** @return Map of tokens received for the next execution.
+ * @note If there is no queued execution for this task the map might not exist or be partially empty.
+ */
+std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+{
+ return tokens_received_.front()[t];
+}
+
+void Task::fire() {
+ on_this_start(this);
+ on_start(this);
+ working_ = true;
+ queued_firings_ = std::max(queued_firings_ - 1, 0);
+ if (tokens_received_.size() > 0)
+ tokens_received_.pop_front();
+}
+
+/** @param successor The Task to add.
* @brief Add a successor to this Task.
* @note It also adds this as a predecessor of successor.
*/
});
}
-/** @ingroup plugin_task
- * @param successor The Task to remove.
+/** @param successor The Task to remove.
* @brief Remove a successor from this Task.
* @note It also remove this from the predecessors of successor.
*/
});
}
-/** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called before each execution.
- * @note The function is called before the underlying Activity starts.
- */
-void Task::on_this_start_cb(const std::function<void(Task*)>& func)
-{
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
-}
-
-/** @ingroup plugin_task
- * @param func The function to set.
- * @brief Set a function to be called after each execution.
- * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
-void Task::on_this_end_cb(const std::function<void(Task*)>& func)
-{
- simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
-}
-
-/** @ingroup plugin_task
- * @brief Return the number of completed executions.
- */
-int Task::get_count() const
-{
- return count_;
-}
-
/**
* @brief Default constructor.
*/
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
-ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
{
return init(name)->set_flops(flops)->set_host(host);
}
*/
void ExecTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::ExecPtr exec = s4u::Exec::init();
- exec->set_name(name_);
- exec->set_flops_amount(amount_);
- exec->set_host(host_);
+ Task::fire();
+ auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
exec->start();
- exec->extension_set(new ExtendedAttributeActivity());
- exec->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = exec;
+ exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+ set_current_activity(exec);
}
/** @ingroup plugin_task
* @param host The host to set.
* @brief Set a new host.
*/
-ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host)
{
kernel::actor::simcall_answered([this, host] { host_ = host; });
return this;
*/
ExecTaskPtr ExecTask::set_flops(double flops)
{
- kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+ kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
return this;
}
/** @ingroup plugin_task
* @brief Smart constructor.
*/
-CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
{
return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
}
*/
void CommTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
- comm->set_name(name_);
- comm->set_payload_size(amount_);
+ Task::fire();
+ auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
comm->start();
- comm->extension_set(new ExtendedAttributeActivity());
- comm->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = comm;
+ comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+ set_current_activity(comm);
}
/** @ingroup plugin_task
* @param source The host to set.
* @brief Set a new source host.
*/
-CommTaskPtr CommTask::set_source(s4u::Host* source)
+CommTaskPtr CommTask::set_source(Host* source)
{
kernel::actor::simcall_answered([this, source] { source_ = source; });
return this;
* @param destination The host to set.
* @brief Set a new destination host.
*/
-CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+CommTaskPtr CommTask::set_destination(Host* destination)
{
kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
return this;
*/
CommTaskPtr CommTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
return this;
}
/** @ingroup plugin_task
* @brief Smart Constructor.
*/
-IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
{
return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
}
* @param disk The disk to set.
* @brief Set a new disk.
*/
-IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+IoTaskPtr IoTask::set_disk(Disk* disk)
{
kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
return this;
*/
IoTaskPtr IoTask::set_bytes(double bytes)
{
- kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+ kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
return this;
}
/** @ingroup plugin_task */
-IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+IoTaskPtr IoTask::set_op_type(Io::OpType type)
{
kernel::actor::simcall_answered([this, type] { type_ = type; });
return this;
void IoTask::fire()
{
- on_this_start_(this);
- Task::on_start(this);
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- s4u::IoPtr io = s4u::Io::init();
- io->set_name(name_);
- io->set_size(amount_);
- io->set_disk(disk_);
- io->set_op_type(type_);
+ Task::fire();
+ auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
io->start();
- io->extension_set(new ExtendedAttributeActivity());
- io->extension<ExtendedAttributeActivity>()->task_ = this;
- current_activity_ = io;
+ io->on_this_completion_cb([this](Io const&) { this->complete(); });
+ set_current_activity(io);
}
-} // namespace simgrid::plugins
+} // namespace simgrid::s4u
{
XBT_INFO("*** GET/SET DATA for disk: %s ***", disk->get_cname());
- const std::string* data = disk->get_data<std::string>();
+ auto data = disk->get_unique_data<std::string>();
XBT_INFO("Get data: '%s'", data ? data->c_str() : "No User Data");
disk->set_data(new std::string("Some data"));
- data = disk->get_data<std::string>();
+ data = disk->get_unique_data<std::string>();
XBT_INFO(" Set and get data: '%s'", data->c_str());
- delete data;
}
static void dump_platform_disks()
src/kernel/resource/models/network_ns3.hpp
src/kernel/resource/models/ns3/ns3_simulator.hpp
src/kernel/resource/models/ptask_L07.hpp
-
+
src/mc/datatypes.h
src/mc/mc.h
src/mc/mc_mmu.hpp
src/xbt/log_private.hpp
src/xbt/mallocator_private.h
src/xbt/parmap.hpp
-
+
src/xbt/mmalloc/mmalloc.h
src/xbt/mmalloc/mfree.c
src/xbt/mmalloc/mm_legacy.c
src/kernel/actor/SimcallObserver.hpp
src/kernel/actor/SynchroObserver.cpp
src/kernel/actor/SynchroObserver.hpp
-
+
src/kernel/context/Context.cpp
src/kernel/context/Context.hpp
src/kernel/context/ContextRaw.cpp
src/plugins/vm/VmLiveMigration.hpp
src/plugins/vm/dirty_page_tracking.cpp
src/plugins/battery.cpp
- src/plugins/task.cpp
src/plugins/photovoltaic.cpp
)
src/s4u/s4u_Mutex.cpp
src/s4u/s4u_Netzone.cpp
src/s4u/s4u_Semaphore.cpp
+ src/s4u/s4u_Task.cpp
src/s4u/s4u_VirtualMachine.cpp
)
src/mc/mc_replay.hpp
src/mc/transition/Transition.cpp
)
-
+
set(MC_SRC_STATELESS
src/mc/api/ActorState.hpp
src/mc/api/ClockVector.cpp
src/mc/api/State.hpp
src/mc/api/RemoteApp.cpp
src/mc/api/RemoteApp.hpp
-
+
src/mc/explo/DFSExplorer.cpp
src/mc/explo/DFSExplorer.hpp
src/mc/explo/Exploration.cpp
src/mc/explo/LivenessChecker.hpp
src/mc/explo/UdporChecker.cpp
src/mc/explo/UdporChecker.hpp
-
+
src/mc/explo/udpor/Comb.hpp
src/mc/explo/udpor/Configuration.hpp
src/mc/explo/udpor/Configuration.cpp
src/mc/explo/udpor/Unfolding.hpp
src/mc/explo/udpor/udpor_forward.hpp
src/mc/explo/udpor/udpor_tests_private.hpp
-
+
src/mc/inspect/DwarfExpression.cpp
src/mc/inspect/DwarfExpression.hpp
src/mc/inspect/Frame.cpp
src/mc/inspect/mc_unw.cpp
src/mc/inspect/mc_unw.hpp
src/mc/inspect/mc_unw_vmread.cpp
-
+
src/mc/sosp/ChunkedData.cpp
src/mc/sosp/ChunkedData.hpp
src/mc/sosp/PageStore.cpp
src/mc/api/strategy/MinMatchComm.hpp
src/mc/api/strategy/Strategy.hpp
src/mc/api/strategy/UniformStrategy.hpp
-
+
src/xbt/mmalloc/mm_interface.c
)
include/simgrid/plugins/file_system.h
include/simgrid/plugins/live_migration.h
include/simgrid/plugins/load.h
- include/simgrid/plugins/task.hpp
include/simgrid/plugins/photovoltaic.hpp
include/simgrid/plugins/ProducerConsumer.hpp
include/simgrid/instr.h
include/simgrid/s4u/Mutex.hpp
include/simgrid/s4u/NetZone.hpp
include/simgrid/s4u/Semaphore.hpp
+ include/simgrid/s4u/Task.hpp
include/simgrid/s4u/VirtualMachine.hpp
include/simgrid/s4u.hpp