Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[simix] Execute future callbacks in the main loop
authorGabriel Corona <gabriel.corona@loria.fr>
Fri, 24 Jun 2016 11:12:06 +0000 (13:12 +0200)
committerGabriel Corona <gabriel.corona@loria.fr>
Tue, 5 Jul 2016 09:46:33 +0000 (11:46 +0200)
By not executing them directly, we avoid problems such as stack
overflows, resource acquisition problems, etc.

include/simgrid/kernel/future.hpp
src/kernel/future.cpp [new file with mode: 0644]
src/simix/smx_global.cpp
src/simix/smx_private.h
tools/cmake/DefinePackages.cmake

index a159b6b..b3abf96 100644 (file)
@@ -17,6 +17,7 @@
 
 #include <xbt/base.h>
 #include <xbt/functional.hpp>
+#include <xbt/future.hpp>
 
 namespace simgrid {
 namespace kernel {
@@ -47,6 +48,8 @@ public:
   FutureStateBase(FutureStateBase const&) = delete;
   FutureStateBase& operator=(FutureStateBase const&) = delete;
 
+  void schedule(simgrid::xbt::Task<void()>&& job);
+
   void set_exception(std::exception_ptr exception)
   {
     xbt_assert(exception_ == nullptr);
@@ -56,7 +59,7 @@ public:
     this->set_ready();
   }
 
-  void set_continuation(simgrid::xbt::Task<void()> continuation)
+  void set_continuation(simgrid::xbt::Task<void()>&& continuation)
   {
     xbt_assert(!continuation_);
     switch (status_) {
@@ -68,7 +71,7 @@ public:
     case FutureStatus::ready:
       // The future is ready, execute the continuation directly.
       // We might execute it from the event loop instead:
-      continuation();
+      schedule(std::move(continuation));
       break;
     case FutureStatus::not_ready:
       // The future is not ready so we must keep the continuation for
@@ -103,7 +106,7 @@ protected:
       // We need to do this because the current implementation of the
       // continuation has a shared_ptr to the FutureState.
       auto continuation = std::move(continuation_);
-      continuation();
+      this->schedule(std::move(continuation));
     }
   }
 
diff --git a/src/kernel/future.cpp b/src/kernel/future.cpp
new file mode 100644 (file)
index 0000000..4252161
--- /dev/null
@@ -0,0 +1,22 @@
+/* Copyright (c) 2016. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <xbt/functional.hpp>
+
+#include <simgrid/kernel/future.hpp>
+
+#include "src/simix/smx_private.h"
+
+namespace simgrid {
+namespace kernel {
+/** Queue a job for deferred execution.
+ *
+ *  The job is only appended to `simix_global->tasks`; it is not run here.
+ *  The queued tasks are executed later by SIMIX_execute_tasks() in
+ *  src/simix/smx_global.cpp.
+ *
+ *  @param job task to enqueue; it is moved into the queue
+ */
+void FutureStateBase::schedule(simgrid::xbt::Task<void()>&& job)
+{
+  simix_global->tasks.push_back(std::move(job));
+}
+
+}
+}
index 90a2f88..1e04e86 100644 (file)
@@ -349,6 +349,76 @@ static int process_syscall_color(void *p)
   }
 }
 
+/** Wake up all processes waiting for a Surf action to finish
+ *
+ *  For every model in `all_existing_models`, repeatedly extracts actions from
+ *  the failed action set, then from the done action set, until the extractor
+ *  returns no action. The data attached to each action is cast to
+ *  `smx_synchro_t` and handed to SIMIX_simcall_exit(). Done actions without
+ *  attached data are only logged and skipped.
+ *
+ *  NOTE(review): unlike done actions, failed actions are not checked for null
+ *  data before calling SIMIX_simcall_exit() -- confirm this is intended.
+ */
+static void SIMIX_wake_processes()
+{
+  unsigned int iter;
+  surf_model_t model;
+  surf_action_t action;
+
+  xbt_dynar_foreach(all_existing_models, iter, model) {
+    XBT_DEBUG("Handling process whose action failed");
+    while ((action = surf_model_extract_failed_action_set(model))) { // ends when no action is returned
+      XBT_DEBUG("   Handling Action %p",action);
+      SIMIX_simcall_exit((smx_synchro_t) action->getData());
+    }
+    XBT_DEBUG("Handling process whose action terminated normally");
+    while ((action = surf_model_extract_done_action_set(model))) { // ends when no action is returned
+      XBT_DEBUG("   Handling Action %p",action);
+      if (action->getData() == nullptr) // no data attached: nothing to hand to SIMIX_simcall_exit()
+        XBT_DEBUG("probably vcpu's action %p, skip", action);
+      else
+        SIMIX_simcall_exit((smx_synchro_t) action->getData());
+    }
+  }
+}
+
+/** Handle any pending timer
+ *
+ *  Pops timers from the `simix_timers` heap and runs their callback for as
+ *  long as the heap is non-empty and SIMIX_get_clock() >= SIMIX_timer_next().
+ *  Each popped timer is deleted once its callback has returned. An exception
+ *  escaping a callback is caught and reported through xbt_die().
+ *
+ *  @return true if at least one timer was popped and executed, false otherwise
+ */
+static bool SIMIX_execute_timers()
+{
+  bool result = false;
+  while (xbt_heap_size(simix_timers) > 0 && SIMIX_get_clock() >= SIMIX_timer_next()) {
+    result = true;
+     // FIXME: turn the timers into real callbacks
+     // (i.e. provide dispatchers that read and expand the args)
+     smx_timer_t timer = (smx_timer_t) xbt_heap_pop(simix_timers);
+     try {
+       timer->callback();
+     }
+     catch(...) {
+       xbt_die("Exception throwed ouf of timer callback");
+     }
+     delete timer;
+  }
+  return result;
+}
+
+/** Execute all the tasks that are queued
+ *
+ *  e.g. `.then()` callbacks of futures.
+ *
+ *  Tasks are run in batches: the pending queue (`simix_global->tasks`) is
+ *  swapped into `simix_global->tasksTemp`, every task of that batch is called,
+ *  and the batch is cleared. This repeats while `tasks` is non-empty after a
+ *  batch, so tasks queued by other tasks are executed within the same call.
+ *
+ *  NOTE(review): there is no exception handling around `task()`; if a task
+ *  throws, `tasksTemp` is left non-empty and the assertion on entry would fail
+ *  on the next call -- confirm that tasks never throw.
+ *
+ *  @return true if at least one task was executed, false if the queue was
+ *          empty on entry
+ **/
+static bool SIMIX_execute_tasks()
+{
+  xbt_assert(simix_global->tasksTemp.empty());
+
+  if (simix_global->tasks.empty())
+    return false;
+
+  using std::swap;
+  do {
+    // We don't want the callbacks to modify the vector we are iterating over:
+    swap(simix_global->tasks, simix_global->tasksTemp);
+
+    // Execute all the queued tasks:
+    for (auto& task : simix_global->tasksTemp)
+      task();
+
+    simix_global->tasksTemp.clear(); // must be empty again before the next swap
+  } while (!simix_global->tasks.empty());
+
+  return true;
+}
+
 /**
  * \ingroup SIMIX_API
  * \brief Run the main simulation loop.
@@ -362,14 +432,13 @@ void SIMIX_run(void)
 
   double time = 0;
   smx_process_t process;
-  surf_action_t action;
-  smx_timer_t timer;
-  surf_model_t model;
-  unsigned int iter;
 
   do {
     XBT_DEBUG("New Schedule Round; size(queue)=%lu",
         xbt_dynar_length(simix_global->process_to_run));
+
+    SIMIX_execute_tasks();
+
     while (!xbt_dynar_is_empty(simix_global->process_to_run)) {
       XBT_DEBUG("New Sub-Schedule Round; size(queue)=%lu",
               xbt_dynar_length(simix_global->process_to_run));
@@ -435,27 +504,18 @@ void SIMIX_run(void)
        *   That would thus be a pure waste of time.
        */
 
+      unsigned int iter;
       xbt_dynar_foreach(simix_global->process_that_ran, iter, process) {
         if (process->simcall.call != SIMCALL_NONE) {
           SIMIX_simcall_handle(&process->simcall, 0);
         }
       }
-      /* Wake up all processes waiting for a Surf action to finish */
-      xbt_dynar_foreach(all_existing_models, iter, model) {
-        XBT_DEBUG("Handling process whose action failed");
-        while ((action = surf_model_extract_failed_action_set(model))) {
-          XBT_DEBUG("   Handling Action %p",action);
-          SIMIX_simcall_exit((smx_synchro_t) action->getData());
-        }
-        XBT_DEBUG("Handling process whose action terminated normally");
-        while ((action = surf_model_extract_done_action_set(model))) {
-          XBT_DEBUG("   Handling Action %p",action);
-          if (action->getData() == nullptr)
-            XBT_DEBUG("probably vcpu's action %p, skip", action);
-          else
-            SIMIX_simcall_exit((smx_synchro_t) action->getData());
-        }
-      }
+
+      SIMIX_execute_tasks();
+      do {
+        SIMIX_wake_processes();
+      } while (SIMIX_execute_tasks());
+
     }
 
     time = SIMIX_timer_next();
@@ -464,43 +524,23 @@ void SIMIX_run(void)
       time = surf_solve(time);
       XBT_DEBUG("Moving time ahead : %g", time);
     }
+
     /* Notify all the hosts that have failed */
     /* FIXME: iterate through the list of failed host and mark each of them */
     /* as failed. On each host, signal all the running processes with host_fail */
 
-    /* Handle any pending timer */
-    while (xbt_heap_size(simix_timers) > 0 && SIMIX_get_clock() >= SIMIX_timer_next()) {
-       //FIXME: make the timers being real callbacks
-       // (i.e. provide dispatchers that read and expand the args)
-       timer = (smx_timer_t) xbt_heap_pop(simix_timers);
-       try {
-         timer->callback();
-       }
-       catch(...) {
-         xbt_die("Exception throwed ouf of timer callback");
-       }
-       delete timer;
-    }
-
-    /* Wake up all processes waiting for a Surf action to finish */
-    xbt_dynar_foreach(all_existing_models, iter, model) {
-      XBT_DEBUG("Handling process whose action failed");
-      while ((action = surf_model_extract_failed_action_set(model))) {
-        XBT_DEBUG("   Handling Action %p",action);
-        SIMIX_simcall_exit((smx_synchro_t) action->getData());
-      }
-      XBT_DEBUG("Handling process whose action terminated normally");
-      while ((action = surf_model_extract_done_action_set(model))) {
-        XBT_DEBUG("   Handling Action %p",action);
-        if (action->getData() == nullptr)
-          XBT_DEBUG("probably vcpu's action %p, skip", action);
-        else
-          SIMIX_simcall_exit((smx_synchro_t) action->getData());
-      }
-    }
+    // Execute timers and tasks until there isn't anything to be done:
+    bool again = false;
+    do {
+      again = SIMIX_execute_timers();
+      if (SIMIX_execute_tasks())
+        again = true;
+      SIMIX_wake_processes();
+    } while (again);
 
     /* Autorestart all process */
     char *hostname = nullptr;
+    unsigned int iter;
     xbt_dynar_foreach(host_that_restart,iter,hostname) {
       XBT_INFO("Restart processes on host: %s",hostname);
       SIMIX_host_autorestart(sg_host_by_name(hostname));
@@ -510,7 +550,6 @@ void SIMIX_run(void)
     /* Clean processes to destroy */
     SIMIX_process_empty_trash();
 
-
     XBT_DEBUG("### time %f, empty %d", time, xbt_dynar_is_empty(simix_global->process_to_run));
 
   } while (time != -1.0 || !xbt_dynar_is_empty(simix_global->process_to_run));
index 1808c16..ad1ff19 100644 (file)
 #include <functional>
 #include <memory>
 #include <unordered_map>
+#include <vector>
 
 #include <xbt/functional.hpp>
 
-
 #include "src/internal_config.h"
 #include "simgrid/simix.h"
 #include "surf/surf.h"
@@ -88,6 +88,9 @@ public:
   /** Callback used when killing a SMX_process */
   void_pfn_smxprocess_t cleanup_process_function = nullptr;
   xbt_os_mutex_t mutex = nullptr;
+
+  std::vector<simgrid::xbt::Task<void()>> tasks;
+  std::vector<simgrid::xbt::Task<void()>> tasksTemp;
 };
 
 }
index 1e6ac4f..28348f0 100644 (file)
@@ -337,6 +337,7 @@ set(SURF_SRC
 
 set(SIMIX_GENERATED_SRC   src/simix/popping_generated.cpp  )
 set(SIMIX_SRC
+  src/kernel/future.cpp
   src/simix/libsmx.cpp
   src/simix/smx_context.cpp
   src/simix/Context.cpp