Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of framagit.org:simgrid/simgrid
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 22 Jun 2023 00:25:49 +0000 (02:25 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 22 Jun 2023 00:25:49 +0000 (02:25 +0200)
25 files changed:
ChangeLog
MANIFEST.in
docs/source/app_s4u.rst
examples/cpp/CMakeLists.txt
examples/cpp/io-file-system/s4u-io-file-system.cpp
examples/cpp/task-io/s4u-task-io.cpp
examples/cpp/task-simple/s4u-task-simple.cpp
examples/cpp/task-storm/s4u-task-storm.cpp [new file with mode: 0644]
examples/cpp/task-storm/s4u-task-storm.tesh [new file with mode: 0644]
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/cpp/task-variable-load/s4u-task-variable-load.cpp
examples/python/task-io/task-io.py
examples/python/task-simple/task-simple.py
examples/python/task-switch-host/task-switch-host.py
examples/python/task-variable-load/task-variable-load.py
include/simgrid/s4u.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Task.hpp [moved from include/simgrid/plugins/task.hpp with 55% similarity]
src/bindings/python/simgrid_python.cpp
src/mc/explo/odpor/WakeupTreeIterator.cpp
src/mc/explo/odpor/WakeupTreeIterator.hpp
src/s4u/s4u_Task.cpp [moved from src/plugins/task.cpp with 57% similarity]
teshsuite/s4u/storage_client_server/storage_client_server.cpp
tools/cmake/DefinePackages.cmake

index 78392f1..9ae720e 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -17,13 +17,15 @@ S4U:
    Comm::set_payload_size() to change the size of the simulated data.
  - New function: Engine::flatify_platform(), to get a fully detailed vision of the
    configured platform.
+ - New Task abstraction: Tasks are designed to represent dataflows, i.e., graphs of repeatable Activities.
+   See the examples under examples/cpp/task-* and the associated documentation.
  - Full simDAG integration: Activity::start() actually starts only when all dependencies
    are fullfiled. If it cannot be started right away, it will start as soon as it becomes
    possible.
  - Allow to set a concurrency limit on disks and hosts, as it was already the case for links.
  - Rename Link::get_usage() to Link::get_load() for consistency with Host::
  - Every signal now come with a static version that is invoked for every object of that class,
-   and an instance version that is invoked for this specific object only. For example, 
+   and an instance version that is invoked for this specific object only. For example,
    s4u::Actor::on_suspend_cb() adds a callback that is invoked for the suspend of any actor while
    s4u::Actor::on_this_suspend_cb() adds a callback for this specific actor only.
  - Activity::on_suspended_cb() is renamed to Activity::on_suspend_cb(), and fired right before the suspend.
@@ -34,8 +36,6 @@ S4U:
    That is, callbacks registered in Exec::on_suspend_cb will not be fired for Comms nor Ios.
 
 New S4U plugins:
- - Task: They are designed to represent dataflows, i.e, graphs of repeatable Activities.
-   See the examples under examples/cpp/task-* and the documentation in the Plugins page.
  - Battery: Enable the management of batteries on hosts.
    See the examples under examples/cpp/battery-* and the documentation in the Plugins page.
  - Photovoltaic: Enable the management of photovoltaic panels on hosts.
index 296c687..76abec2 100644 (file)
@@ -398,6 +398,8 @@ include examples/cpp/task-io/s4u-task-io.cpp
 include examples/cpp/task-io/s4u-task-io.tesh
 include examples/cpp/task-simple/s4u-task-simple.cpp
 include examples/cpp/task-simple/s4u-task-simple.tesh
+include examples/cpp/task-storm/s4u-task-storm.cpp
+include examples/cpp/task-storm/s4u-task-storm.tesh
 include examples/cpp/task-switch-host/s4u-task-switch-host.cpp
 include examples/cpp/task-switch-host/s4u-task-switch-host.tesh
 include examples/cpp/task-variable-load/s4u-task-variable-load.cpp
@@ -1945,7 +1947,6 @@ include include/simgrid/plugins/live_migration.h
 include include/simgrid/plugins/load.h
 include include/simgrid/plugins/ns3.hpp
 include include/simgrid/plugins/photovoltaic.hpp
-include include/simgrid/plugins/task.hpp
 include include/simgrid/s4u.hpp
 include include/simgrid/s4u/Activity.hpp
 include include/simgrid/s4u/Actor.hpp
@@ -1962,6 +1963,7 @@ include include/simgrid/s4u/Mailbox.hpp
 include include/simgrid/s4u/Mutex.hpp
 include include/simgrid/s4u/NetZone.hpp
 include include/simgrid/s4u/Semaphore.hpp
+include include/simgrid/s4u/Task.hpp
 include include/simgrid/s4u/VirtualMachine.hpp
 include include/simgrid/semaphore.h
 include include/simgrid/simix.h
@@ -2228,9 +2230,9 @@ include src/mc/explo/udpor/Configuration_test.cpp
 include src/mc/explo/udpor/EventSet.cpp
 include src/mc/explo/udpor/EventSet.hpp
 include src/mc/explo/udpor/EventSet_test.cpp
-include src/mc/explo/udpor/ExtensionSet_test.cpp
 include src/mc/explo/udpor/ExtensionSetCalculator.cpp
 include src/mc/explo/udpor/ExtensionSetCalculator.hpp
+include src/mc/explo/udpor/ExtensionSet_test.cpp
 include src/mc/explo/udpor/History.cpp
 include src/mc/explo/udpor/History.hpp
 include src/mc/explo/udpor/History_test.cpp
@@ -2322,7 +2324,6 @@ include src/plugins/link_energy.cpp
 include src/plugins/link_energy_wifi.cpp
 include src/plugins/link_load.cpp
 include src/plugins/photovoltaic.cpp
-include src/plugins/task.cpp
 include src/plugins/vm/VmLiveMigration.cpp
 include src/plugins/vm/VmLiveMigration.hpp
 include src/plugins/vm/dirty_page_tracking.cpp
@@ -2341,6 +2342,7 @@ include src/s4u/s4u_Mailbox.cpp
 include src/s4u/s4u_Mutex.cpp
 include src/s4u/s4u_Netzone.cpp
 include src/s4u/s4u_Semaphore.cpp
+include src/s4u/s4u_Task.cpp
 include src/s4u/s4u_VirtualMachine.cpp
 include src/simgrid/Exception.cpp
 include src/simgrid/math_utils.h
index e5148af..4a0f3fe 100644 (file)
@@ -96,6 +96,9 @@ provides many helper functions to simplify the code of actors.
 .. |API_s4u_Activities| replace:: **Activities**
 .. _API_s4u_Activities: #api-s4u-activity
 
+.. |API_s4u_Tasks| replace:: **Tasks**
+.. _API_s4u_Tasks: #api-s4u-task
+
 .. |API_s4u_Hosts| replace:: **Hosts**
 .. _API_s4u_Hosts: #api-s4u-host
 
@@ -187,13 +190,39 @@ with :cpp:func:`s4u::Comm::wait_all() <simgrid::s4u::Comm::wait_all>`.
    :dedent: 2
 
 =====================
-Activities Life cycle
+Activities Life Cycle
 =====================
 
 Sometimes, you want to change the setting of an activity before it even starts.
 
 .. todo:: write this section
 
+=====================
+Repeatable Activities
+=====================
+
+In order to simulate the execution of Dataflow applications, we introduced the
+concept of |API_s4u_Tasks|, which can be seen as repeatable activities. A Dataflow
+is defined as a graph of |API_s4u_Tasks| through which Tokens circulate. Tokens
+can carry any user-defined data, using the same internal mechanisms as for the
+other simulated objects. Each Task has to receive a token from each of its
+predecessors to fire a new instance of a |API_s4u_Comm|, |API_s4u_Exec|, or
+|API_s4u_Io| activity. On completion of this activity, the Task propagates tokens
+to its successors, and waits for the next set of tokens to arrive.
+
+To initiate the execution of a Dataflow, it is possible to make some
+|API_s4u_Tasks| fire one or more activities without waiting for any token with the
+:cpp:func:`s4u::Task::enqueue_firings() <simgrid::s4u::Task::enqueue_firings>`
+function.
+
+The parameters and successors of a Task can be redefined at runtime by attaching
+callbacks to its start and completion signals with
+:cpp:func:`s4u::Task::on_this_start_cb() <simgrid::s4u::Task::on_this_start_cb>`
+and
+:cpp:func:`s4u::Task::on_this_completion_cb() <simgrid::s4u::Task::on_this_completion_cb>`
+respectively.
+
+
 .. _s4u_mailbox:
 
 Mailboxes
@@ -2116,8 +2145,8 @@ Querying info
 
    .. group-tab:: C++
 
-      .. doxygenfunction:: simgrid::s4u::Activity::get_cname
-      .. doxygenfunction:: simgrid::s4u::Activity::get_name
+      .. doxygenfunction:: simgrid::s4u::Activity::get_cname() const
+      .. doxygenfunction:: simgrid::s4u::Activity::get_name() const
       .. doxygenfunction:: simgrid::s4u::Activity::get_remaining() const
       .. doxygenfunction:: simgrid::s4u::Activity::get_state() const
       .. doxygenfunction:: simgrid::s4u::Activity::set_remaining(double remains)
@@ -2310,11 +2339,11 @@ Signals
    .. group-tab:: C++
 
       .. doxygenfunction:: simgrid::s4u::Comm::on_start_cb
+      .. doxygenfunction:: simgrid::s4u::Comm::on_this_start_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb
+      .. doxygenfunction:: simgrid::s4u::Comm::on_this_completion_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_recv_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_send_cb
-
-      .. doxygenfunction:: simgrid::s4u::Comm::on_completion_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_suspended_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_resumed_cb
       .. doxygenfunction:: simgrid::s4u::Comm::on_veto_cb
@@ -2453,9 +2482,10 @@ Signals
    .. group-tab:: C++
 
       .. doxygenfunction:: simgrid::s4u::Exec::on_start_cb
+      .. doxygenfunction:: simgrid::s4u::Exec::on_this_start_cb
       .. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb
+      .. doxygenfunction:: simgrid::s4u::Exec::on_this_completion_cb
 
-      .. doxygenfunction:: simgrid::s4u::Exec::on_completion_cb
       .. doxygenfunction:: simgrid::s4u::Exec::on_suspended_cb
       .. doxygenfunction:: simgrid::s4u::Exec::on_resumed_cb
       .. doxygenfunction:: simgrid::s4u::Exec::on_veto_cb
@@ -2529,13 +2559,211 @@ Signals
    .. group-tab:: C++
 
       .. doxygenfunction:: simgrid::s4u::Io::on_start_cb
+      .. doxygenfunction:: simgrid::s4u::Io::on_this_start_cb
       .. doxygenfunction:: simgrid::s4u::Io::on_completion_cb
+      .. doxygenfunction:: simgrid::s4u::Io::on_this_completion_cb
 
-      .. doxygenfunction:: simgrid::s4u::Io::on_completion_cb
       .. doxygenfunction:: simgrid::s4u::Io::on_suspended_cb
       .. doxygenfunction:: simgrid::s4u::Io::on_resumed_cb
       .. doxygenfunction:: simgrid::s4u::Io::on_veto_cb
 
+
+.. _API_s4u_Task:
+
+==========
+Tasks
+==========
+
+==============
+class Task
+==============
+
+.. doxygenclass:: simgrid::s4u::Task
+
+**Known subclasses:**
+:ref:`Communication Tasks <API_s4u_CommTask>`,
+:ref:`Execution Tasks <API_s4u_ExecTask>`,
+:ref:`I/O Tasks <API_s4u_IoTask>`.
+See also the :ref:`section on activities <s4u_Tasks>` above.
+
+Basic management
+----------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. code-block:: C++
+
+         #include <simgrid/s4u/Task.hpp>
+
+      .. doxygentypedef:: TaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenfunction:: simgrid::s4u::Task::get_cname() const
+      .. doxygenfunction:: simgrid::s4u::Task::get_name() const
+      .. doxygenfunction:: simgrid::s4u::Task::get_count() const
+      .. doxygenfunction:: simgrid::s4u::Task::get_amount() const
+      .. doxygenfunction:: simgrid::s4u::Task::set_amount(double amount)
+
+Life cycle
+----------
+
+.. tabs::
+
+   .. group-tab:: C++
+      .. doxygenfunction:: simgrid::s4u::Task::enqueue_firings(int n)
+
+Managing Dependencies
+---------------------
+.. tabs::
+
+   .. group-tab:: C++
+      .. doxygenfunction:: simgrid::s4u::Task::add_successor(TaskPtr t)
+      .. doxygenfunction:: simgrid::s4u::Task::remove_successor(TaskPtr t)
+      .. doxygenfunction:: simgrid::s4u::Task::remove_all_successors()
+      .. doxygenfunction:: simgrid::s4u::Task::get_successors() const
+
+Managing Tokens
+---------------
+.. doxygenclass:: simgrid::s4u::Token
+
+.. tabs::
+
+   .. group-tab:: C++
+      .. doxygenfunction:: simgrid::s4u::Task::set_token(std::shared_ptr<Token> token)
+      .. doxygenfunction:: simgrid::s4u::Task::get_next_token_from(TaskPtr t)
+
+Signals
+-------
+
+.. tabs::
+
+   .. group-tab:: C++
+      .. doxygenfunction:: simgrid::s4u::Task::on_start_cb
+      .. doxygenfunction:: simgrid::s4u::Task::on_this_start_cb
+      .. doxygenfunction:: simgrid::s4u::Task::on_completion_cb
+      .. doxygenfunction:: simgrid::s4u::Task::on_this_completion_cb
+
+.. _API_s4u_CommTask:
+
+================
+⁣  class CommTask
+================
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenclass:: simgrid::s4u::CommTask
+
+Basic management
+----------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. code-block:: C++
+
+         #include <simgrid/s4u/Task.hpp>
+
+      .. doxygentypedef:: CommTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenfunction:: simgrid::s4u::CommTask::get_source() const
+      .. doxygenfunction:: simgrid::s4u::CommTask::get_destination() const
+      .. doxygenfunction:: simgrid::s4u::CommTask::get_bytes() const
+      .. doxygenfunction:: simgrid::s4u::CommTask::set_source(simgrid::s4u::Host* source)
+      .. doxygenfunction:: simgrid::s4u::CommTask::set_destination(simgrid::s4u::Host* destination)
+      .. doxygenfunction:: simgrid::s4u::CommTask::set_bytes(double bytes)
+
+
+.. _API_s4u_ExecTask:
+
+================
+⁣  class ExecTask
+================
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenclass:: simgrid::s4u::ExecTask
+
+Basic management
+----------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. code-block:: C++
+
+         #include <simgrid/s4u/Task.hpp>
+
+      .. doxygentypedef:: ExecTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenfunction:: simgrid::s4u::ExecTask::get_host() const
+      .. doxygenfunction:: simgrid::s4u::ExecTask::get_flops() const
+      .. doxygenfunction:: simgrid::s4u::ExecTask::set_host(simgrid::s4u::Host* host)
+      .. doxygenfunction:: simgrid::s4u::ExecTask::set_flops(double flops)
+
+.. _API_s4u_IoTask:
+
+================
+⁣  class IoTask
+================
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenclass:: simgrid::s4u::IoTask
+
+Basic management
+----------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. code-block:: C++
+
+         #include <simgrid/s4u/Task.hpp>
+
+      .. doxygentypedef:: IoTaskPtr
+
+Querying info
+-------------
+
+.. tabs::
+
+   .. group-tab:: C++
+
+      .. doxygenfunction:: simgrid::s4u::IoTask::get_disk() const
+      .. doxygenfunction:: simgrid::s4u::IoTask::get_bytes() const
+      .. doxygenfunction:: simgrid::s4u::IoTask::get_op_type() const
+      .. doxygenfunction:: simgrid::s4u::IoTask::set_disk(simgrid::s4u::Disk* disk)
+      .. doxygenfunction:: simgrid::s4u::IoTask::set_bytes(double bytes)
+      .. doxygenfunction:: simgrid::s4u::IoTask::set_op_type(simgrid::s4u::Io::OpType type)
+
 .. _API_s4u_Synchronizations:
 
 =======================
index dc66fc9..66187a8 100644 (file)
@@ -171,7 +171,7 @@ foreach (example activity-testany activity-waitany
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
                  network-ns3 network-ns3-wifi network-wifi
                  io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
-                 task-io task-simple task-variable-load task-switch-host
+                 task-io task-simple task-variable-load task-storm task-switch-host
                  photovoltaic-simple
                  platform-comm-serialize platform-failures platform-profile platform-properties
                  plugin-host-load plugin-link-load plugin-prodcons
index ef57282..40e0048 100644 (file)
@@ -58,9 +58,8 @@ public:
 
     // Test attaching some user data to the file
     file->set_data(new std::string("777"));
-    const auto* file_data = file->get_data<std::string>();
+    auto file_data = file->get_unique_data<std::string>();
     XBT_INFO("User data attached to the file: %s", file_data->c_str());
-    delete file_data;
 
     // Close the file
     file->close();
index 6301657..f5d3c15 100644 (file)
@@ -3,7 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-/* This example demonstrate basic use of the task plugin.
+/* This example demonstrates basic use of tasks.
  *
  * We model the following graph:
  *
  * comm is a communication task.
  */
 
-#include "simgrid/plugins/task.hpp"
+#include "simgrid/s4u/Task.hpp"
 #include "simgrid/s4u.hpp"
 #include <simgrid/plugins/file_system.h>
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
+namespace sg4 = simgrid::s4u;
 
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retrieve hosts
   auto* bob  = e.host_by_name("bob");
   auto* carl = e.host_by_name("carl");
 
   // Create tasks
-  auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, bob);
-  auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, carl);
-  auto write = simgrid::plugins::IoTask::init("write", 1e7, bob->get_disks().front(), simgrid::s4u::Io::OpType::WRITE);
-  auto read  = simgrid::plugins::IoTask::init("read", 1e7, carl->get_disks().front(), simgrid::s4u::Io::OpType::READ);
+  auto exec1 = sg4::ExecTask::init("exec1", 1e9, bob);
+  auto exec2 = sg4::ExecTask::init("exec2", 1e9, carl);
+  auto write = sg4::IoTask::init("write", 1e7, bob->get_disks().front(), sg4::Io::OpType::WRITE);
+  auto read  = sg4::IoTask::init("read", 1e7, carl->get_disks().front(), sg4::Io::OpType::READ);
 
   // Create the graph by defining dependencies between tasks
   exec1->add_successor(write);
@@ -41,12 +41,12 @@ int main(int argc, char* argv[])
   read->add_successor(exec2);
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
-  // Enqueue two executions for task exec1
-  exec1->enqueue_execs(2);
+  // Enqueue two firings for task exec1
+  exec1->enqueue_firings(2);
 
   // Start the simulation
   e.run();
index 3e9d14f..72ddf4c 100644 (file)
@@ -3,7 +3,7 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-/* This example demonstrate basic use of the task plugin.
+/* This example demonstrates basic use of tasks.
  *
  * We model the following graph:
  *
  * comm is a communication task.
  */
 
-#include "simgrid/plugins/task.hpp"
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
 
+namespace sg4 = simgrid::s4u;
+
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retrieve hosts
   auto* tremblay = e.host_by_name("Tremblay");
   auto* jupiter  = e.host_by_name("Jupiter");
 
   // Create tasks
-  auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, tremblay);
-  auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, jupiter);
-  auto comm  = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
+  auto exec1 = sg4::ExecTask::init("exec1", 1e9, tremblay);
+  auto exec2 = sg4::ExecTask::init("exec2", 1e9, jupiter);
+  auto comm  = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
 
   // Create the graph by defining dependencies between tasks
   exec1->add_successor(comm);
   comm->add_successor(exec2);
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
-  // Enqueue two executions for task exec1
-  exec1->enqueue_execs(2);
+  // Enqueue two firings for task exec1
+  exec1->enqueue_firings(2);
 
   // Start the simulation
   e.run();
diff --git a/examples/cpp/task-storm/s4u-task-storm.cpp b/examples/cpp/task-storm/s4u-task-storm.cpp
new file mode 100644 (file)
index 0000000..2c4edb1
--- /dev/null
@@ -0,0 +1,130 @@
+/* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
+   and uses them to build a simulation of a stream processing application.
+
+   Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
+   Spout SB produces 1e6 bytes every 200ms.
+
+   Bolts B1 and B2 alternately process data from Spout SA. The quantity of work to process this data is 10 flops per byte.
+   Bolt B3 processes data from Spout SB.
+   Bolt B4 processes data from Bolt B3.
+
+                        Fafard
+                        ┌────┐
+                    ┌──►│ B1 │
+         Tremblay   │   └────┘
+          ┌────┐    │
+          │ SA ├────┤  Ginette
+          └────┘    │   ┌────┐
+                    └──►│ B2 │
+                        └────┘
+
+
+                       Bourassa
+         Jupiter     ┌──────────┐
+          ┌────┐     │          │
+          │ SB ├─────┤ B3 ──► B4│
+          └────┘     │          │
+                     └──────────┘
+ */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  e.load_platform(argv[1]);
+
+  // Retrieve hosts
+  auto tremblay = e.host_by_name("Tremblay");
+  auto jupiter  = e.host_by_name("Jupiter");
+  auto fafard  = e.host_by_name("Fafard");
+  auto ginette  = e.host_by_name("Ginette");
+  auto bourassa = e.host_by_name("Bourassa");
+
+  // Create execution tasks
+  auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+
+  // Create communication tasks
+  auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+
+  // Create the graph by defining dependencies between tasks
+  // Some dependencies are defined dynamically
+  SA_to_B1->add_successor(B1);
+  SA_to_B2->add_successor(B2);
+  SB->add_successor(SB_to_B3);
+  SB_to_B3->add_successor(B3);
+  B3->add_successor(B4);
+
+  /* Dynamic modification of the graph and bytes sent
+     Alternately we: remove/add the link between SA and SA_to_B2
+                     add/remove the link between SA and SA_to_B1
+  */
+  SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
+    int count = t->get_count();
+    sg4::CommTaskPtr comm;
+    if (count % 2 == 0) {
+      t->remove_successor(SA_to_B2);
+      t->add_successor(SA_to_B1);
+      comm = SA_to_B1;
+    }
+    else {
+      t->remove_successor(SA_to_B1);
+      t->add_successor(SA_to_B2);
+      comm = SA_to_B2;
+    }
+    std::vector<double> amount = {1e3,1e6,1e9};
+    comm->set_amount(amount[count % 3]);
+    auto token = std::make_shared<sg4::Token>();
+    token->set_data(new double(amount[count % 3]));
+    t->set_token(token);
+  });
+
+  // The token sent by SA is forwarded by both communication tasks
+  SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
+    t->set_token(t->get_next_token_from(SA));
+  });
+  SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
+    t->set_token(t->get_next_token_from(SA));
+  });
+
+  /* B1 and B2 read the value of the token received from their predecessors
+     and use it to adapt the amount of work they have to do.
+  */
+  B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+    auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+    t->set_amount(*data * 10);
+  });
+  B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+    auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+    t->set_amount(*data * 10);
+  });
+
+  // Enqueue firings for tasks without predecessors
+  SA->enqueue_firings(5);
+  SB->enqueue_firings(5);
+
+  // Add a function to be called when tasks end for log purpose
+  sg4::Task::on_completion_cb([]
+  (const sg4::Task* t) {
+    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
+  });
+
+  // Start the simulation
+  e.run();
+  return 0;
+}
diff --git a/examples/cpp/task-storm/s4u-task-storm.tesh b/examples/cpp/task-storm/s4u-task-storm.tesh
new file mode 100644 (file)
index 0000000..d7c364a
--- /dev/null
@@ -0,0 +1,38 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-task-storm ${platfdir}/small_platform.xml
+> [0.100000] [task_storm/INFO] Task SA finished (1)
+> [0.125841] [task_storm/INFO] Task SA_to_B1 finished (1)
+> [0.125972] [task_storm/INFO] Task B1 finished (1)
+> [0.200000] [task_storm/INFO] Task SB finished (1)
+> [0.200000] [task_storm/INFO] Task SA finished (2)
+> [0.300000] [task_storm/INFO] Task SA finished (3)
+> [0.364429] [task_storm/INFO] Task SA_to_B2 finished (1)
+> [0.400000] [task_storm/INFO] Task SB finished (2)
+> [0.400000] [task_storm/INFO] Task SA finished (4)
+> [0.416759] [task_storm/INFO] Task SA_to_B2 finished (2)
+> [0.500000] [task_storm/INFO] Task SA finished (5)
+> [0.557036] [task_storm/INFO] Task SB_to_B3 finished (1)
+> [0.570648] [task_storm/INFO] Task B2 finished (1)
+> [0.570855] [task_storm/INFO] Task B2 finished (2)
+> [0.600000] [task_storm/INFO] Task SB finished (3)
+> [0.800000] [task_storm/INFO] Task SB finished (4)
+> [0.867388] [task_storm/INFO] Task SB_to_B3 finished (2)
+> [1.000000] [task_storm/INFO] Task SB finished (5)
+> [1.177739] [task_storm/INFO] Task SB_to_B3 finished (3)
+> [1.488090] [task_storm/INFO] Task SB_to_B3 finished (4)
+> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
+> [2.619232] [task_storm/INFO] Task B3 finished (1)
+> [6.743624] [task_storm/INFO] Task B3 finished (2)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
+> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [14.992407] [task_storm/INFO] Task B3 finished (4)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
+> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [23.241190] [task_storm/INFO] Task B4 finished (3)
+> [27.365582] [task_storm/INFO] Task B4 finished (4)
+> [31.489974] [task_storm/INFO] Task B4 finished (5)
+> [133.367321] [task_storm/INFO] Task SA_to_B1 finished (2)
+> [133.525717] [task_storm/INFO] Task SA_to_B1 finished (3)
+> [264.435791] [task_storm/INFO] Task B1 finished (2)
+> [264.566859] [task_storm/INFO] Task B1 finished (3)
index 6ebca12..7023f68 100644 (file)
  * With exec1 and exec2 on different hosts.
  */
 
-#include "simgrid/plugins/task.hpp"
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_switch_host, "Messages specific for this task example");
+namespace sg4 = simgrid::s4u;
 
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retrieve hosts
   auto* tremblay = e.host_by_name("Tremblay");
@@ -33,13 +32,13 @@ int main(int argc, char* argv[])
   auto* fafard   = e.host_by_name("Fafard");
 
   // Create tasks
-  auto comm0 = simgrid::plugins::CommTask::init("comm0");
+  auto comm0 = sg4::CommTask::init("comm0");
   comm0->set_bytes(1e7);
   comm0->set_source(tremblay);
-  auto exec1 = simgrid::plugins::ExecTask::init("exec1", 1e9, jupiter);
-  auto exec2 = simgrid::plugins::ExecTask::init("exec2", 1e9, fafard);
-  auto comm1 = simgrid::plugins::CommTask::init("comm1", 1e7, jupiter, tremblay);
-  auto comm2 = simgrid::plugins::CommTask::init("comm2", 1e7, fafard, tremblay);
+  auto exec1 = sg4::ExecTask::init("exec1", 1e9, jupiter);
+  auto exec2 = sg4::ExecTask::init("exec2", 1e9, fafard);
+  auto comm1 = sg4::CommTask::init("comm1", 1e7, jupiter, tremblay);
+  auto comm2 = sg4::CommTask::init("comm2", 1e7, fafard, tremblay);
 
   // Create the initial graph by defining dependencies between tasks
   comm0->add_successor(exec2);
@@ -47,15 +46,14 @@ int main(int argc, char* argv[])
   exec2->add_successor(comm2);
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
-  // Add a function to be called before each executions of comm0
+  // Add a function to be called before each firing of comm0
   // This function modifies the graph of tasks by adding or removing
   // successors to comm0
-  comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
-    auto* comm0      = dynamic_cast<simgrid::plugins::CommTask*>(t);
+  comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
     static int count = 0;
     if (count % 2 == 0) {
       comm0->set_destination(jupiter);
@@ -69,8 +67,8 @@ int main(int argc, char* argv[])
     count++;
   });
 
-  // Enqueue four executions for task comm0
-  comm0->enqueue_execs(4);
+  // Enqueue four firings for task comm0
+  comm0->enqueue_firings(4);
 
   // Start the simulation
   e.run();
index 960fb66..0955c2e 100644 (file)
  * With a heavy load there is a burst of comm before the exec task can even finish once.
  */
 
-#include "simgrid/plugins/task.hpp"
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_variable_load, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
 
-static void variable_load(simgrid::plugins::TaskPtr t)
+static void variable_load(sg4::TaskPtr t)
 {
   XBT_INFO("--- Small load ---");
   for (int i = 0; i < 3; i++) {
-    t->enqueue_execs(1);
-    simgrid::s4u::this_actor::sleep_for(100);
+    t->enqueue_firings(1);
+    sg4::this_actor::sleep_for(100);
   }
-  simgrid::s4u::this_actor::sleep_until(1000);
+  sg4::this_actor::sleep_until(1000);
   XBT_INFO("--- Heavy load ---");
   for (int i = 0; i < 3; i++) {
-    t->enqueue_execs(1);
-    simgrid::s4u::this_actor::sleep_for(1);
+    t->enqueue_firings(1);
+    sg4::this_actor::sleep_for(1);
   }
 }
 
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retreive hosts
   auto* tremblay = e.host_by_name("Tremblay");
   auto* jupiter  = e.host_by_name("Jupiter");
 
   // Create tasks
-  auto comm = simgrid::plugins::CommTask::init("comm", 1e7, tremblay, jupiter);
-  auto exec = simgrid::plugins::ExecTask::init("exec", 1e9, jupiter);
+  auto comm = sg4::CommTask::init("comm", 1e7, tremblay, jupiter);
+  auto exec = sg4::ExecTask::init("exec", 1e9, jupiter);
 
   // Create the graph by defining dependencies between tasks
   comm->add_successor(exec);
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
+  sg4::Task::on_completion_cb([](const sg4::Task* t) {
     XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
   });
 
   // Create the actor that will inject load during the simulation
-  simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+  sg4::Actor::create("input", tremblay, variable_load, comm);
 
   // Start the simulation
   e.run();
index b74c411..e75215a 100644 (file)
@@ -24,7 +24,6 @@ if __name__ == '__main__':
     args = parse()
     e = Engine(sys.argv)
     e.load_platform(args.platform)
-    Task.init()
 
     # Retrieve hosts
     bob = e.host_by_name('bob')
@@ -42,10 +41,10 @@ if __name__ == '__main__':
     read.add_successor(exec2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
-    # Enqueue two executions for task exec1
-    exec1.enqueue_execs(2)
+    # Enqueue two firings for task exec1
+    exec1.enqueue_firings(2)
 
     # runs the simulation
     e.run()
index 1219906..23e9fc0 100644 (file)
@@ -34,7 +34,6 @@ if __name__ == '__main__':
     args = parse()
     e = Engine(sys.argv)
     e.load_platform(args.platform)
-    Task.init()
 
     # Retrieve hosts
     tremblay = e.host_by_name('Tremblay')
@@ -50,11 +49,10 @@ if __name__ == '__main__':
     comm.add_successor(exec2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
-    # Enqueue two executions for task exec1
-    exec1.enqueue_execs(2)
+    # Enqueue two firings for task exec1
+    exec1.enqueue_firings(2)
 
     # runs the simulation
     e.run()
-
index a95db16..b2904c3 100644 (file)
@@ -55,7 +55,6 @@ if __name__ == '__main__':
     args = parse()
     e = Engine(sys.argv)
     e.load_platform(args.platform)
-    Task.init()
 
     # Retrieve hosts
     tremblay = e.host_by_name('Tremblay')
@@ -76,15 +75,15 @@ if __name__ == '__main__':
     exec2.add_successor(comm2)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
-    # Add a function to be called before each executions of comm0
+    # Add a function to be called before each firing of comm0
     # This function modifies the graph of tasks by adding or removing
     # successors to comm0
     comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
 
-    # Enqueue two executions for task exec1
-    comm0.enqueue_execs(4)
+    # Enqueue four firings for task comm0
+    comm0.enqueue_firings(4)
 
     # runs the simulation
     e.run()
index 7310584..51dbc1a 100644 (file)
@@ -33,19 +33,18 @@ def callback(t):
 def variable_load(t):
     print('--- Small load ---')
     for _ in range(3):
-        t.enqueue_execs(1)
+        t.enqueue_firings(1)
         this_actor.sleep_for(100)
     this_actor.sleep_for(1000)
     print('--- Heavy load ---')
     for _ in range(3):
-        t.enqueue_execs(1)
+        t.enqueue_firings(1)
         this_actor.sleep_for(1)
 
 if __name__ == '__main__':
     args = parse()
     e = Engine(sys.argv)
     e.load_platform(args.platform)
-    Task.init()
 
     # Retrieve hosts
     tremblay = e.host_by_name('Tremblay')
@@ -59,7 +58,7 @@ if __name__ == '__main__':
     comm.add_successor(exec)
 
     # Add a function to be called when tasks end for log purpose
-    Task.on_end_cb(callback)
+    Task.on_completion_cb(callback)
 
     # Create the actor that will inject load during the simulation
     Actor.create("input", tremblay, variable_load, comm)
index f694696..77a71df 100644 (file)
@@ -22,6 +22,7 @@
 #include <simgrid/s4u/Mutex.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/s4u/Semaphore.hpp>
+#include <simgrid/s4u/Task.hpp>
 #include <simgrid/s4u/VirtualMachine.hpp>
 
 #include <simgrid/Exception.hpp>
index a92f87f..a6a7f08 100644 (file)
@@ -39,7 +39,6 @@ class XBT_PUBLIC Comm : public Activity_T<Comm> {
   Comm() = default;
   Comm* do_start() override;
 
-protected:
   static xbt::signal<void(Comm const&)> on_send;
   xbt::signal<void(Comm const&)> on_this_send;
   static xbt::signal<void(Comm const&)> on_recv;
@@ -47,6 +46,7 @@ protected:
   inline static xbt::signal<void(Comm const&)> on_start;
   xbt::signal<void(Comm const&)> on_this_start;
 
+protected:
   void fire_on_completion() const override {
     /* The completion signal of a Comm has to be thrown only once and not by the sender AND the receiver.
        then Comm::on_completion is thrown in the kernel in CommImpl::finish.
index 5cd7857..7de8e8e 100644 (file)
@@ -36,14 +36,15 @@ class XBT_PUBLIC Exec : public Activity_T<Exec> {
 
   bool parallel_ = false;
 
+  inline static xbt::signal<void(Exec const&)> on_start;
+  xbt::signal<void(Exec const&)> on_this_start;
+
 protected:
   explicit Exec(kernel::activity::ExecImplPtr pimpl);
   Exec* do_start() override;
 
   void reset() const;
 
-  inline static xbt::signal<void(Exec const&)> on_start;
-  xbt::signal<void(Exec const&)> on_this_start;
   void fire_on_completion() const override { on_completion(*this); }
   void fire_on_this_completion() const override { on_this_completion(*this); }
   void fire_on_suspend() const override { on_suspend(*this); }
similarity index 55%
rename from include/simgrid/plugins/task.hpp
rename to include/simgrid/s4u/Task.hpp
index 8143333..c0677da 100644 (file)
@@ -1,16 +1,17 @@
-#ifndef SIMGRID_PLUGINS_TASK_H_
-#define SIMGRID_PLUGINS_TASK_H_
+#ifndef SIMGRID_S4U_TASK_H_
+#define SIMGRID_S4U_TASK_H_
 
 #include <simgrid/s4u/Activity.hpp>
 #include <simgrid/s4u/Io.hpp>
 #include <xbt/Extendable.hpp>
 
 #include <atomic>
+#include <deque>
 #include <map>
 #include <memory>
 #include <set>
 
-namespace simgrid::plugins {
+namespace simgrid::s4u {
 
 class Task;
 using TaskPtr = boost::intrusive_ptr<Task>;
@@ -23,59 +24,68 @@ using CommTaskPtr = boost::intrusive_ptr<CommTask>;
 class IoTask;
 using IoTaskPtr = boost::intrusive_ptr<IoTask>;
 
-struct ExtendedAttributeActivity {
-  static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
-  Task* task_;
-};
+class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
 
 class Task {
+  std::string name_;
+  double amount_;
+  int queued_firings_ = 0;
+  int count_        = 0;
+  bool working_     = false;
+
   std::set<Task*> successors_                 = {};
   std::map<Task*, unsigned int> predecessors_ = {};
+  std::atomic_int_fast32_t refcount_{0};
 
   bool ready_to_run() const;
   void receive(Task* source);
-  void complete();
+
+  std::shared_ptr<Token> token_ = nullptr;
+  std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+  ActivityPtr previous_activity_;
+  ActivityPtr current_activity_;
 
 protected:
-  std::string name_;
-  double amount_;
-  int queued_execs_ = 0;
-  int count_        = 0;
-  bool working_     = false;
-  s4u::ActivityPtr previous_activity_;
-  s4u::ActivityPtr current_activity_;
-  xbt::signal<void(Task*)> on_this_start_;
-  xbt::signal<void(Task*)> on_this_end_;
   explicit Task(const std::string& name);
   virtual ~Task()     = default;
-  virtual void fire() = 0;
 
-  static xbt::signal<void(Task*)> on_start;
-  static xbt::signal<void(Task*)> on_end;
-  std::atomic_int_fast32_t refcount_{0};
+  virtual void fire();
+  void complete();
+
+  void set_current_activity (ActivityPtr a) { current_activity_ = a; }
+
+  inline static xbt::signal<void(Task*)> on_start;
+  xbt::signal<void(Task*)> on_this_start;
+  inline static xbt::signal<void(Task*)> on_completion;
+  xbt::signal<void(Task*)> on_this_completion;
 
 public:
-  static void init();
   const std::string& get_name() const { return name_; }
   const char* get_cname() const { return name_.c_str(); }
-  void enqueue_execs(int n);
   void set_amount(double amount);
   double get_amount() const { return amount_; }
+  int get_count() const { return count_; }
+
+  void set_token(std::shared_ptr<Token> token);
+  std::shared_ptr<Token> get_next_token_from(TaskPtr t);
+
   void add_successor(TaskPtr t);
   void remove_successor(TaskPtr t);
   void remove_all_successors();
   const std::set<Task*>& get_successors() const { return successors_; }
-  void on_this_start_cb(const std::function<void(Task*)>& func);
-  void on_this_end_cb(const std::function<void(Task*)>& func);
-  int get_count() const;
 
-  /** Add a callback fired before a task activity start.
+  void enqueue_firings(int n);
+
+  /** Add a callback fired before this task activity starts */
+  void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
+  /** Add a callback fired before a task activity starts.
    * Triggered after the on_this_start function**/
   static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
-  /** Add a callback fired after a task activity end.
-   * Triggered after the on_this_end function, but before
-   * sending tokens to successors.**/
-  static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+  /** Add a callback fired after this task activity ends */
+  void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+  /** Add a callback fired after a task activity ends.
+   * Triggered after the on_this_completion function, but before sending tokens to successors.**/
+  static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
 
 #ifndef DOXYGEN
   friend void intrusive_ptr_release(Task* o)
@@ -89,54 +99,57 @@ public:
 #endif
 };
 
-class ExecTask : public Task {
-  s4u::Host* host_;
+class CommTask : public Task {
+  Host* source_;
+  Host* destination_;
 
-  explicit ExecTask(const std::string& name);
+  explicit CommTask(const std::string& name);
   void fire() override;
 
 public:
-  static ExecTaskPtr init(const std::string& name);
-  static ExecTaskPtr init(const std::string& name, double flops, s4u::Host* host);
-  ExecTaskPtr set_host(s4u::Host* host);
-  s4u::Host* get_host() const { return host_; }
-  ExecTaskPtr set_flops(double flops);
-  double get_flops() const { return get_amount(); }
+  static CommTaskPtr init(const std::string& name);
+  static CommTaskPtr init(const std::string& name, double bytes, Host* source, Host* destination);
+
+  CommTaskPtr set_source(Host* source);
+  Host* get_source() const { return source_; }
+  CommTaskPtr set_destination(Host* destination);
+  Host* get_destination() const { return destination_; }
+  CommTaskPtr set_bytes(double bytes);
+  double get_bytes() const { return get_amount(); }
 };
 
-class CommTask : public Task {
-  s4u::Host* source_;
-  s4u::Host* destination_;
+class ExecTask : public Task {
+  Host* host_;
 
-  explicit CommTask(const std::string& name);
+  explicit ExecTask(const std::string& name);
   void fire() override;
 
 public:
-  static CommTaskPtr init(const std::string& name);
-  static CommTaskPtr init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination);
-  CommTaskPtr set_source(s4u::Host* source);
-  s4u::Host* get_source() const { return source_; }
-  CommTaskPtr set_destination(s4u::Host* destination);
-  s4u::Host* get_destination() const { return destination_; }
-  CommTaskPtr set_bytes(double bytes);
-  double get_bytes() const { return get_amount(); }
+  static ExecTaskPtr init(const std::string& name);
+  static ExecTaskPtr init(const std::string& name, double flops, Host* host);
+
+  ExecTaskPtr set_host(Host* host);
+  Host* get_host() const { return host_; }
+  ExecTaskPtr set_flops(double flops);
+  double get_flops() const { return get_amount(); }
 };
 
 class IoTask : public Task {
-  s4u::Disk* disk_;
-  s4u::Io::OpType type_;
+  Disk* disk_;
+  Io::OpType type_;
   explicit IoTask(const std::string& name);
   void fire() override;
 
 public:
   static IoTaskPtr init(const std::string& name);
-  static IoTaskPtr init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type);
-  IoTaskPtr set_disk(s4u::Disk* disk);
-  s4u::Disk* get_disk() const { return disk_; }
+  static IoTaskPtr init(const std::string& name, double bytes, Disk* disk, Io::OpType type);
+
+  IoTaskPtr set_disk(Disk* disk);
+  Disk* get_disk() const { return disk_; }
   IoTaskPtr set_bytes(double bytes);
   double get_bytes() const { return get_amount(); }
-  IoTaskPtr set_op_type(s4u::Io::OpType type);
-  s4u::Io::OpType get_op_type() const { return type_; }
+  IoTaskPtr set_op_type(Io::OpType type);
+  Io::OpType get_op_type() const { return type_; }
 };
-} // namespace simgrid::plugins
+} // namespace simgrid::s4u
 #endif
index 5f0f335..f71d652 100644 (file)
@@ -11,7 +11,6 @@
 #include "simgrid/kernel/ProfileBuilder.hpp"
 #include "simgrid/kernel/routing/NetPoint.hpp"
 #include <simgrid/Exception.hpp>
-#include <simgrid/plugins/task.hpp>
 #include <simgrid/s4u/Actor.hpp>
 #include <simgrid/s4u/Barrier.hpp>
 #include <simgrid/s4u/Comm.hpp>
@@ -25,6 +24,7 @@
 #include <simgrid/s4u/Mutex.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/s4u/Semaphore.hpp>
+#include <simgrid/s4u/Task.hpp>
 #include <simgrid/version.h>
 
 #include <algorithm>
 #include <vector>
 
 namespace py = pybind11;
-using simgrid::plugins::CommTask;
-using simgrid::plugins::CommTaskPtr;
-using simgrid::plugins::ExecTask;
-using simgrid::plugins::ExecTaskPtr;
-using simgrid::plugins::IoTask;
-using simgrid::plugins::IoTaskPtr;
-using simgrid::plugins::Task;
-using simgrid::plugins::TaskPtr;
+using simgrid::s4u::CommTask;
+using simgrid::s4u::CommTaskPtr;
+using simgrid::s4u::ExecTask;
+using simgrid::s4u::ExecTaskPtr;
+using simgrid::s4u::IoTask;
+using simgrid::s4u::IoTaskPtr;
+using simgrid::s4u::Task;
+using simgrid::s4u::TaskPtr;
 using simgrid::s4u::Actor;
 using simgrid::s4u::ActorPtr;
 using simgrid::s4u::Barrier;
@@ -921,7 +921,6 @@ PYBIND11_MODULE(simgrid, m)
 
   /* Class Task */
   py::class_<Task, TaskPtr>(m, "Task", "Task. See the C++ documentation for details.")
-      .def_static("init", &Task::init)
       .def_static(
           "on_start_cb",
           [](py::object cb) {
@@ -934,11 +933,11 @@ PYBIND11_MODULE(simgrid, m)
           },
           "Add a callback called when each task starts.")
       .def_static(
-          "on_end_cb",
+          "on_completion_cb",
           [](py::object cb) {
             cb.inc_ref(); // keep alive after return
             const py::gil_scoped_release gil_release;
-            Task::on_end_cb([cb_p = cb.ptr()](Task* op) {
+            Task::on_completion_cb([cb_p = cb.ptr()](Task* op) {
               const py::gil_scoped_acquire py_context; // need a new context for callback
               py::reinterpret_borrow<py::function>(cb_p)(op);
             });
@@ -948,8 +947,8 @@ PYBIND11_MODULE(simgrid, m)
       .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
       .def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
       .def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
-      .def("enqueue_execs", py::overload_cast<int>(&Task::enqueue_execs), py::call_guard<py::gil_scoped_release>(),
-           py::arg("n"), "Enqueue executions for this task.")
+      .def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
+           py::arg("n"), "Enqueue firings for this task.")
       .def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
            py::arg("op"), "Add a successor to this task.")
       .def("remove_successor", py::overload_cast<TaskPtr>(&Task::remove_successor),
@@ -958,7 +957,7 @@ PYBIND11_MODULE(simgrid, m)
            "Remove all successors of this task.")
       .def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
            py::arg("func"), "Add a callback called when this task starts.")
-      .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+      .def("on_this_completion_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_completion_cb),
            py::arg("func"), "Add a callback called when this task ends.")
       .def(
           "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
index 6c81203..9017fd8 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "src/mc/explo/odpor/WakeupTreeIterator.hpp"
 #include "src/mc/explo/odpor/WakeupTree.hpp"
+#include "xbt/asserts.h"
 
 namespace simgrid::mc::odpor {
 
@@ -20,19 +21,19 @@ void WakeupTreeIterator::push_until_left_most_found()
   // there are no cycles. This means that at least
   // one node in the tree won't have any children,
   // so the loop will eventually terminate
-  auto* cur_top_node = *post_order_iteration.top();
+  WakeupTreeNode* cur_top_node = *post_order_iteration.top();
   while (not cur_top_node->is_leaf()) {
     // INVARIANT: Since we push children in
     // reverse order (right-most to left-most),
     // we ensure that we'll always process left-most
     // children first
     auto& children = cur_top_node->children_;
-
     for (auto iter = children.rbegin(); iter != children.rend(); ++iter) {
-      // iter.base() points one element past where we seek; hence,
-      // we move it over one position
+      // iter.base() points one element past where we seek; that is,
+      // we want the value one position forward
       post_order_iteration.push(std::prev(iter.base()));
     }
+    has_added_children.push(cur_top_node);
     cur_top_node = *post_order_iteration.top();
   }
 }
@@ -46,7 +47,6 @@ void WakeupTreeIterator::increment()
     return;
   }
 
-  auto prev_top_handle = post_order_iteration.top();
   post_order_iteration.pop();
 
   // If there are now no longer any nodes left,
@@ -57,15 +57,28 @@ void WakeupTreeIterator::increment()
     return;
   }
 
-  // Otherwise, look at the next top node. If
-  // `prev_top` is that node's right-most child,
-  // then we don't attempt to re-add `next_top`'s
-  // children again for we would have already seen them.
-  // To actually determine "right-most", we check if
-  // moving over to the right one spot brings us to the
-  // end of the candidate parent's list
-  const auto* next_top_node = *post_order_iteration.top();
-  if ((++prev_top_handle) != next_top_node->get_ordered_children().end()) {
+  xbt_assert(not has_added_children.empty(), "Invariant violated: There are more "
+                                             "nodes in the iteration that we must search "
+                                             "yet nobody has claimed to have added these nodes. "
+                                             "This implies that the algorithm is not iterating over "
+                                             "the wakeup tree is not following the post-fix order "
+                                             "correctly");
+
+  // Otherwise, look at what is the new, current top node.
+  // We're traversing the tree in post-fix order.
+  //
+  // If we've already added our children, we want
+  // to be sure not to add them again; but we ALSO
+  // want to be sure that we now start checking against
+  // the node that's next in line as "finished"
+  //
+  // INVARIANT: Since we're searching in post-fix order,
+  // it always suffices to compare the current node
+  // with the top of the stack of nodes which have added their
+  // children
+  if (*post_order_iteration.top() == has_added_children.top()) {
+    has_added_children.pop();
+  } else {
     push_until_left_most_found();
   }
 }
index e42184c..c33d483 100644 (file)
@@ -56,6 +56,18 @@ private:
    */
   std::stack<node_handle> post_order_iteration;
 
+  /**
+   * @brief The nodes in the current ordering that have already
+   * added their own children
+   *
+   * We need to be able to determine whether to add the children
+   * of a given node. Eventually, we want to search that node itself,
+   * but we have to first search its children. Later, when we
+   * reach each node in this stack again, we'll remember not to add
+   * its children and will search the node in the stack instead.
+   */
+  std::stack<WakeupTreeNode*> has_added_children;
+
   /**
    * @brief Search the wakeup tree until a leaf node appears at the front
    * of the iteration, pushing all children towards the top of the stack
similarity index 57%
rename from src/plugins/task.cpp
rename to src/s4u/s4u_Task.cpp
index dca0c90..bda7ec1 100644 (file)
@@ -1,5 +1,6 @@
+#include <memory>
 #include <simgrid/Exception.hpp>
-#include <simgrid/plugins/task.hpp>
+#include <simgrid/s4u/Task.hpp>
 #include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/Exec.hpp>
 #include <simgrid/s4u/Io.hpp>
@@ -8,16 +9,13 @@
 #include "src/simgrid/module.hpp"
 
 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
-/** @defgroup plugin_task plugin_task Plugin Task
-
+/**
   @beginrst
 
-This is the task plugin, enabling management of Tasks.
-To activate this plugin, first call :cpp:func:`Task::init`.
 
 Tasks are designed to represent dataflows, i.e., graphs of Tasks.
 Tasks can only be instantiated using either
-:cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
+:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
 
@@ -25,12 +23,7 @@ A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API
  */
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
 
-namespace simgrid::plugins {
-
-xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
-
-xbt::signal<void(Task*)> Task::on_start;
-xbt::signal<void(Task*)> Task::on_end;
+namespace simgrid::s4u {
 
 Task::Task(const std::string& name) : name_(name) {}
 
@@ -40,7 +33,7 @@ Task::Task(const std::string& name) : name_(name) {}
  */
 bool Task::ready_to_run() const
 {
-  return not working_ && queued_execs_ > 0;
+  return not working_ && queued_firings_ > 0;
 }
 
 /**
@@ -52,7 +45,10 @@ bool Task::ready_to_run() const
 void Task::receive(Task* source)
 {
   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-  predecessors_[source]++;
+  auto source_count = predecessors_[source]++;
+  if (tokens_received_.size() <= queued_firings_ + source_count)
+    tokens_received_.push_back({});
+  tokens_received_[queued_firings_ + source_count][source] = source->token_;
   bool enough_tokens = true;
   for (auto const& [key, val] : predecessors_)
     if (val < 1) {
@@ -62,7 +58,7 @@ void Task::receive(Task* source)
   if (enough_tokens) {
     for (auto& [key, val] : predecessors_)
       val--;
-    enqueue_execs(1);
+    enqueue_firings(1);
   }
 }
 
@@ -77,11 +73,11 @@ void Task::receive(Task* source)
  */
 void Task::complete()
 {
-  xbt_assert(s4u::Actor::is_maestro());
+  xbt_assert(Actor::is_maestro());
   working_ = false;
   count_++;
-  on_this_end_(this);
-  Task::on_end(this);
+  on_this_completion(this);
+  on_completion(this);
   if (current_activity_)
     previous_activity_ = std::move(current_activity_);
   for (auto const& t : successors_)
@@ -90,42 +86,20 @@ void Task::complete()
     fire();
 }
 
-/** @ingroup plugin_task
- *  @brief Init the Task plugin.
- *  @note Add a completion callback to all Activities to call Task::complete().
- */
-void Task::init()
-{
-  static bool inited = false;
-  if (inited)
-    return;
-
-  inited                                  = true;
-  ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
-  simgrid::s4u::Exec::on_completion_cb(
-      [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
-  simgrid::s4u::Comm::on_completion_cb(
-      [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
-  simgrid::s4u::Io::on_completion_cb(
-      [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
-}
-
-/** @ingroup plugin_task
- *  @param n The number of executions to enqueue.
- *  @brief Enqueue executions.
- *  @note Immediatly starts an execution if possible.
+/** @param n The number of firings to enqueue.
+ *  @brief Enqueue firing.
+ *  @note Immediately fires an activity if possible.
  */
-void Task::enqueue_execs(int n)
+void Task::enqueue_firings(int n)
 {
   simgrid::kernel::actor::simcall_answered([this, n] {
-    queued_execs_ += n;
+    queued_firings_ += n;
     if (ready_to_run())
       fire();
   });
 }
 
-/** @ingroup plugin_task
- *  @param amount The amount to set.
+/** @param amount The amount to set.
  *  @brief Set the amount of work to do.
  *  @note Amount in flop for ExecTask and in bytes for CommTask.
  */
@@ -134,8 +108,33 @@ void Task::set_amount(double amount)
   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
 }
 
-/** @ingroup plugin_task
- *  @param successor The Task to add.
+/** @param token The token to set.
+ *  @brief Set the token to send to successors.
+ *  @note The token is passed to each successor after the task ends, i.e., after the on_completion callback.
+ */
+void Task::set_token(std::shared_ptr<Token> token)
+{
+  simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+}
+
+/** @return The token received from task t for the next execution.
+ *  @note If there is no queued execution for this task the map of received tokens might not exist or be partially empty.
+ */
+std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+{
+  return tokens_received_.front()[t];
+}
+
+void Task::fire() {
+  on_this_start(this);
+  on_start(this);
+  working_ = true;
+  queued_firings_ = std::max(queued_firings_ - 1, 0);
+  if (tokens_received_.size() > 0)
+    tokens_received_.pop_front();
+}
+
+/** @param successor The Task to add.
  *  @brief Add a successor to this Task.
  *  @note It also adds this as a predecessor of successor.
  */
@@ -147,8 +146,7 @@ void Task::add_successor(TaskPtr successor)
   });
 }
 
-/** @ingroup plugin_task
- *  @param successor The Task to remove.
+/** @param successor The Task to remove.
  *  @brief Remove a successor from this Task.
  *  @note It also removes this from the predecessors of successor.
  */
@@ -171,34 +169,6 @@ void Task::remove_all_successors()
   });
 }
 
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called before each execution.
- *  @note The function is called before the underlying Activity starts.
- */
-void Task::on_this_start_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @param func The function to set.
- *  @brief Set a function to be called after each execution.
- *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
- */
-void Task::on_this_end_cb(const std::function<void(Task*)>& func)
-{
-  simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
-}
-
-/** @ingroup plugin_task
- *  @brief Return the number of completed executions.
- */
-int Task::get_count() const
-{
-  return count_;
-}
-
 /**
  *  @brief Default constructor.
  */
@@ -215,7 +185,7 @@ ExecTaskPtr ExecTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
+ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
 {
   return init(name)->set_flops(flops)->set_host(host);
 }
@@ -227,25 +197,18 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos
  */
 void ExecTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  s4u::ExecPtr exec = s4u::Exec::init();
-  exec->set_name(name_);
-  exec->set_flops_amount(amount_);
-  exec->set_host(host_);
+  Task::fire();
+  auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
   exec->start();
-  exec->extension_set(new ExtendedAttributeActivity());
-  exec->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                   = exec;
+  exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
+  set_current_activity(exec);
 }
 
 /** @ingroup plugin_task
  *  @param host The host to set.
  *  @brief Set a new host.
  */
-ExecTaskPtr ExecTask::set_host(s4u::Host* host)
+ExecTaskPtr ExecTask::set_host(Host* host)
 {
   kernel::actor::simcall_answered([this, host] { host_ = host; });
   return this;
@@ -256,7 +219,7 @@ ExecTaskPtr ExecTask::set_host(s4u::Host* host)
  */
 ExecTaskPtr ExecTask::set_flops(double flops)
 {
-  kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
+  kernel::actor::simcall_answered([this, flops] { set_amount(flops); });
   return this;
 }
 
@@ -276,7 +239,7 @@ CommTaskPtr CommTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart constructor.
  */
-CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
+CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
 {
   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
 }
@@ -288,24 +251,18 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou
  */
 void CommTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_          = true;
-  queued_execs_     = std::max(queued_execs_ - 1, 0);
-  s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
-  comm->set_name(name_);
-  comm->set_payload_size(amount_);
+  Task::fire();
+  auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
   comm->start();
-  comm->extension_set(new ExtendedAttributeActivity());
-  comm->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                   = comm;
+  comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
+  set_current_activity(comm);
 }
 
 /** @ingroup plugin_task
  *  @param source The host to set.
  *  @brief Set a new source host.
  */
-CommTaskPtr CommTask::set_source(s4u::Host* source)
+CommTaskPtr CommTask::set_source(Host* source)
 {
   kernel::actor::simcall_answered([this, source] { source_ = source; });
   return this;
@@ -315,7 +272,7 @@ CommTaskPtr CommTask::set_source(s4u::Host* source)
  *  @param destination The host to set.
  *  @brief Set a new destination host.
  */
-CommTaskPtr CommTask::set_destination(s4u::Host* destination)
+CommTaskPtr CommTask::set_destination(Host* destination)
 {
   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
   return this;
@@ -326,7 +283,7 @@ CommTaskPtr CommTask::set_destination(s4u::Host* destination)
  */
 CommTaskPtr CommTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
@@ -346,7 +303,7 @@ IoTaskPtr IoTask::init(const std::string& name)
 /** @ingroup plugin_task
  *  @brief Smart Constructor.
  */
-IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
+IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
 {
   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
 }
@@ -355,7 +312,7 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s
  *  @param disk The disk to set.
  *  @brief Set a new disk.
  */
-IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
+IoTaskPtr IoTask::set_disk(Disk* disk)
 {
   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
   return this;
@@ -366,12 +323,12 @@ IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
  */
 IoTaskPtr IoTask::set_bytes(double bytes)
 {
-  kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
+  kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
   return this;
 }
 
 /** @ingroup plugin_task */
-IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
+IoTaskPtr IoTask::set_op_type(Io::OpType type)
 {
   kernel::actor::simcall_answered([this, type] { type_ = type; });
   return this;
@@ -379,19 +336,11 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
 
 void IoTask::fire()
 {
-  on_this_start_(this);
-  Task::on_start(this);
-  working_      = true;
-  queued_execs_ = std::max(queued_execs_ - 1, 0);
-  s4u::IoPtr io = s4u::Io::init();
-  io->set_name(name_);
-  io->set_size(amount_);
-  io->set_disk(disk_);
-  io->set_op_type(type_);
+  Task::fire();
+  auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
   io->start();
-  io->extension_set(new ExtendedAttributeActivity());
-  io->extension<ExtendedAttributeActivity>()->task_ = this;
-  current_activity_                                 = io;
+  io->on_this_completion_cb([this](Io const&) { this->complete(); });
+  set_current_activity(io);
 }
 
-} // namespace simgrid::plugins
+} // namespace simgrid::s4u
index 351c4e4..8170ff0 100644 (file)
@@ -76,12 +76,11 @@ static void get_set_disk_data(simgrid::s4u::Disk* disk)
 {
   XBT_INFO("*** GET/SET DATA for disk: %s ***", disk->get_cname());
 
-  const std::string* data = disk->get_data<std::string>();
+  auto data = disk->get_unique_data<std::string>();
   XBT_INFO("Get data: '%s'", data ? data->c_str() : "No User Data");
   disk->set_data(new std::string("Some data"));
-  data = disk->get_data<std::string>();
+  data = disk->get_unique_data<std::string>();
   XBT_INFO("  Set and get data: '%s'", data->c_str());
-  delete data;
 }
 
 static void dump_platform_disks()
index 5fa39d6..9b9aaa8 100644 (file)
@@ -25,7 +25,7 @@ set(EXTRA_DIST
   src/kernel/resource/models/network_ns3.hpp
   src/kernel/resource/models/ns3/ns3_simulator.hpp
   src/kernel/resource/models/ptask_L07.hpp
-  
+
   src/mc/datatypes.h
   src/mc/mc.h
   src/mc/mc_mmu.hpp
@@ -56,7 +56,7 @@ set(EXTRA_DIST
   src/xbt/log_private.hpp
   src/xbt/mallocator_private.h
   src/xbt/parmap.hpp
-  
+
   src/xbt/mmalloc/mmalloc.h
   src/xbt/mmalloc/mfree.c
   src/xbt/mmalloc/mm_legacy.c
@@ -342,7 +342,7 @@ set(KERNEL_SRC
   src/kernel/actor/SimcallObserver.hpp
   src/kernel/actor/SynchroObserver.cpp
   src/kernel/actor/SynchroObserver.hpp
-  
+
   src/kernel/context/Context.cpp
   src/kernel/context/Context.hpp
   src/kernel/context/ContextRaw.cpp
@@ -454,7 +454,6 @@ set(PLUGINS_SRC
   src/plugins/vm/VmLiveMigration.hpp
   src/plugins/vm/dirty_page_tracking.cpp
   src/plugins/battery.cpp
-  src/plugins/task.cpp
   src/plugins/photovoltaic.cpp
   )
 
@@ -475,6 +474,7 @@ set(S4U_SRC
   src/s4u/s4u_Mutex.cpp
   src/s4u/s4u_Netzone.cpp
   src/s4u/s4u_Semaphore.cpp
+  src/s4u/s4u_Task.cpp
   src/s4u/s4u_VirtualMachine.cpp
 )
 
@@ -521,7 +521,7 @@ set(MC_SRC_BASE
   src/mc/mc_replay.hpp
   src/mc/transition/Transition.cpp
   )
-  
+
 set(MC_SRC_STATELESS
   src/mc/api/ActorState.hpp
   src/mc/api/ClockVector.cpp
@@ -530,7 +530,7 @@ set(MC_SRC_STATELESS
   src/mc/api/State.hpp
   src/mc/api/RemoteApp.cpp
   src/mc/api/RemoteApp.hpp
-  
+
   src/mc/explo/DFSExplorer.cpp
   src/mc/explo/DFSExplorer.hpp
   src/mc/explo/Exploration.cpp
@@ -577,7 +577,7 @@ set(MC_SRC_STATEFUL
   src/mc/explo/LivenessChecker.hpp
   src/mc/explo/UdporChecker.cpp
   src/mc/explo/UdporChecker.hpp
-  
+
   src/mc/explo/udpor/Comb.hpp
   src/mc/explo/udpor/Configuration.hpp
   src/mc/explo/udpor/Configuration.cpp
@@ -595,7 +595,7 @@ set(MC_SRC_STATEFUL
   src/mc/explo/udpor/Unfolding.hpp
   src/mc/explo/udpor/udpor_forward.hpp
   src/mc/explo/udpor/udpor_tests_private.hpp
-  
+
   src/mc/inspect/DwarfExpression.cpp
   src/mc/inspect/DwarfExpression.hpp
   src/mc/inspect/Frame.cpp
@@ -614,7 +614,7 @@ set(MC_SRC_STATEFUL
   src/mc/inspect/mc_unw.cpp
   src/mc/inspect/mc_unw.hpp
   src/mc/inspect/mc_unw_vmread.cpp
-  
+
   src/mc/sosp/ChunkedData.cpp
   src/mc/sosp/ChunkedData.hpp
   src/mc/sosp/PageStore.cpp
@@ -641,7 +641,7 @@ set(MC_SRC_STATEFUL
   src/mc/api/strategy/MinMatchComm.hpp
   src/mc/api/strategy/Strategy.hpp
   src/mc/api/strategy/UniformStrategy.hpp
-  
+
   src/xbt/mmalloc/mm_interface.c
   )
 
@@ -661,7 +661,6 @@ set(headers_to_install
   include/simgrid/plugins/file_system.h
   include/simgrid/plugins/live_migration.h
   include/simgrid/plugins/load.h
-  include/simgrid/plugins/task.hpp
   include/simgrid/plugins/photovoltaic.hpp
   include/simgrid/plugins/ProducerConsumer.hpp
   include/simgrid/instr.h
@@ -695,6 +694,7 @@ set(headers_to_install
   include/simgrid/s4u/Mutex.hpp
   include/simgrid/s4u/NetZone.hpp
   include/simgrid/s4u/Semaphore.hpp
+  include/simgrid/s4u/Task.hpp
   include/simgrid/s4u/VirtualMachine.hpp
   include/simgrid/s4u.hpp