Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix memleak in battery examples and task-storm
[simgrid.git] / examples / cpp / task-storm / s4u-task-storm.cpp
index 0319d17..ca0f61e 100644 (file)
@@ -3,16 +3,16 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
-/* This example takes the main concepts of Apache Storm presented here https://storm.apache.org/releases/2.4.0/Concepts.html
-   and use them to build a simulation of a stream processing application
+/* This example takes the main concepts of Apache Storm presented here
+   https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing
+   application
 
    Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
-   Spout SB produces 1e6 bytes every 200ms.  
-   
-   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per bytes
-   Bolt B3 processes data from Spout SB.
-   Bolt B4 processes data from Bolt B3.
-   
+   Spout SB produces 1e6 bytes every 200ms.
+
+   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per
+   bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.
+
                         Fafard
                         ┌────┐
                     ┌──►│ B1 │
           ┌────┐     │          │
           │ SB ├─────┤ B3 ──► B4│
           └────┘     │          │
-                     └──────────┘     
+                     └──────────┘
  */
 
-#include "simgrid/plugins/task.hpp"
 #include "simgrid/s4u.hpp"
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
-
-struct Token {
-  double data_ = 0;
-  Token(double data) : data_(data) {}
-};
+namespace sg4 = simgrid::s4u;
 
 int main(int argc, char* argv[])
 {
-  simgrid::s4u::Engine e(&argc, argv);
+  sg4::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  simgrid::plugins::Task::init();
 
   // Retrieve hosts
   auto tremblay = e.host_by_name("Tremblay");
   auto jupiter  = e.host_by_name("Jupiter");
-  auto fafard  = e.host_by_name("Fafard");
+  auto fafard   = e.host_by_name("Fafard");
   auto ginette  = e.host_by_name("Ginette");
   auto bourassa = e.host_by_name("Bourassa");
 
   // Create execution tasks
-  auto SA = simgrid::plugins::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
-  auto SB = simgrid::plugins::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
-  auto B1 = simgrid::plugins::ExecTask::init("B1", 1e8, fafard);
-  auto B2 = simgrid::plugins::ExecTask::init("B2", 1e8, ginette);
-  auto B3 = simgrid::plugins::ExecTask::init("B3", 1e8, bourassa);
-  auto B4 = simgrid::plugins::ExecTask::init("B4", 2e8, bourassa);
+  auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
+  auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
+  auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
+  auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
+  auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
+  auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);
 
   // Create communication tasks
-  auto SA_to_B1 = simgrid::plugins::CommTask::init("SA_to_B1", 0, tremblay, fafard); 
-  auto SA_to_B2 = simgrid::plugins::CommTask::init("SA_to_B2", 0, tremblay, ginette);
-  auto SB_to_B3 = simgrid::plugins::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
+  auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
+  auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
+  auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);
 
   // Create the graph by defining dependencies between tasks
   // Some dependencies are defined dynamically
@@ -80,56 +74,59 @@ int main(int argc, char* argv[])
      Alternatively we: remove/add the link between SA and SA_to_B2
                        add/remove the link between SA and SA_to_B1
   */
-  SA->on_this_start_cb([&](simgrid::plugins::Task* t) {
+  SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
     int count = t->get_count();
-    simgrid::plugins::CommTaskPtr comm;
-    if (count % 2 == 0) {
+    sg4::CommTaskPtr comm;
+    if (count % 2 == 1) {
       t->remove_successor(SA_to_B2);
       t->add_successor(SA_to_B1);
       comm = SA_to_B1;
-    }
-    else {
+    } else {
       t->remove_successor(SA_to_B1);
       t->add_successor(SA_to_B2);
       comm = SA_to_B2;
     }
-    std::vector<double> amount = {1e3,1e6,1e9};
+    std::vector<double> amount = {1e9, 1e3, 1e6};
+    // XBT_INFO("Comm %f", amount[count % 3]);
     comm->set_amount(amount[count % 3]);
-    auto token = std::make_shared<Token>(amount[count % 3]);
+    auto token = std::make_shared<sg4::Token>();
+    token->set_data(new double(amount[count % 3]));
     t->set_token(token);
   });
 
   // The token sent by SA is forwarded by both communication tasks
-  SA_to_B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    t->set_token(t->get_tokens()[SA]);
+  SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
   });
-  SA_to_B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    t->set_token(t->get_tokens()[SA]);
+  SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+    t->set_token(t->get_token_from(SA));
+    t->deque_token_from(SA);
   });
 
   /* B1 and B2 read the value of the token received by their predecessors
      and use it to adapt their amount of work to do.
-  */ 
-  B1->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    auto tokens_map = t->get_tokens();
-    Token* tok = (Token*)(tokens_map[SA_to_B1].get());
-    t->set_amount(tok->data_ * 10);
+  */
+  B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+    t->deque_token_from(SA_to_B1);
+    t->set_amount(*data * 10);
+    delete data;
   });
-  B2->on_this_start_cb([&](simgrid::plugins::Task* t) {
-    auto tokens_map = t->get_tokens();
-    Token* tok = (Token*)(tokens_map[SA_to_B2].get());
-    t->set_amount(tok->data_ * 10);
+  B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+    auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+    t->deque_token_from(SA_to_B2);
+    t->set_amount(*data * 10);
+    delete data;
   });
 
-  // Enqueue executions for tasks without predecessors
-  SA->enqueue_execs(5);
-  SB->enqueue_execs(5);
+  // Enqueue firings for tasks without predecessors
+  SA->enqueue_firings(5);
+  SB->enqueue_firings(5);
 
   // Add a function to be called when tasks end for log purpose
-  simgrid::plugins::Task::on_end_cb([]
-  (const simgrid::plugins::Task* t) {
-    XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count());
-  });
+  sg4::Task::on_completion_cb(
+      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });
 
   // Start the simulation
   e.run();