Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix memleak in battery examples and task-storm
[simgrid.git] / examples / cpp / task-storm / s4u-task-storm.cpp
index 0319d17..ca0f61e 100644 (file)
@@ -3,16 +3,16 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
-   and use them to build a simulation of a stream processing application
+/* This example takes the main concepts of Apache Storm presented here
+   https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing
+   application
 
    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
 
    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
-   Spout SB produces 1e6 bytes every 200ms.  
-   
-   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
-   Bolt B3 processes data from Spout SB.
-   Bolt B4 processes data from Bolt B3.
-   
+   Spout SB produces 1e6 bytes every 200ms.
+
+   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per
+   bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+
                         Fafard
                         ┌────┐
                     ┌──►│ B1 │
                         Fafard
                         ┌────┐
                     ┌──►│ B1 │
           ┌────┐     │          │
           │ SB ├─────┤ B3 ──► B4│
           └────┘     │          │
           ┌────┐     │          │
           │ SB ├─────┤ B3 ──► B4│
           └────┘     │          │
-                     └──────────┘     
+                     └──────────┘
  */
 
  */
 
-#include "simgrid/plugins/task.hpp"
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
-
-struct Token {
-  double data_ = 0;
-  Token(double data) : data_(data) {}
-};
+namespace sg4 = simgrid::s4u;
 
 int main(int argc, char* argv[])
 {
 
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retrieve hosts
   auto tremblay = e.host_by_name("Tremblay");
   auto jupiter  = e.host_by_name("Jupiter");
 
   // Retrieve hosts
   auto tremblay = e.host_by_name("Tremblay");
   auto jupiter  = e.host_by_name("Jupiter");
-  auto fafard  = e.host_by_name("Fafard");
+  auto fafard   = e.host_by_name("Fafard");
   auto ginette  = e.host_by_name("Ginette");
   auto bourassa = e.host_by_name("Bourassa");
 
   // Create execution tasks
   auto ginette  = e.host_by_name("Ginette");
   auto bourassa = e.host_by_name("Bourassa");
 
   // Create execution tasks
-  auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
-  auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
-  auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
-  auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
-  auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
-  auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
+  auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
 
   // Create communication tasks
 
   // Create communication tasks
-  auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard); 
-  auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
-  auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+  auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
 
   // Create the graph by defining dependencies between tasks
   // Some dependencies are defined dynamically
 
   // Create the graph by defining dependencies between tasks
   // Some dependencies are defined dynamically
@@ -80,56 +74,59 @@ int main(int argc, char* argv[])
      Alternatively we: remove/add the link between SA and SA_to_B2
                        add/remove the link between SA and SA_to_B1
   */
      Alternatively we: remove/add the link between SA and SA_to_B2
                        add/remove the link between SA and SA_to_B1
   */
-  SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
+  SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
     int count = t->get_count();
     int count = t->get_count();
-    simgrid::plugins::CommTaskPtr comm;
-    if (count % 2 == 0) {
+    sg4::CommTaskPtr comm;
+    if (count % 2 == 1) {
       t->remove_successor(SA_to_B2);
       t->add_successor(SA_to_B1);
       comm = SA_to_B1;
       t->remove_successor(SA_to_B2);
       t->add_successor(SA_to_B1);
       comm = SA_to_B1;
-    }
-    else {
+    } else {
       t->remove_successor(SA_to_B1);
       t->add_successor(SA_to_B2);
       comm = SA_to_B2;
     }
       t->remove_successor(SA_to_B1);
       t->add_successor(SA_to_B2);
       comm = SA_to_B2;
     }
-    std::vector<double> amount = {1e3,1e6,1e9};
+    std::vector<double> amount = {1e9, 1e3, 1e6};
+    // XBT_INFO("Comm %f", amount[count % 3]);
     comm->set_amount(amount[count % 3]);
     comm->set_amount(amount[count % 3]);
-    auto token = std::make_shared<Token>(amount[count % 3]);
+    auto token = std::make_shared<sg4::Token>();
+    token->set_data(new double(amount[count % 3]));
     t->set_token(token);
   });
 
   // The token sent by SA is forwarded by both communication tasks
     t->set_token(token);
   });
 
   // The token sent by SA is forwarded by both communication tasks
-  SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    t->set_token(t->get_tokens()[SA]);
+  SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
   });
   });
-  SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    t->set_token(t->get_tokens()[SA]);
+  SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
   });
 
   /* B1 and B2 read the value of the token received by their predecessors
      and use it to adapt their amount of work to do.
   });
 
   /* B1 and B2 read the value of the token received by their predecessors
      and use it to adapt their amount of work to do.
-  */ 
-  B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    auto tokens_map = t->get_tokens();
-    Token* tok = (Token*)(tokens_map[SA_to_B1].get());
-    t->set_amount(tok->data_ * 10);
+  */
+  B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+    t->deque_token_from(SA_to_B1);
+    t->set_amount(*data * 10);
+    delete data;
   });
   });
-  B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    auto tokens_map = t->get_tokens();
-    Token* tok = (Token*)(tokens_map[SA_to_B2].get());
-    t->set_amount(tok->data_ * 10);
+  B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+    t->deque_token_from(SA_to_B2);
+    t->set_amount(*data * 10);
+    delete data;
   });
 
   });
 
-  // Enqueue executions for tasks without predecessors
-  SA->enqueue_execs(5);
-  SB->enqueue_execs(5);
+  // Enqueue firings for tasks without predecessors
+  SA->enqueue_firings(5);
+  SB->enqueue_firings(5);
 
   // Add a function to be called when tasks end for log purpose
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([]
-  (const simgrid::plugins::Task* t) {
-    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-  });
+  sg4::Task::on_completion_cb(
+      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
 
   // Start the simulation
   e.run();
 
   // Start the simulation
   e.run();