Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
migrate actors_to_run and actors_that_ran to EngineImpl
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Fri, 21 May 2021 13:38:56 +0000 (15:38 +0200)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Fri, 21 May 2021 13:39:09 +0000 (15:39 +0200)
src/kernel/EngineImpl.cpp
src/kernel/EngineImpl.hpp
src/kernel/actor/ActorImpl.cpp
src/kernel/context/ContextSwapped.cpp
src/kernel/context/ContextThread.cpp
src/mc/mc_base.cpp
src/simix/smx_global.cpp
src/simix/smx_private.hpp

index b15a4e6..f2ce403 100644 (file)
@@ -41,6 +41,8 @@ EngineImpl::~EngineImpl()
   for (auto const& kv : links_)
     if (kv.second)
       kv.second->destroy();
+  actors_to_run_.clear();
+  actors_that_ran_.clear();
 }
 
 void EngineImpl::load_deployment(const std::string& file) const
@@ -96,6 +98,20 @@ void EngineImpl::wake_all_waiting_actors() const
     }
   }
 }
+/**
+ * @brief Executes the actors in actors_to_run.
+ *
+ * The actors in actors_to_run are run (in parallel if possible). On exit, actors_to_run is empty, and actors_that_ran
+ * contains the list of actors that just ran.  The two lists are swapped so, be careful when using them before and after
+ * a call to this function.
+ */
+void EngineImpl::run_all_actors()
+{
+  simix_global->context_factory->run_all();
+
+  actors_to_run_.swap(actors_that_ran_);
+  actors_to_run_.clear();
+}
 
 /** Execute all the tasks that are queued, e.g. `.then()` callbacks of futures. */
 bool EngineImpl::execute_tasks()
@@ -126,6 +142,22 @@ void EngineImpl::rm_daemon(actor::ActorImpl* actor)
   daemons_.erase(it);
 }
 
+void EngineImpl::add_actor_to_run_list_no_check(actor::ActorImpl* actor)
+{
+  XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", actor, actor->get_cname(), actor->get_host()->get_cname());
+  actors_to_run_.push_back(actor);
+}
+
+void EngineImpl::add_actor_to_run_list(actor::ActorImpl* actor)
+{
+  if (std::find(begin(actors_to_run_), end(actors_to_run_), actor) != end(actors_to_run_)) {
+    XBT_DEBUG("Actor %s is already in the to_run list", actor->get_cname());
+  } else {
+    XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", actor, actor->get_cname(), actor->get_host()->get_cname());
+    actors_to_run_.push_back(actor);
+  }
+}
+
 void EngineImpl::run()
 {
   if (MC_record_replay_is_active()) {
@@ -136,7 +168,7 @@ void EngineImpl::run()
   double time = 0;
 
   do {
-    XBT_DEBUG("New Schedule Round; size(queue)=%zu", simix_global->actors_to_run.size());
+    XBT_DEBUG("New Schedule Round; size(queue)=%zu", actors_to_run_.size());
 
     if (cfg_breakpoint >= 0.0 && surf_get_clock() >= cfg_breakpoint) {
       XBT_DEBUG("Breakpoint reached (%g)", cfg_breakpoint.get());
@@ -150,11 +182,11 @@ void EngineImpl::run()
 
     execute_tasks();
 
-    while (not simix_global->actors_to_run.empty()) {
-      XBT_DEBUG("New Sub-Schedule Round; size(queue)=%zu", simix_global->actors_to_run.size());
+    while (not actors_to_run_.empty()) {
+      XBT_DEBUG("New Sub-Schedule Round; size(queue)=%zu", actors_to_run_.size());
 
       /* Run all actors that are ready to run, possibly in parallel */
-      simix_global->run_all_actors();
+      run_all_actors();
 
       /* answer sequentially and in a fixed arbitrary order all the simcalls that were issued during that sub-round */
 
@@ -218,7 +250,7 @@ void EngineImpl::run()
        *   That would thus be a pure waste of time.
        */
 
-      for (auto const& actor : simix_global->actors_that_ran) {
+      for (auto const& actor : actors_that_ran_) {
         if (actor->simcall_.call_ != simix::Simcall::NONE) {
           actor->simcall_handle(0);
         }
@@ -260,10 +292,9 @@ void EngineImpl::run()
     /* Clean actors to destroy */
     simix_global->empty_trash();
 
-    XBT_DEBUG("### time %f, #actors %zu, #to_run %zu", time, simix_global->process_list.size(),
-              simix_global->actors_to_run.size());
+    XBT_DEBUG("### time %f, #actors %zu, #to_run %zu", time, simix_global->process_list.size(), actors_to_run_.size());
 
-    if (time < 0. && simix_global->actors_to_run.empty() && not simix_global->process_list.empty()) {
+    if (time < 0. && actors_to_run_.empty() && not simix_global->process_list.empty()) {
       if (simix_global->process_list.size() <= daemons_.size()) {
         XBT_CRITICAL("Oops! Daemon actors cannot do any blocking activity (communications, synchronization, etc) "
                      "once the simulation is over. Please fix your on_exit() functions.");
@@ -277,7 +308,7 @@ void EngineImpl::run()
         simix_global->maestro_->kill(kv.second);
       }
     }
-  } while (time > -1.0 || not simix_global->actors_to_run.empty());
+  } while (time > -1.0 || has_actors_to_run());
 
   if (not simix_global->process_list.empty())
     THROW_IMPOSSIBLE;
index 1857b85..5095243 100644 (file)
@@ -29,7 +29,9 @@ class EngineImpl {
   std::vector<resource::Model*> models_;
   std::unordered_map<std::string, std::shared_ptr<resource::Model>> models_prio_;
   routing::NetZoneImpl* netzone_root_ = nullptr;
-  std::set<kernel::actor::ActorImpl*> daemons_;
+  std::set<actor::ActorImpl*> daemons_;
+  std::vector<actor::ActorImpl*> actors_to_run_;
+  std::vector<actor::ActorImpl*> actors_that_ran_;
 
   std::vector<xbt::Task<void()>> tasks;
   std::vector<xbt::Task<void()>> tasksTemp;
@@ -70,11 +72,21 @@ public:
   }
   void add_daemon(actor::ActorImpl* d) { daemons_.insert(d); }
   void rm_daemon(actor::ActorImpl* d);
+  void add_actor_to_run_list(actor::ActorImpl* a);
+  void add_actor_to_run_list_no_check(actor::ActorImpl* a);
+  bool has_actors_to_run() { return not actors_to_run_.empty(); }
+  const actor::ActorImpl* get_first_actor_to_run() const { return actors_to_run_.front(); }
+  const actor::ActorImpl* get_actor_to_run_at(unsigned long int i) const { return actors_to_run_[i]; }
+  unsigned long int get_actor_to_run_count() { return actors_to_run_.size(); }
+
+  const std::vector<actor::ActorImpl*>& get_actors_to_run() const { return actors_to_run_; }
+  const std::vector<actor::ActorImpl*>& get_actors_that_ran() const { return actors_that_ran_; }
 
   bool execute_tasks();
   void add_task(xbt::Task<void()>&& t) { tasks.push_back(std::move(t)); }
   void wake_all_waiting_actors() const;
   void display_all_actor_status() const;
+  void run_all_actors();
 
   /** @brief Run the main simulation loop. */
   void run();
index 5c9ee3b..fe6c717 100644 (file)
@@ -116,8 +116,7 @@ ActorImplPtr ActorImpl::attach(const std::string& name, void* data, s4u::Host* h
 
   /* Now insert it in the global actor list and in the actors to run list */
   simix_global->process_list[actor->get_pid()] = actor;
-  XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", actor, actor->get_cname(), host->get_cname());
-  simix_global->actors_to_run.push_back(actor);
+  EngineImpl::get_instance()->add_actor_to_run_list_no_check(actor);
   intrusive_ptr_add_ref(actor);
 
   auto* context = dynamic_cast<context::AttachContext*>(actor->context_.get());
@@ -255,13 +254,8 @@ void ActorImpl::kill(ActorImpl* actor) const
 
   if (actor == this) {
     XBT_DEBUG("Go on, this is a suicide,");
-  } else if (std::find(begin(simix_global->actors_to_run), end(simix_global->actors_to_run), actor) !=
-             end(simix_global->actors_to_run)) {
-    XBT_DEBUG("Actor %s is already in the to_run list", actor->get_cname());
-  } else {
-    XBT_DEBUG("Inserting %s in the to_run list", actor->get_cname());
-    simix_global->actors_to_run.push_back(actor);
-  }
+  } else
+    EngineImpl::get_instance()->add_actor_to_run_list(actor);
 }
 
 void ActorImpl::kill_all() const
@@ -395,7 +389,7 @@ void ActorImpl::resume()
   for (auto const& activity : activities_)
     activity->resume();
   if (not waiting_synchro_) // Reschedule the actor if it was forcefully unscheduled in yield()
-    simix_global->actors_to_run.push_back(this);
+    EngineImpl::get_instance()->add_actor_to_run_list_no_check(this);
 
   XBT_OUT();
 }
@@ -442,11 +436,12 @@ void ActorImpl::simcall_answer()
     XBT_DEBUG("Answer simcall %s issued by %s (%p)", SIMIX_simcall_name(simcall_), get_cname(), this);
     xbt_assert(simcall_.call_ != simix::Simcall::NONE);
     simcall_.call_ = simix::Simcall::NONE;
+    auto* engine              = EngineImpl::get_instance();
+    const auto& actors_to_run = engine->get_actors_to_run();
     xbt_assert(not XBT_LOG_ISENABLED(simix_process, xbt_log_priority_debug) ||
-                   std::find(begin(simix_global->actors_to_run), end(simix_global->actors_to_run), this) ==
-                       end(simix_global->actors_to_run),
+                   std::find(begin(actors_to_run), end(actors_to_run), this) == end(actors_to_run),
                "Actor %p should not exist in actors_to_run!", this);
-    simix_global->actors_to_run.push_back(this);
+    engine->add_actor_to_run_list_no_check(this);
   }
 }
 
@@ -490,8 +485,7 @@ ActorImpl* ActorImpl::start(const ActorCode& code)
   simix_global->process_list[pid_] = this;
 
   /* Now insert it in the global actor list and in the actor to run list */
-  XBT_DEBUG("Inserting [%p] %s(%s) in the to_run list", this, get_cname(), host_->get_cname());
-  simix_global->actors_to_run.push_back(this);
+  EngineImpl::get_instance()->add_actor_to_run_list_no_check(this);
 
   return this;
 }
index 177c54a..65d170f 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/Exception.hpp"
 #include "simgrid/modelchecker.h"
 #include "src/internal_config.h"
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/simix/smx_private.hpp"
 #include "xbt/parmap.hpp"
@@ -206,6 +207,7 @@ void SwappedContext::swap_into(SwappedContext* to)
 /** Maestro wants to run all ready actors */
 void SwappedContextFactory::run_all()
 {
+  auto* engine = EngineImpl::get_instance();
   /* This function is called by maestro at the beginning of a scheduling round to get all working threads executing some
    * stuff It is much easier to understand what happens if you see the working threads as bodies that swap their soul
    * for the ones of the simulated processes that must run.
@@ -223,17 +225,17 @@ void SwappedContextFactory::run_all()
     //     It only yields back to worker_context when the work array is exhausted.
     //   - So, resume() is only launched from the parmap for the first job of each minion.
     parmap_->apply(
-        [](const actor::ActorImpl* process) {
-          auto* context = static_cast<SwappedContext*>(process->context_.get());
+        [](const actor::ActorImpl* actor) {
+          auto* context = static_cast<SwappedContext*>(actor->context_.get());
           context->resume();
         },
-        simix_global->actors_to_run);
+        engine->get_actors_to_run());
   } else { // sequential execution
-    if (simix_global->actors_to_run.empty())
+    if (not engine->has_actors_to_run())
       return;
 
     /* maestro is already saved in the first slot of workers_context_ */
-    const actor::ActorImpl* first_actor = simix_global->actors_to_run.front();
+    const actor::ActorImpl* first_actor = engine->get_first_actor_to_run();
     process_index_          = 1;
     /* execute the first actor; it will chain to the others when using suspend() */
     static_cast<SwappedContext*>(first_actor->context_.get())->resume();
@@ -286,14 +288,15 @@ void SwappedContext::suspend()
       // When given that soul, the body will wait for the next scheduling round
     }
   } else { // sequential execution
+    auto* engine = EngineImpl::get_instance();
     /* determine the next context */
     unsigned long int i = factory_.process_index_;
     factory_.process_index_++;
 
-    if (i < simix_global->actors_to_run.size()) {
+    if (i < engine->get_actor_to_run_count()) {
       /* Actually swap into the next actor directly without transiting to maestro */
       XBT_DEBUG("Run next actor");
-      next_context = static_cast<SwappedContext*>(simix_global->actors_to_run[i]->context_.get());
+      next_context = static_cast<SwappedContext*>(engine->get_actor_to_run_at(i)->context_.get());
     } else {
       /* all processes were run, actually return to maestro */
       XBT_DEBUG("No more actors to run");
index 2a3b815..7b25fc6 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "simgrid/Exception.hpp"
 #include "src/internal_config.h" /* loads context system definitions */
+#include "src/kernel/EngineImpl.hpp"
 #include "src/simix/smx_private.hpp"
 #include "xbt/function_types.h"
 #include "xbt/xbt_modinter.h" /* prototype of os thread module's init/exit in XBT */
@@ -177,7 +178,8 @@ void ThreadContext::attach_stop()
 
 void SerialThreadContext::run_all()
 {
-  for (smx_actor_t const& actor : simix_global->actors_to_run) {
+  const auto& to_run = EngineImpl::get_instance()->get_actors_to_run();
+  for (smx_actor_t const& actor : to_run) {
     XBT_DEBUG("Handling %p", actor);
     auto* context = static_cast<ThreadContext*>(actor->context_.get());
     context->release();
@@ -202,9 +204,11 @@ void ParallelThreadContext::finalize()
 
 void ParallelThreadContext::run_all()
 {
-  for (smx_actor_t const& actor : simix_global->actors_to_run)
+  const auto& to_release = EngineImpl::get_instance()->get_actors_to_run();
+  for (smx_actor_t const& actor : to_release)
     static_cast<ThreadContext*>(actor->context_.get())->release();
-  for (smx_actor_t const& actor : simix_global->actors_to_run)
+  const auto& to_wait = EngineImpl::get_instance()->get_actors_to_run();
+  for (smx_actor_t const& actor : to_wait)
     static_cast<ThreadContext*>(actor->context_.get())->wait();
 }
 
index a2aee1d..7c83510 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "src/mc/mc_base.hpp"
 #include "mc/mc.h"
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/MutexImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
@@ -42,15 +43,16 @@ namespace mc {
 
 void execute_actors()
 {
+  auto* engine = kernel::EngineImpl::get_instance();
 #if SIMGRID_HAVE_MC
   xbt_assert(mc_model_checker == nullptr, "This must be called from the client");
 #endif
-  while (not simix_global->actors_to_run.empty()) {
-    simix_global->run_all_actors();
-    for (smx_actor_t const& process : simix_global->actors_that_ran) {
-      const s_smx_simcall* req = &process->simcall_;
+  while (engine->has_actors_to_run()) {
+    engine->run_all_actors();
+    for (auto const& actor : engine->get_actors_that_ran()) {
+      const s_smx_simcall* req = &actor->simcall_;
       if (req->call_ != simix::Simcall::NONE && not simgrid::mc::request_is_visible(req))
-        process->simcall_handle(0);
+        actor->simcall_handle(0);
     }
   }
 #if SIMGRID_HAVE_MC
index 7d9d47b..9281df0 100644 (file)
@@ -161,20 +161,6 @@ void Global::empty_trash()
   xbt_dynar_reset(dead_actors_vector);
 #endif
 }
-/**
- * @brief Executes the actors in actors_to_run.
- *
- * The actors in actors_to_run are run (in parallel if possible). On exit, actors_to_run is empty, and actors_that_ran
- * contains the list of actors that just ran.  The two lists are swapped so, be careful when using them before and after
- * a call to this function.
- */
-void Global::run_all_actors()
-{
-  simix_global->context_factory->run_all();
-
-  actors_to_run.swap(actors_that_ran);
-  actors_to_run.clear();
-}
 
 void Global::display_all_actor_status() const
 {
@@ -228,34 +214,36 @@ void SIMIX_set_maestro(void (*code)(void*), void* data)
 
 void SIMIX_global_init(int* argc, char** argv)
 {
-  if (simix_global == nullptr) {
-    simix_global = std::make_unique<simgrid::simix::Global>();
+  if (simix_global != nullptr)
+    return;
+
+  simix_global = std::make_unique<simgrid::simix::Global>();
 
 #if SIMGRID_HAVE_MC
-    // The communication initialization is done ASAP, as we need to get some init parameters from the MC for different layers.
-    // But simix_global needs to be created, as we send the address of some of its fields to the MC that wants to read them directly.
-    simgrid::mc::AppSide::initialize();
+  // The communication initialization is done ASAP, as we need to get some init parameters from the MC for different
+  // layers. But simix_global needs to be created, as we send the address of some of its fields to the MC that wants to
+  // read them directly.
+  simgrid::mc::AppSide::initialize();
 #endif
 
-    surf_init(argc, argv); /* Initialize SURF structures */
+  surf_init(argc, argv); /* Initialize SURF structures */
 
-    simix_global->maestro_ = nullptr;
-    SIMIX_context_mod_init();
+  simix_global->maestro_ = nullptr;
+  SIMIX_context_mod_init();
 
-    // Either create a new context with maestro or create
-    // a context object with the current context maestro):
-    simgrid::kernel::actor::create_maestro(maestro_code);
+  // Either create a new context with maestro or create
+  // a context object with the current context maestro):
+  simgrid::kernel::actor::create_maestro(maestro_code);
 
-    /* Prepare to display some more info when dying on Ctrl-C pressing */
-    std::signal(SIGINT, inthandler);
+  /* Prepare to display some more info when dying on Ctrl-C pressing */
+  std::signal(SIGINT, inthandler);
 
 #ifndef _WIN32
-    install_segvhandler();
+  install_segvhandler();
 #endif
-    /* register a function to be called by SURF after the environment creation */
-    sg_platf_init();
-    simgrid::s4u::Engine::on_platform_created.connect(surf_presolve);
-  }
+  /* register a function to be called by SURF after the environment creation */
+  sg_platf_init();
+  simgrid::s4u::Engine::on_platform_created.connect(surf_presolve);
 
   if (simgrid::config::get_value<bool>("debug/clean-atexit"))
     atexit(SIMIX_clean);
@@ -275,7 +263,8 @@ void SIMIX_clean()
 
   smx_cleaned = true;
   XBT_DEBUG("SIMIX_clean called. Simulation's over.");
-  if (not simix_global->actors_to_run.empty() && SIMIX_get_clock() <= 0.0) {
+  auto* engine = simgrid::kernel::EngineImpl::get_instance();
+  if (engine->has_actors_to_run() && SIMIX_get_clock() <= 0.0) {
     XBT_CRITICAL("   ");
     XBT_CRITICAL("The time is still 0, and you still have processes ready to run.");
     XBT_CRITICAL("It seems that you forgot to run the simulation that you setup.");
@@ -295,7 +284,7 @@ void SIMIX_clean()
 
   /* Kill all processes (but maestro) */
   simix_global->maestro_->kill_all();
-  simix_global->run_all_actors();
+  engine->run_all_actors();
   simix_global->empty_trash();
 
   /* Exit the SIMIX network module */
@@ -306,8 +295,6 @@ void SIMIX_clean()
     simgrid::kernel::timer::kernel_timers().pop();
   }
   /* Free the remaining data structures */
-  simix_global->actors_to_run.clear();
-  simix_global->actors_that_ran.clear();
   simix_global->actors_to_destroy.clear();
   simix_global->process_list.clear();
 
index 227c83f..ddf9816 100644 (file)
@@ -28,12 +28,9 @@ public:
    * Should be called some time to time to free the memory allocated for actors that have finished (or killed).
    */
   void empty_trash();
-  void run_all_actors();
   void display_all_actor_status() const;
 
   kernel::context::ContextFactory* context_factory = nullptr;
-  std::vector<kernel::actor::ActorImpl*> actors_to_run;
-  std::vector<kernel::actor::ActorImpl*> actors_that_ran;
   std::map<aid_t, kernel::actor::ActorImpl*> process_list;
   boost::intrusive::list<kernel::actor::ActorImpl,
                          boost::intrusive::member_hook<kernel::actor::ActorImpl, boost::intrusive::list_member_hook<>,