Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid
authormlaurent <mathieu.laurent@ens-rennes.fr>
Tue, 27 Jun 2023 09:20:07 +0000 (11:20 +0200)
committermlaurent <mathieu.laurent@ens-rennes.fr>
Tue, 27 Jun 2023 09:20:07 +0000 (11:20 +0200)
12 files changed:
1  2 
examples/cpp/task-io/s4u-task-io.cpp
examples/cpp/task-simple/s4u-task-simple.cpp
examples/cpp/task-storm/s4u-task-storm.cpp
examples/cpp/task-switch-host/s4u-task-switch-host.cpp
examples/cpp/task-variable-load/s4u-task-variable-load.cpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Link.hpp
include/simgrid/s4u/Task.hpp
src/bindings/python/simgrid_python.cpp
src/mc/api/strategy/MaxMatchComm.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Task.cpp

@@@ -13,8 -13,8 +13,8 @@@
   * comm is a communication task.
   */
  
- #include "simgrid/plugins/task.hpp"
 -#include "simgrid/s4u/Task.hpp"
  #include "simgrid/s4u.hpp"
++#include "simgrid/s4u/Task.hpp"
  #include <simgrid/plugins/file_system.h>
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(task_simple, "Messages specific for this task example");
@@@ -41,12 -41,12 +41,11 @@@ int main(int argc, char* argv[]
    read->add_successor(exec2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Enqueue two executions for task exec1
-   exec1->enqueue_execs(2);
+   // Enqueue two firings for task exec1
+   exec1->enqueue_firings(2);
  
    // Start the simulation
    e.run();
@@@ -38,12 -38,12 +38,11 @@@ int main(int argc, char* argv[]
    comm->add_successor(exec2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Enqueue two executions for task exec1
-   exec1->enqueue_execs(2);
+   // Enqueue two firings for task exec1
+   exec1->enqueue_firings(2);
  
    // Start the simulation
    e.run();
index 0000000,2c4edb1..d290ca0
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,130 +1,123 @@@
 -/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
 -   and use them to build a simulation of a stream processing application
+ /* Copyright (c) 2017-2023. The SimGrid Team. All rights reserved.          */
+ /* This program is free software; you can redistribute it and/or modify it
+  * under the terms of the license (GNU LGPL) which comes with this package. */
 -   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
 -   Bolt B3 processes data from Spout SB.
 -   Bolt B4 processes data from Bolt B3.
++/* This example takes the main concepts of Apache Storm presented here
++   https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing
++   application
+    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
+    Spout SB produces 1e6 bytes every 200ms.
 -  auto fafard  = e.host_by_name("Fafard");
++   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per
++   bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+                         Fafard
+                         ┌────┐
+                     ┌──►│ B1 │
+          Tremblay   │   └────┘
+           ┌────┐    │
+           │ SA ├────┤  Ginette
+           └────┘    │   ┌────┐
+                     └──►│ B2 │
+                         └────┘
+                        Bourassa
+          Jupiter     ┌──────────┐
+           ┌────┐     │          │
+           │ SB ├─────┤ B3 ──► B4│
+           └────┘     │          │
+                      └──────────┘
+  */
+ #include "simgrid/s4u.hpp"
+ XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
+ namespace sg4 = simgrid::s4u;
+ int main(int argc, char* argv[])
+ {
+   sg4::Engine e(&argc, argv);
+   e.load_platform(argv[1]);
+   // Retrieve hosts
+   auto tremblay = e.host_by_name("Tremblay");
+   auto jupiter  = e.host_by_name("Jupiter");
 -  SA->on_this_start_cb([SA_to_B1,SA_to_B2](sg4::Task* t) {
++  auto fafard   = e.host_by_name("Fafard");
+   auto ginette  = e.host_by_name("Ginette");
+   auto bourassa = e.host_by_name("Bourassa");
+   // Create execution tasks
+   auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+   auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+   auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+   auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+   auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+   auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
+   // Create communication tasks
+   auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+   auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+   auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+   // Create the graph by defining dependencies between tasks
+   // Some dependencies are defined dynamically
+   SA_to_B1->add_successor(B1);
+   SA_to_B2->add_successor(B2);
+   SB->add_successor(SB_to_B3);
+   SB_to_B3->add_successor(B3);
+   B3->add_successor(B4);
+   /* Dynamic modification of the graph and bytes sent
+      Alternatively we: remove/add the link between SA and SA_to_B2
+                        add/remove the link between SA and SA_to_B1
+   */
 -    }
 -    else {
++  SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+     int count = t->get_count();
+     sg4::CommTaskPtr comm;
+     if (count % 2 == 0) {
+       t->remove_successor(SA_to_B2);
+       t->add_successor(SA_to_B1);
+       comm = SA_to_B1;
 -    std::vector<double> amount = {1e3,1e6,1e9};
++    } else {
+       t->remove_successor(SA_to_B1);
+       t->add_successor(SA_to_B2);
+       comm = SA_to_B2;
+     }
 -  SA_to_B1->on_this_start_cb([SA](sg4::Task* t) {
 -    t->set_token(t->get_next_token_from(SA));
 -  });
 -  SA_to_B2->on_this_start_cb([SA](sg4::Task* t) {
 -    t->set_token(t->get_next_token_from(SA));
 -  });
++    std::vector<double> amount = {1e3, 1e6, 1e9};
+     comm->set_amount(amount[count % 3]);
+     auto token = std::make_shared<sg4::Token>();
+     token->set_data(new double(amount[count % 3]));
+     t->set_token(token);
+   });
+   // The token sent by SA is forwarded by both communication tasks
 -  sg4::Task::on_completion_cb([]
 -  (const sg4::Task* t) {
 -    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
 -  });
++  SA_to_B1->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
++  SA_to_B2->on_this_start_cb([SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+   /* B1 and B2 read the value of the token received by their predecessors
+      and use it to adapt their amount of work to do.
+   */
+   B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
+     auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+     t->set_amount(*data * 10);
+   });
+   B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
+     auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+     t->set_amount(*data * 10);
+   });
+   // Enqueue firings for tasks without predecessors
+   SA->enqueue_firings(5);
+   SB->enqueue_firings(5);
+   // Add a function to be called when tasks end for log purpose
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
+   // Start the simulation
+   e.run();
+   return 0;
+ }
@@@ -47,15 -46,14 +46,13 @@@ int main(int argc, char* argv[]
    exec2->add_successor(comm2);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
-   // Add a function to be called before each executions of comm0
+   // Add a function to be called before each firing of comm0
    // This function modifies the graph of tasks by adding or removing
    // successors to comm0
-   comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) {
-     auto* comm0      = dynamic_cast<simgrid::plugins::CommTask*>(t);
+   comm0->on_this_start_cb([comm0, exec1, exec2, jupiter, fafard](sg4::Task*) {
      static int count = 0;
      if (count % 2 == 0) {
        comm0->set_destination(jupiter);
@@@ -51,12 -50,12 +50,11 @@@ int main(int argc, char* argv[]
    comm->add_successor(exec);
  
    // Add a function to be called when tasks end for log purpose
-   simgrid::plugins::Task::on_end_cb([](const simgrid::plugins::Task* t) {
 -  sg4::Task::on_completion_cb([](const sg4::Task* t) {
--    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
--  });
++  sg4::Task::on_completion_cb(
++      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
  
    // Create the actor that will inject load during the simulation
-   simgrid::s4u::Actor::create("input", tremblay, variable_load, comm);
+   sg4::Actor::create("input", tremblay, variable_load, comm);
  
    // Start the simulation
    e.run();
@@@ -267,10 -263,10 +263,16 @@@ public
     *  dependency or no resource assigned) */
    void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
  
-   XBT_ATTRIB_DEPRECATED_v337("Please use on_suspend_cb() instead") static void on_suspended_cb(
-       const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
-   XBT_ATTRIB_DEPRECATED_v337("Please use on_resume_cb() instead") static void on_resumed_cb(
-       const std::function<void(Activity const&)>& cb) { on_resume.connect(cb);  }
+   XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
 -      const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
++      const std::function<void(Activity const&)>& cb)
++  {
++    on_suspend.connect(cb);
++  }
+   XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
 -      const std::function<void(Activity const&)>& cb) { on_resume.connect(cb);  }
++      const std::function<void(Activity const&)>& cb)
++  {
++    on_resume.connect(cb);
++  }
  
    AnyActivity* add_successor(ActivityPtr a)
    {
@@@ -133,10 -133,10 +133,7 @@@ public
    double get_load() const;
  
  #ifndef DOXYGEN
-   XBT_ATTRIB_DEPRECATED_v337("Please use get_load() instead") double get_usage() const
 -  XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const
--  {
--    return get_load();
--  }
++  XBT_ATTRIB_DEPRECATED_v338("Please use get_load() instead") double get_usage() const { return get_load(); }
  #endif
  
    /** @brief Check if the Link is used (at least one flow uses the link) */
@@@ -23,59 -24,68 +24,68 @@@ using CommTaskPtr = boost::intrusive_pt
  class IoTask;
  using IoTaskPtr = boost::intrusive_ptr<IoTask>;
  
- struct ExtendedAttributeActivity {
-   static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
-   Task* task_;
- };
+ class XBT_PUBLIC Token : public xbt::Extendable<Token> {};
  
  class Task {
 -  int count_        = 0;
 -  bool working_     = false;
+   std::string name_;
+   double amount_;
+   int queued_firings_ = 0;
++  int count_          = 0;
++  bool working_       = false;
    std::set<Task*> successors_                 = {};
    std::map<Task*, unsigned int> predecessors_ = {};
+   std::atomic_int_fast32_t refcount_{0};
  
    bool ready_to_run() const;
    void receive(Task* source);
-   void complete();
+   std::shared_ptr<Token> token_ = nullptr;
+   std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+   ActivityPtr previous_activity_;
+   ActivityPtr current_activity_;
  
  protected:
-   std::string name_;
-   double amount_;
-   int queued_execs_ = 0;
-   int count_        = 0;
-   bool working_     = false;
-   s4u::ActivityPtr previous_activity_;
-   s4u::ActivityPtr current_activity_;
-   xbt::signal<void(Task*)> on_this_start_;
-   xbt::signal<void(Task*)> on_this_end_;
    explicit Task(const std::string& name);
--  virtual ~Task()     = default;
-   virtual void fire() = 0;
++  virtual ~Task() = default;
  
-   static xbt::signal<void(Task*)> on_start;
-   static xbt::signal<void(Task*)> on_end;
-   std::atomic_int_fast32_t refcount_{0};
+   virtual void fire();
+   void complete();
 -  void set_current_activity (ActivityPtr a) { current_activity_ = a; }
++  void set_current_activity(ActivityPtr a) { current_activity_ = a; }
+   inline static xbt::signal<void(Task*)> on_start;
+   xbt::signal<void(Task*)> on_this_start;
+   inline static xbt::signal<void(Task*)> on_completion;
+   xbt::signal<void(Task*)> on_this_completion;
  
  public:
-   static void init();
    const std::string& get_name() const { return name_; }
    const char* get_cname() const { return name_.c_str(); }
-   void enqueue_execs(int n);
    void set_amount(double amount);
    double get_amount() const { return amount_; }
+   int get_count() const { return count_; }
+   void set_token(std::shared_ptr<Token> token);
+   std::shared_ptr<Token> get_next_token_from(TaskPtr t);
    void add_successor(TaskPtr t);
    void remove_successor(TaskPtr t);
    void remove_all_successors();
    const std::set<Task*>& get_successors() const { return successors_; }
-   void on_this_start_cb(const std::function<void(Task*)>& func);
-   void on_this_end_cb(const std::function<void(Task*)>& func);
-   int get_count() const;
  
-   /** Add a callback fired before a task activity start.
+   void enqueue_firings(int n);
+   /** Add a callback fired before this task activity starts */
 -  void on_this_start_cb(const std::function<void(Task*)>& func){ on_this_start.connect(func); }
++  void on_this_start_cb(const std::function<void(Task*)>& func) { on_this_start.connect(func); }
+   /** Add a callback fired before a task activity starts.
     * Triggered after the on_this_start function**/
    static void on_start_cb(const std::function<void(Task*)>& cb) { on_start.connect(cb); }
-   /** Add a callback fired after a task activity end.
-    * Triggered after the on_this_end function, but before
-    * sending tokens to successors.**/
-   static void on_end_cb(const std::function<void(Task*)>& cb) { on_end.connect(cb); }
+   /** Add a callback fired before this task activity ends */
+   void on_this_completion_cb(const std::function<void(Task*)>& func) { on_this_completion.connect(func); };
+   /** Add a callback fired after a task activity ends.
+    * Triggered after the on_this_end function, but before sending tokens to successors.**/
+   static void on_completion_cb(const std::function<void(Task*)>& cb) { on_completion.connect(cb); }
  
  #ifndef DOXYGEN
    friend void intrusive_ptr_release(Task* o)
  #include <vector>
  
  namespace py = pybind11;
- using simgrid::plugins::CommTask;
- using simgrid::plugins::CommTaskPtr;
- using simgrid::plugins::ExecTask;
- using simgrid::plugins::ExecTaskPtr;
- using simgrid::plugins::IoTask;
- using simgrid::plugins::IoTaskPtr;
- using simgrid::plugins::Task;
- using simgrid::plugins::TaskPtr;
 -using simgrid::s4u::CommTask;
 -using simgrid::s4u::CommTaskPtr;
 -using simgrid::s4u::ExecTask;
 -using simgrid::s4u::ExecTaskPtr;
 -using simgrid::s4u::IoTask;
 -using simgrid::s4u::IoTaskPtr;
 -using simgrid::s4u::Task;
 -using simgrid::s4u::TaskPtr;
  using simgrid::s4u::Actor;
  using simgrid::s4u::ActorPtr;
  using simgrid::s4u::Barrier;
  using simgrid::s4u::BarrierPtr;
  using simgrid::s4u::Comm;
  using simgrid::s4u::CommPtr;
++using simgrid::s4u::CommTask;
++using simgrid::s4u::CommTaskPtr;
  using simgrid::s4u::Disk;
  using simgrid::s4u::Engine;
++using simgrid::s4u::ExecTask;
++using simgrid::s4u::ExecTaskPtr;
  using simgrid::s4u::Host;
  using simgrid::s4u::Io;
++using simgrid::s4u::IoTask;
++using simgrid::s4u::IoTaskPtr;
  using simgrid::s4u::Link;
  using simgrid::s4u::Mailbox;
  using simgrid::s4u::Mutex;
  using simgrid::s4u::MutexPtr;
  using simgrid::s4u::Semaphore;
  using simgrid::s4u::SemaphorePtr;
++using simgrid::s4u::Task;
++using simgrid::s4u::TaskPtr;
  
  XBT_LOG_NEW_DEFAULT_CATEGORY(python, "python");
  
@@@ -56,10 -55,9 +55,9 @@@ public
          if (mailbox_.count(cast_recv->get_mailbox()) > 0 and
              mailbox_.at(cast_recv->get_mailbox()) > 0) { 
              aid_value--; // This means we have waiting recv corresponding to this recv
--        } else { 
--            aid_value++; 
--        }
++        } else {
++              aid_value++;
++          }
        }
     
        const CommSendTransition* cast_send = dynamic_cast<CommSendTransition const*>(transition);
Simple merge
@@@ -1,8 -1,9 +1,9 @@@
+ #include <memory>
  #include <simgrid/Exception.hpp>
- #include <simgrid/plugins/task.hpp>
 -#include <simgrid/s4u/Task.hpp>
  #include <simgrid/s4u/Comm.hpp>
  #include <simgrid/s4u/Exec.hpp>
  #include <simgrid/s4u/Io.hpp>
++#include <simgrid/s4u/Task.hpp>
  #include <simgrid/simix.hpp>
  
  #include "src/simgrid/module.hpp"
@@@ -52,8 -45,11 +45,11 @@@ bool Task::ready_to_run() cons
  void Task::receive(Task* source)
  {
    XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
-   predecessors_[source]++;
-   bool enough_tokens = true;
+   auto source_count = predecessors_[source]++;
+   if (tokens_received_.size() <= queued_firings_ + source_count)
+     tokens_received_.push_back({});
+   tokens_received_[queued_firings_ + source_count][source] = source->token_;
 -  bool enough_tokens = true;
++  bool enough_tokens                                       = true;
    for (auto const& [key, val] : predecessors_)
      if (val < 1) {
        enough_tokens = false;
@@@ -134,8 -108,33 +108,34 @@@ void Task::set_amount(double amount
    simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
  }
  
- /** @ingroup plugin_task
-  *  @param successor The Task to add.
+ /** @param token The token to set.
+  *  @brief Set the token to send to successors.
+  *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+  */
+ void Task::set_token(std::shared_ptr<Token> token)
+ {
+   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
+ }
+ /** @return Map of tokens received for the next execution.
+  *  @note If there is no queued execution for this task the map might not exist or be partially empty.
+  */
+ std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
+ {
+   return tokens_received_.front()[t];
+ }
 -void Task::fire() {
++void Task::fire()
++{
+   on_this_start(this);
+   on_start(this);
 -  working_ = true;
++  working_        = true;
+   queued_firings_ = std::max(queued_firings_ - 1, 0);
+   if (tokens_received_.size() > 0)
+     tokens_received_.pop_front();
+ }
+ /** @param successor The Task to add.
   *  @brief Add a successor to this Task.
   *  @note It also adds this as a predecessor of successor.
   */