Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New function: Engine::track_vetoed_activities()
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Wed, 15 Dec 2021 14:51:29 +0000 (15:51 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Wed, 15 Dec 2021 14:51:29 +0000 (15:51 +0100)
ChangeLog
examples/cpp/CMakeLists.txt
examples/cpp/dag-simple/s4u-dag-simple.cpp [new file with mode: 0644]
examples/cpp/dag-simple/s4u-dag-simple.tesh [new file with mode: 0644]
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Engine.hpp
include/simgrid/s4u/Exec.hpp
src/kernel/EngineImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Engine.cpp

index 9ef1ed5..37d20a1 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -5,6 +5,9 @@ SimGrid (3.29.1) NOT RELEASED YET (v3.30 expected December 21. 2021, 15:59 UTC)
 S4U:
  - New function: Engine::run_until(date), to split the simulation.
  - New signal: Activity::on_veto, to detect when an activity fails to start.
+ - New function: Engine::track_vetoed_activities() to interrupt run()
+   when an activity fails to start, and to keep track of such activities.
+   Please see the corresponding example for more info.
 
 SMPI:
  - Dynamic costs for MPI operations: New API to allow users to dynamically
index 6c372ef..3253bf4 100644 (file)
@@ -66,6 +66,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  comm-pingpong comm-ready comm-serialize comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
                  comm-dependent comm-host2host comm-failure
                  cloud-capping cloud-migration cloud-simple
+                 dag-simple
                  dht-chord dht-kademlia
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
                  engine-filtering engine-run-partial
diff --git a/examples/cpp/dag-simple/s4u-dag-simple.cpp b/examples/cpp/dag-simple/s4u-dag-simple.cpp
new file mode 100644 (file)
index 0000000..bbcfe64
--- /dev/null
@@ -0,0 +1,80 @@
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <vector>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example");
+
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  e.load_platform(argv[1]);
+  std::set<simgrid::s4u::Activity*> vetoed;
+  e.track_vetoed_activities(&vetoed);
+
+  auto fafard = e.host_by_name("Fafard");
+
+  // Display the details on vetoed activities
+  simgrid::s4u::Activity::on_veto.connect([&e](simgrid::s4u::Activity& a) {
+    auto& exec = static_cast<simgrid::s4u::Exec&>(a); // all activities are execs in this example
+
+    XBT_INFO("Activity '%s' vetoed. Dependencies: %s; Ressources: %s", exec.get_cname(),
+             (exec.dependencies_solved() ? "solved" : "NOT solved"),
+             (exec.is_assigned() ? "assigned" : "NOT assigned"));
+  });
+
+  // Define an amount of work that should take 1 second to execute.
+  double computation_amount = fafard->get_speed();
+
+  // Create a small DAG: Two parents and a child
+  simgrid::s4u::ExecPtr first_parent  = simgrid::s4u::Exec::init();
+  simgrid::s4u::ExecPtr second_parent = simgrid::s4u::Exec::init();
+  simgrid::s4u::ExecPtr child         = simgrid::s4u::Exec::init();
+  first_parent->add_successor(child);
+  second_parent->add_successor(child);
+
+  /*
+    std::vector<simgrid::s4u::ExecPtr> pending_execs;
+    pending_execs.push_back(first_parent);
+    pending_execs.push_back(second_parent);
+    pending_execs.push_back(child);
+  */
+
+  // Set the parameters (the name is for logging purposes only)
+  // + First parent ends after 1 second and the Second parent after 2 seconds.
+  first_parent->set_name("parent 1")->set_flops_amount(computation_amount);
+  second_parent->set_name("parent 2")->set_flops_amount(2 * computation_amount);
+  child->set_name("child")->set_flops_amount(computation_amount);
+
+  // Only the parents are scheduled so far
+  first_parent->set_host(fafard);
+  second_parent->set_host(fafard);
+
+  // Start all activities that can actually start.
+  first_parent->vetoable_start();
+  second_parent->vetoable_start();
+  child->vetoable_start();
+
+  while (child->get_state() != simgrid::s4u::Activity::State::FINISHED) {
+    e.run();
+    for (auto* a : vetoed) {
+      auto* exec = static_cast<simgrid::s4u::Exec*>(a);
+
+      // In this simple case, we just assign the child task to a resource when its dependencies are solved
+      if (exec->dependencies_solved() && not exec->is_assigned()) {
+        XBT_INFO("Activity %s's dependencies are resolved. Let's assign it to Fafard.", exec->get_cname());
+        exec->set_host(fafard);
+      } else {
+        XBT_INFO("Activity %s not ready.", exec->get_cname());
+      }
+    }
+    vetoed.clear(); // DON'T FORGET TO CLEAR this set between two calls to run
+  }
+
+  XBT_INFO("Simulation time %g", simgrid::s4u::Engine::get_clock());
+
+  return 0;
+}
diff --git a/examples/cpp/dag-simple/s4u-dag-simple.tesh b/examples/cpp/dag-simple/s4u-dag-simple.tesh
new file mode 100644 (file)
index 0000000..8594c1b
--- /dev/null
@@ -0,0 +1,13 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-dag-simple ${platfdir}/small_platform.xml --log=s4u_activity.t:verbose "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.000000] (0:maestro@) 'parent 1' is assigned to a resource and all dependencies are solved. Let's start
+> [  0.000000] (0:maestro@) 'parent 2' is assigned to a resource and all dependencies are solved. Let's start
+> [  0.000000] (0:maestro@) Activity 'child' vetoed. Dependencies: NOT solved; Ressources: NOT assigned
+> [  2.000000] (0:maestro@) Remove a dependency from 'parent 1' on 'child'
+> [  2.000000] (0:maestro@) Activity child not ready.
+> [  3.000000] (0:maestro@) Remove a dependency from 'parent 2' on 'child'
+> [  3.000000] (0:maestro@) Activity 'child' vetoed. Dependencies: solved; Ressources: NOT assigned
+> [  3.000000] (0:maestro@) Activity child's dependencies are resolved. Let's assign it to Fafard.
+> [  3.000000] (0:maestro@) 'child' is assigned to a resource and all dependencies are solved. Let's start
+> [  4.000000] (0:maestro@) Simulation time 4
index ff42c98..b7d3594 100644 (file)
@@ -88,6 +88,8 @@ protected:
       throw std::invalid_argument("Dependency does not exist. Can not be removed.");
   }
 
+  static std::set<Activity*>* vetoed_activities_;
+
 public:
   /*! Signal fired each time that the activity fails to start because of a veto (e.g., unsolved dependency or no
    * resource assigned) */
@@ -100,10 +102,15 @@ public:
       XBT_CVERB(s4u_activity, "'%s' is assigned to a resource and all dependencies are solved. Let's start", get_cname());
       start();
     } else {
+      if (vetoed_activities_ != nullptr)
+        vetoed_activities_->insert(this);
       on_veto(*this);
     }
   }
 
+  static std::set<Activity*>* get_vetoed_activities() { return vetoed_activities_; }
+  static void set_vetoed_activities(std::set<Activity*>* whereto) { vetoed_activities_ = whereto; }
+
 #ifndef DOXYGEN
   Activity(Activity const&) = delete;
   Activity& operator=(Activity const&) = delete;
index 778bc84..0703c0c 100644 (file)
@@ -13,6 +13,7 @@
 #include <simgrid/kernel/resource/Model.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 
+#include <set>
 #include <string>
 #include <utility>
 #include <vector>
@@ -83,6 +84,9 @@ public:
     register_function(name, code_factory);
   }
 
+  /** If non-null, the provided set will be filled with all activities that fail to start because of a veto */
+  void track_vetoed_activities(std::set<Activity*>* vetoed_activities);
+
   void load_deployment(const std::string& deploy) const;
 
 protected:
index e6b18d0..643d0d4 100644 (file)
@@ -32,6 +32,7 @@ namespace s4u {
 class XBT_PUBLIC Exec : public Activity_T<Exec> {
 #ifndef DOXYGEN
   friend kernel::activity::ExecImpl;
+  friend kernel::EngineImpl; // Auto-completes the execs of maestro (in simDAG)
 #endif
 
   bool parallel_ = false;
index 6b08aa1..547a532 100644 (file)
@@ -415,16 +415,31 @@ void EngineImpl::wake_all_waiting_actors() const
     XBT_DEBUG("Handling the failed actions (if any)");
     while (auto* action = model->extract_failed_action()) {
       XBT_DEBUG("   Handling Action %p", action);
-      if (action->get_activity() != nullptr)
+      if (action->get_activity() != nullptr) {
+        // If nobody told the interface that the activity has failed, that's because no actor waits on it (maestro
+        // started it). SimDAG I see you!
+        auto* exec = dynamic_cast<activity::ExecImpl*>(action->get_activity());
+        if (exec != nullptr && exec->get_actor() == maestro_)
+          exec->get_iface()->complete(s4u::Activity::State::FAILED);
+
         activity::ActivityImplPtr(action->get_activity())->post();
+      }
     }
     XBT_DEBUG("Handling the terminated actions (if any)");
     while (auto* action = model->extract_done_action()) {
       XBT_DEBUG("   Handling Action %p", action);
       if (action->get_activity() == nullptr)
         XBT_DEBUG("probably vcpu's action %p, skip", action);
-      else
+      else {
+        // If nobody told the interface that the activity is finished, that's because no actor waits on it (maestro
+        // started it). SimDAG I see you!
+        // TODO: do the same for other activity kinds once comms are cleaned up
+        auto* exec = dynamic_cast<activity::ExecImpl*>(action->get_activity());
+        if (exec != nullptr && exec->get_actor() == maestro_)
+          exec->get_iface()->complete(s4u::Activity::State::FINISHED);
+
         activity::ActivityImplPtr(action->get_activity())->post();
+      }
     }
   }
 }
@@ -681,6 +696,7 @@ void EngineImpl::run(double max_date)
   }
 
   double elapsed_time = -1;
+  std::set<s4u::Activity*>* vetoed_activities = s4u::Activity::get_vetoed_activities();
 
   do {
     XBT_DEBUG("New Schedule Round; size(queue)=%zu", actors_to_run_.size());
@@ -793,13 +809,9 @@ void EngineImpl::run(double max_date)
       next_time = std::min(next_time, max_date);
     }
 
-    if (next_time > -1.0 || not actor_list_.empty()) {
-      XBT_DEBUG("Calling solve(%g) %g", next_time, NOW);
-      elapsed_time = solve(next_time);
-      XBT_DEBUG("Moving time ahead. NOW=%g; elapsed: %g", NOW, elapsed_time);
-    } else {
-      elapsed_time = -1;
-    }
+    XBT_DEBUG("Calling solve(%g) %g", next_time, NOW);
+    elapsed_time = solve(next_time);
+    XBT_DEBUG("Moving time ahead. NOW=%g; elapsed: %g", NOW, elapsed_time);
 
     /* Notify all the hosts that have failed */
     /* FIXME: iterate through the list of failed host and mark each of them */
@@ -817,7 +829,8 @@ void EngineImpl::run(double max_date)
     /* Clean actors to destroy */
     empty_trash();
 
-    XBT_DEBUG("### elapsed time %f, #actors %zu, #to_run %zu", elapsed_time, actor_list_.size(), actors_to_run_.size());
+    XBT_DEBUG("### elapsed time %f, #actors %zu, #to_run %zu, #vetoed %d", elapsed_time, actor_list_.size(),
+              actors_to_run_.size(), (vetoed_activities == nullptr ? -1 : static_cast<int>(vetoed_activities->size())));
 
     if (elapsed_time < 0. && actors_to_run_.empty() && not actor_list_.empty()) {
       if (actor_list_.size() <= daemons_.size()) {
@@ -833,9 +846,11 @@ void EngineImpl::run(double max_date)
         maestro_->kill(kv.second);
       }
     }
-  } while ((elapsed_time > -1.0 && not double_equals(max_date, NOW, 0.00001)) || has_actors_to_run());
 
-  if (not actor_list_.empty() && max_date < 0)
+  } while ((vetoed_activities == nullptr || vetoed_activities->empty()) &&
+           ((elapsed_time > -1.0 && not double_equals(max_date, NOW, 0.00001)) || has_actors_to_run()));
+
+  if (not actor_list_.empty() && max_date < 0 && not(vetoed_activities == nullptr || vetoed_activities->empty()))
     THROW_IMPOSSIBLE;
 
   simgrid::s4u::Engine::on_simulation_end();
index ac8b8c8..3022827 100644 (file)
@@ -32,6 +32,7 @@ class XBT_PUBLIC ExecImpl : public ActivityImpl_T<ExecImpl> {
 public:
   ExecImpl();
   s4u::Exec* get_iface() { return piface_; }
+  actor::ActorImpl* get_actor() { return actor_; }
 
   ExecImpl& set_timeout(double timeout) override;
   ExecImpl& set_bound(double bound);
index 000e1b5..7ed8264 100644 (file)
@@ -21,6 +21,8 @@ namespace s4u {
 
 xbt::signal<void(Activity&)> Activity::on_veto;
 
+std::set<Activity*>* Activity::vetoed_activities_ = nullptr;
+
 void Activity::wait_until(double time_limit)
 {
   double now = Engine::get_clock();
index 67792c9..6d73ea2 100644 (file)
@@ -338,6 +338,11 @@ void Engine::run_until(double max_date) const
   }
 }
 
+void Engine::track_vetoed_activities(std::set<Activity*>* vetoed_activities)
+{
+  Activity::set_vetoed_activities(vetoed_activities);
+}
+
 /** @brief Retrieve the root netzone, containing all others */
 s4u::NetZone* Engine::get_netzone_root() const
 {