Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[simix] Add a `run_blocking` simcall and simix::kernelSync
authorGabriel Corona <gabriel.corona@loria.fr>
Tue, 31 May 2016 14:36:51 +0000 (16:36 +0200)
committerGabriel Corona <gabriel.corona@loria.fr>
Fri, 17 Jun 2016 10:12:31 +0000 (12:12 +0200)
* run_blocking() is a generic blocking simcall. It is given a callback
  which is executed immediately in the SimGrid kernel. The callback is
  responsible for setting the suitable logic for waking up the process
  when needed.

* simix::kernelSync() is a higher-level wrapper for this. It is given
  a callback which is executed in the SimGrid kernel and returns a
  simgrid::kernel::Future<T>. The kernel blocks the process until the
  Future is ready and then either returns the value wrapped in the
  Future to the process or raises the exception stored in the Future
  in the process.

* simgrid::simix::{Future,Promise} provide an abstraction for
  asynchronous stuff happening in the SimGrid kernel. They are based
  on C++1z futures.

18 files changed:
.gitignore
include/simgrid/kernel/future.hpp [new file with mode: 0644]
include/simgrid/simix.hpp
include/simgrid/simix/blocking_simcall.hpp [new file with mode: 0644]
include/xbt/future.hpp
src/simix/libsmx.cpp
src/simix/popping.cpp
src/simix/popping_accessors.h
src/simix/popping_bodies.cpp
src/simix/popping_enum.h
src/simix/popping_generated.cpp
src/simix/popping_private.h
src/simix/simcalls.in
src/simix/smx_global.cpp
teshsuite/simix/CMakeLists.txt
teshsuite/simix/generic_simcalls/generic_simcalls.cpp [new file with mode: 0644]
teshsuite/simix/generic_simcalls/generic_simcalls.tesh [new file with mode: 0644]
tools/cmake/DefinePackages.cmake

index 1e766e4..c1bdf8a 100644 (file)
@@ -269,6 +269,7 @@ teshsuite/simdag/evaluate-get-route-time/evaluate-get-route-time
 teshsuite/simdag/evaluate-parse-time/evaluate-parse-time
 teshsuite/simix/check_defaults/check_defaults
 teshsuite/simix/stack_overflow/stack_overflow
+teshsuite/simix/generic_simcalls/generic_simcalls
 teshsuite/smpi/bug-17132/bug-17132
 teshsuite/smpi/coll-allgather/coll-allgather
 teshsuite/smpi/coll-allgatherv/coll-allgatherv
diff --git a/include/simgrid/kernel/future.hpp b/include/simgrid/kernel/future.hpp
new file mode 100644 (file)
index 0000000..a99a155
--- /dev/null
@@ -0,0 +1,514 @@
+/* Copyright (c) 2016. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_FUTURE_HPP
+#define SIMGRID_KERNEL_FUTURE_HPP
+
+#include <boost/optional.hpp>
+
+#include <xbt/base.h>
+
+#include <functional>
+#include <future>
+#include <memory>
+#include <utility>
+#include <type_traits>
+
+namespace simgrid {
+namespace kernel {
+
+// There are the public classes:
+template<class T> class Future;
+template<class T> class Promise;
+
+// Those are implementation details:
+enum class FutureStatus;
+template<class T> class FutureState;
+class FutureContinuation;
+template<class T, class F> class FutureContinuationImpl;
+
+/** Life-cycle stage of the state shared by a Future and its Promise */
+enum class FutureStatus {
+  not_ready, // no value or exception has been set yet
+  ready,     // a value or exception has been set
+  done       // the result has been resolved (consumed by `.get()`)
+};
+
+/** A continuation attached to a future to be executed when it is ready
+ *
+ *  This is an abstract, non-copyable interface: the code to run is supplied
+ *  by subclasses overriding `operator()`.
+ */
+XBT_PUBLIC_CLASS FutureContinuation {
+public:
+  FutureContinuation() {}
+
+  // No copy (canonical const-reference signatures, as in FutureStateBase):
+  FutureContinuation(FutureContinuation const&) = delete;
+  FutureContinuation& operator=(FutureContinuation const&) = delete;
+
+  virtual ~FutureContinuation() {}
+
+  /** Execute the continuation */
+  virtual void operator()() = 0;
+};
+
+/** Default implementation of `FutureContinuation`
+ *
+ *  It stores the shared state of a future together with a callback. Running
+ *  the continuation builds a `Future<T>` on that state and hands it to the
+ *  callback.
+ *
+ *  @param T   value type of the future
+ *  @param F   type of the wrapped code/callback/continuation
+ */
+template<class T, class F>
+class FutureContinuationImpl : public FutureContinuation {
+public:
+  FutureContinuationImpl(std::shared_ptr<FutureState<T>> ptr, F callback)
+    : state_(std::move(ptr)), code_(std::move(callback)) {}
+  ~FutureContinuationImpl() override {}
+
+  /** Call the callback with a future bound to the stored shared state */
+  void operator()() override
+  {
+    try {
+      Future<T> future(state_);
+      code_(std::move(future));
+    }
+    catch (...) {
+      // Exceptions escaping the callback are dropped here (this could be
+      // logged). Callback chaining would need to catch them and forward
+      // them to the next future instead.
+    }
+  }
+
+private:
+  std::shared_ptr<FutureState<T>> state_;
+  F code_;
+};
+
+/** Base class for all @ref simgrid::kernel::FutureState<T>
+ *
+ *  It holds everything which does not depend on the value type: the status,
+ *  the stored exception (if any) and the continuation (if any).
+ */
+class FutureStateBase {
+public:
+  // No copy/move:
+  FutureStateBase(FutureStateBase const&) = delete;
+  FutureStateBase& operator=(FutureStateBase const&) = delete;
+
+  /** Store an exception as the result and mark the future as ready
+   *
+   *  @exception std::future_error the future is not in the `not_ready` state
+   */
+  void set_exception(std::exception_ptr exception)
+  {
+    xbt_assert(exception_ == nullptr);
+    if (status_ != FutureStatus::not_ready)
+      throw std::future_error(std::future_errc::promise_already_satisfied);
+    exception_ = std::move(exception);
+    this->set_ready();
+  }
+
+  /** Attach the continuation
+   *
+   *  It is executed immediately if the future is already ready; otherwise it
+   *  is stored and executed by `set_ready()`.
+   */
+  void set_continuation(std::unique_ptr<FutureContinuation> continuation)
+  {
+    xbt_assert(!continuation_);
+    switch (status_) {
+    case FutureStatus::done:
+      // This is not supposed to happen if continuation is set
+      // via the Promise:
+      xbt_die("Set continuation on finished future");
+      break;
+    case FutureStatus::ready:
+      // The future is ready, execute the continuation directly.
+      // We might execute it from the event loop instead:
+      (*continuation)();
+      break;
+    case FutureStatus::not_ready:
+      // The future is not ready so we must keep the continuation for
+      // executing it later:
+      continuation_ = std::move(continuation);
+      break;
+    }
+  }
+
+  /** Current status of the future */
+  FutureStatus get_status() const
+  {
+    return status_;
+  }
+
+  /** Whether a result is available and has not been resolved yet */
+  bool is_ready() const
+  {
+    return status_ == FutureStatus::ready;
+  }
+
+protected:
+  FutureStateBase() {}
+  ~FutureStateBase() {};
+
+  /** Set the future as ready and trigger the continuation */
+  void set_ready()
+  {
+    status_ = FutureStatus::ready;
+    if (continuation_) {
+      // We unregister the continuation before executing it.
+      // We need to do this because the current implementation of the
+      // continuation has a shared_ptr to the FutureState.
+      auto continuation = std::move(continuation_);
+      (*continuation)();
+    }
+  }
+
+  /** Set the future as done and raise an exception if any
+   *
+   *  This does half the job of `.get()`.
+   *  It dies (`xbt_die`) if the future is not ready.
+   **/
+  void resolve()
+  {
+    if (status_ != FutureStatus::ready)
+      xbt_die("Deadlock: this future is not ready");
+    status_ = FutureStatus::done;
+    if (exception_) {
+      // Move the exception out of the state before rethrowing it:
+      std::exception_ptr exception = std::move(exception_);
+      std::rethrow_exception(std::move(exception));
+    }
+  }
+
+private:
+  FutureStatus status_ = FutureStatus::not_ready;
+  std::exception_ptr exception_;
+  std::unique_ptr<FutureContinuation> continuation_;
+};
+
+/** Shared state for future and promises
+ *
+ *  You are not expected to use them directly but to create them
+ *  implicitly through a @ref simgrid::kernel::Promise.
+ *  Alternatively kernel operations could inherit or contain FutureState
+ *  if they are managed with @ref std::shared_ptr.
+ **/
+template<class T>
+class FutureState : public FutureStateBase {
+public:
+
+  /** Store the value and mark the future as ready
+   *
+   *  @exception std::future_error the future is not in the `not_ready` state
+   */
+  void set_value(T value)
+  {
+    if (this->get_status() != FutureStatus::not_ready)
+      throw std::future_error(std::future_errc::promise_already_satisfied);
+    value_ = std::move(value);
+    this->set_ready();
+  }
+
+  /** Resolve the future and move the stored value out of the state
+   *
+   *  Any stored exception is rethrown by `resolve()`.
+   */
+  T get()
+  {
+    this->resolve();
+    xbt_assert(this->value_);
+    auto result = std::move(this->value_.get());
+    this->value_ = boost::optional<T>();
+    // Return the local by name: `return std::move(result)` would only
+    // prevent copy elision.
+    return result;
+  }
+
+private:
+  boost::optional<T> value_;
+};
+
+/** Shared state specialization for futures carrying a reference
+ *
+ *  The reference is stored as a pointer to the referenced object.
+ */
+template<class T>
+class FutureState<T&> : public FutureStateBase {
+public:
+  /** Store the reference and mark the future as ready
+   *
+   *  @exception std::future_error the future is not in the `not_ready` state
+   */
+  void set_value(T& value)
+  {
+    if (this->get_status() != FutureStatus::not_ready)
+      throw std::future_error(std::future_errc::promise_already_satisfied);
+    value_ = &value;
+    this->set_ready();
+  }
+
+  /** Resolve the future and return the stored reference
+   *
+   *  Any stored exception is rethrown by `resolve()`.
+   */
+  T& get()
+  {
+    this->resolve();
+    xbt_assert(this->value_);
+    T* result = value_;
+    value_ = nullptr;
+    // Dereference the saved pointer: `value_` has just been reset to null.
+    return *result;
+  }
+
+private:
+  T* value_ = nullptr;
+};
+
+/** Shared state specialization for futures carrying no value */
+template<>
+class FutureState<void> : public FutureStateBase {
+public:
+  /** Mark the future as ready; throws if it is not in the `not_ready` state */
+  void set_value()
+  {
+    const bool pending = this->get_status() == FutureStatus::not_ready;
+    if (!pending)
+      throw std::future_error(std::future_errc::promise_already_satisfied);
+    this->set_ready();
+  }
+
+  /** Resolve the future, rethrowing the stored exception if any */
+  void get()
+  {
+    this->resolve();
+  }
+};
+
+/** Result of some (possibly ongoing, asynchronous) operation in the SimGrid kernel
+ *
+ *  As the operation may not be completed yet, the result might be an exception.
+ *
+ *  Example of the API (`simgrid::kernel::createProcess` does not exist):
+ *  <pre>
+ *  // Create a new process using the Worker code, this process returns
+ *  // a std::string:
+ *  simgrid::kernel::Future<std::string> future =
+ *     simgrid::kernel::createProcess("worker42", host, Worker(42));
+ *  // At this point, we just created the process so the result is not available.
+ *  // However, we can attach some work to be done with this result:
+ *  future.then([](simgrid::kernel::Future<std::string> result) {
+ *    // This code is called when the operation is completed so the result is
+ *    // available:
+ *    try {
+ *      // Try to get value, this might throw an exception if the operation
+ *      // failed (such as an exception thrown by the worker process):
+ *      std::string value = result.get();
+ *      XBT_INFO("Value: %s", value.c_str());
+ *    }
+ *    catch(std::exception& e) {
+ *      // This is an exception from the asynchronous operation:
+ *      XBT_INFO("Error: %s", e.what());
+ *    }
+ *  });
+ *  </pre>
+ *
+ *  This is based on C++1z @ref std::future but with some differences:
+ *
+ *  * there is no thread synchronization (atomic, mutex, condition variable,
+ *    etc.) because everything happens in the SimGrid event loop;
+ *
+ *  * it is purely asynchronous, you are expected to use `.then()`;
+ *
+ *  * inside the `.then()`, `.get()` can be used;
+ *
+ *  * `.get()` can only be used when `.is_ready()` (as everything happens in
+ *     a single thread, the future would be guaranteed to deadlock if `.get()`
+ *     is called when the future is not ready);
+ *
+ *  * there is no future chaining support for now (`.then().then()`);
+ *
+ *  * there is no sharing (`shared_future`) for now.
+ */
+template<class T>
+class Future {
+public:
+  Future() {}
+  Future(std::shared_ptr<FutureState<T>> state): state_(std::move(state)) {}
+
+  // Move type:
+  Future(Future const&) = delete;
+  Future& operator=(Future const&) = delete;
+  Future(Future&& that) : state_(std::move(that.state_)) {}
+  Future& operator=(Future&& that)
+  {
+    // Take over the shared state of `that`, which becomes invalid:
+    state_ = std::move(that.state_);
+    return *this;
+  }
+
+  /** Whether the future is valid
+   *
+   *  A future which has been used (`.then` or `.get`) becomes invalid.
+   *
+   *  We can use `.then` on a valid future.
+   */
+  bool valid() const
+  {
+    return state_ != nullptr;
+  }
+
+  /** Whether the future is ready
+   *
+   *  A future is ready when it has an associated value or exception.
+   *
+   *  We can use `.get()` on ready futures.
+   **/
+  bool is_ready() const
+  {
+    return state_ != nullptr && state_->is_ready();
+  }
+
+  /** Attach a continuation to this future
+   *
+   *  The future must be valid in order to make this call.
+   *  The continuation is executed when the future becomes ready.
+   *  The future becomes invalid after this call.
+   *
+   *  We don't support future chaining for now (`.then().then()`).
+   *
+   * @param continuation This function is called with a ready future
+   *                     once this future is ready
+   * @exception std::future_error no state is associated with the future
+   */
+  template<class F>
+  void then(F continuation)
+  {
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    std::unique_ptr<FutureContinuation> ptr =
+      std::unique_ptr<FutureContinuation>(
+        new FutureContinuationImpl<T,F>(state_, std::move(continuation)));
+    state_->set_continuation(std::move(ptr));
+    state_ = nullptr;
+  }
+
+  /** Get the value from the future
+   *
+   *  The future must be valid and ready in order to make this call.
+   *  @ref std::future blocks when the future is not ready but we are
+   *  completely single-threaded so blocking would be a deadlock.
+   *  After the call, the future becomes invalid.
+   *
+   *  @return                      value of the future
+   *  @exception any               Exception from the future
+   *  @exception std::future_error no state is associated with the future
+   */
+  T get()
+  {
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    // Detach the state first so that the future is invalid even if
+    // getting the value throws:
+    std::shared_ptr<FutureState<T>> state = std::move(state_);
+    return state->get();
+  }
+
+private:
+  std::shared_ptr<FutureState<T>> state_;
+};
+
+/** Producer side of a @ref simgrid::kernel::Future
+ *
+ *  A @ref Promise is connected to some `Future` and can be used to
+ *  set its result.
+ *
+ *  Similar to @ref std::promise
+ *
+ *  <code>
+ *  // Create a promise and a future:
+ *  auto promise = std::make_shared<simgrid::kernel::Promise<T>>();
+ *  auto future = promise->get_future();
+ *
+ *  SIMIX_timer_set(date, [promise] {
+ *    try {
+ *      int value = compute_the_value();
+ *      if (value < 0)
+ *        throw std::logic_error("Bad value");
+ *      // Whenever the operation is completed, we set the value
+ *      // for the future:
+ *      promise->set_value(value);
+ *    }
+ *    catch (...) {
+ *      // If an error occurred, we can set an exception which
+ *      // will be thrown by future.get():
+ *      promise->set_exception(std::current_exception());
+ *    }
+ *  });
+ *
+ *  // Return the future to the caller:
+ *  return future;
+ *  </code>
+ **/
+template<class T>
+class Promise {
+public:
+  Promise() : state_(std::make_shared<FutureState<T>>()) {}
+  Promise(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
+
+  // Move type
+  Promise(Promise const&) = delete;
+  Promise& operator=(Promise const&) = delete;
+  Promise(Promise&& that) :
+    state_(std::move(that.state_)), future_get_(that.future_get_)
+  {
+    that.future_get_ = false;
+  }
+
+  Promise& operator=(Promise&& that)
+  {
+    this->state_ = std::move(that.state_);
+    this->future_get_ = that.future_get_;
+    that.future_get_ = false;
+    return *this;
+  }
+
+  /** Get the future associated with this promise
+   *
+   *  @exception std::future_error no state is associated with the promise,
+   *                               or the future has already been retrieved
+   */
+  Future<T> get_future()
+  {
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    if (future_get_)
+      throw std::future_error(std::future_errc::future_already_retrieved);
+    future_get_ = true;
+    return Future<T>(state_);
+  }
+
+  /** Set the value of the future
+   *
+   *  @exception std::future_error no state is associated with the promise
+   */
+  void set_value(T value)
+  {
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    state_->set_value(std::move(value));
+  }
+
+  /** Set an exception as the result of the future
+   *
+   *  @exception std::future_error no state is associated with the promise
+   */
+  void set_exception(std::exception_ptr exception)
+  {
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    state_->set_exception(std::move(exception));
+  }
+
+  /** A promise destroyed before being satisfied stores a `broken_promise`
+   *  error in the shared state. */
+  ~Promise()
+  {
+    if (state_ && state_->get_status() == FutureStatus::not_ready)
+      state_->set_exception(std::make_exception_ptr(
+        std::future_error(std::future_errc::broken_promise)));
+  }
+
+private:
+  std::shared_ptr<FutureState<T>> state_;
+  bool future_get_ = false;
+};
+
+/** Producer side of a `Future<void>`
+ *
+ *  Same operations as the generic Promise, except that `set_value()`
+ *  takes no argument.
+ */
+template<>
+class Promise<void> {
+public:
+  Promise() : state_(std::make_shared<FutureState<void>>()) {}
+  Promise(std::shared_ptr<FutureState<void>> state) : state_(std::move(state)) {}
+
+  /** A promise destroyed before being satisfied stores a `broken_promise`
+   *  error in the shared state. */
+  ~Promise()
+  {
+    if (!state_)
+      return;
+    if (state_->get_status() != FutureStatus::not_ready)
+      return;
+    state_->set_exception(std::make_exception_ptr(
+      std::future_error(std::future_errc::broken_promise)));
+  }
+
+  // Move type
+  Promise(Promise&) = delete;
+  Promise& operator=(Promise&) = delete;
+  Promise(Promise&& that) :
+    state_(std::move(that.state_)), future_get_(that.future_get_)
+  {
+    that.future_get_ = false;
+  }
+  Promise& operator=(Promise&& that)
+  {
+    state_ = std::move(that.state_);
+    future_get_ = that.future_get_;
+    that.future_get_ = false;
+    return *this;
+  }
+
+  /** Get the future associated with this promise (only once) */
+  Future<void> get_future()
+  {
+    require_state();
+    if (future_get_)
+      throw std::future_error(std::future_errc::future_already_retrieved);
+    future_get_ = true;
+    return Future<void>(state_);
+  }
+
+  /** Mark the future as successfully completed */
+  void set_value()
+  {
+    require_state();
+    state_->set_value();
+  }
+
+  /** Set an exception as the result of the future */
+  void set_exception(std::exception_ptr exception)
+  {
+    require_state();
+    state_->set_exception(std::move(exception));
+  }
+
+private:
+  /** Throw `std::future_error(no_state)` if there is no shared state */
+  void require_state() const
+  {
+    if (!state_)
+      throw std::future_error(std::future_errc::no_state);
+  }
+
+  std::shared_ptr<FutureState<void>> state_;
+  bool future_get_ = false;
+};
+
+}
+}
+
+#endif
index 901081a..ad48794 100644 (file)
@@ -9,13 +9,11 @@
 
 #include <cstddef>
 
-#include <exception>
 #include <string>
 #include <utility>
 #include <memory>
 #include <functional>
 #include <future>
-#include <type_traits>
 
 #include <xbt/function_types.h>
 #include <xbt/future.hpp>
 #include <simgrid/simix.h>
 
 XBT_PUBLIC(void) simcall_run_kernel(std::function<void()> const& code);
+XBT_PUBLIC(void) simcall_run_blocking(std::function<void()> const& code);
 
 template<class F> inline
 void simcall_run_kernel(F& f)
 {
   simcall_run_kernel(std::function<void()>(std::ref(f)));
 }
+/** Run `f` through the `run_blocking` simcall without copying it
+ *
+ *  `f` is wrapped by reference (`std::ref`) in a temporary `std::function`.
+ *  As this temporary is an rvalue, it cannot bind to the `F&` parameter of
+ *  this template, so the call resolves to the non-template overload taking
+ *  a `std::function<void()> const&`.
+ */
+template<class F> inline
+void simcall_run_blocking(F& f)
+{
+  simcall_run_blocking(std::function<void()>(std::ref(f)));
+}
 
 namespace simgrid {
+
 namespace simix {
 
 /** Execute some code in the kernel/maestro
@@ -191,7 +196,13 @@ XBT_PUBLIC(smx_process_t) simcall_process_create(const char *name,
                                           xbt_dict_t properties,
                                           int auto_restart);
 
-XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, std::function<void()> callback);
+XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, std::packaged_task<void()> callback);
+
+/** Register an arbitrary callable as a timer callback
+ *
+ *  The callable is moved into a `std::packaged_task<void()>` which is
+ *  forwarded to the `std::packaged_task` overload of `SIMIX_timer_set`.
+ */
+template<class F> inline
+XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, F callback)
+{
+  return SIMIX_timer_set(date, std::packaged_task<void()>(std::move(callback)));
+}
 
 template<class R, class T> inline
 XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, R(*callback)(T*), T* arg)
diff --git a/include/simgrid/simix/blocking_simcall.hpp b/include/simgrid/simix/blocking_simcall.hpp
new file mode 100644 (file)
index 0000000..aa85620
--- /dev/null
@@ -0,0 +1,74 @@
+/* Copyright (c) 2016. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_SIMIX_BLOCKING_SIMCALL_HPP
+#define SIMGRID_SIMIX_BLOCKING_SIMCALL_HPP
+
+#include <iostream>
+
+#include <exception>
+
+#include <xbt/sysdep.h>
+
+#include <future>
+
+#include <xbt/future.hpp>
+#include <simgrid/kernel/future.hpp>
+#include <simgrid/simix.h>
+
+XBT_PUBLIC(void) simcall_run_blocking(std::function<void()> const& code);
+
+namespace simgrid {
+namespace simix {
+
+XBT_PUBLIC(void) unblock(smx_process_t process);
+
+/** Execute some code in kernel mode and wake up the process when
+ *  the result is available.
+ *
+ *  The code given is executed in the SimGrid kernel and is expected to return
+ *  a `simgrid::kernel::Future`. The current process is resumed whenever
+ *  the Future becomes ready and gets the value or exception of the future.
+ *
+ *  This can be used to implement blocking calls without adding new simcalls.
+ *  One downside of this approach is that we don't have any semantic on what
+ *  the process is waiting for. This might be a problem for the model-checker
+ *  and we'll have to devise a way to make it work.
+ *
+ *  This must not be called from maestro (kernel mode): it dies in that case.
+ *
+ *  @param     code Kernel code returning a `simgrid::kernel::Future<T>`
+ *  @return         Value of the kernel future
+ *  @exception      Exception from the kernel future
+ */
+template<class F>
+auto blocking_simcall(F code) -> decltype(code().get())
+{
+  typedef decltype(code().get()) T;
+  if (SIMIX_is_maestro())
+    xbt_die("Can't execute blocking call in kernel mode");
+
+  smx_process_t self = SIMIX_process_self();
+  simgrid::xbt::Result<T> result;
+
+  // `result` and `code` are captured by reference: they are read again in
+  // this function (`result.get()`) once the simcall has returned.
+  simcall_run_blocking([&result, self, &code]{
+    try {
+      auto future = code();
+      // Transfer the value/exception of the kernel future into `result`
+      // and unblock the process once the future is ready:
+      future.then([&result, self](simgrid::kernel::Future<T> value) {
+        simgrid::xbt::setPromise(result, value);
+        simgrid::simix::unblock(self);
+      });
+    }
+    catch (...) {
+      // `code()` or `.then()` failed: report the exception to the process.
+      result.set_exception(std::current_exception());
+      simgrid::simix::unblock(self);
+    }
+  });
+  return result.get();
+}
+
+}
+}
+
+#endif
index 81c8845..b7e52e0 100644 (file)
@@ -11,6 +11,9 @@
 
 #include <utility>
 #include <exception>
+#include <stdexcept>
+
+#include <type_traits>
 
 namespace simgrid {
 namespace xbt {
@@ -173,8 +176,16 @@ public:
 
 /** Execute some code and set a promise or result accordingly
  *
+ *  Roughly this does:
+ *
+ *  <pre>
+ *  promise.set_value(code());
+ *  </pre>
+ *
+ *  but it takes care of exceptions and works with void.
+ *
  *  We might need this when working with generic code because
- *  the trivial implementation does not work with void (before C++1z).
+ *  the trivial implementation does not work with `void` (before C++1z).
  *
  *  @param    code  What we want to do
  *  @param  promise Where to want to store the result
@@ -184,7 +195,7 @@ auto fulfillPromise(R& promise, F&& code)
 -> decltype(promise.set_value(code()))
 {
   try {
-    promise.set_value(code());
+    promise.set_value(std::forward<F>(code)());
   }
   catch(...) {
     promise.set_exception(std::current_exception());
@@ -196,7 +207,7 @@ auto fulfillPromise(P& promise, F&& code)
 -> decltype(promise.set_value())
 {
   try {
-    (code)();
+    std::forward<F>(code)();
     promise.set_value();
   }
   catch(...) {
@@ -204,6 +215,26 @@ auto fulfillPromise(P& promise, F&& code)
   }
 }
 
+/** Set a promise/result from a future/result
+ *
+ *  Roughly this does:
+ *
+ *  <pre>promise.set_value(future.get());</pre>
+ *
+ *  but it takes care of exceptions and works with `void`.
+ *
+ *  We might need this when working with generic code because
+ *  the trivial implementation does not work with `void` (before C++1z).
+ *
+ *  @param promise output (a valid future or a result)
+ *  @param future  input (a ready/waitable future or a valid result)
+ */
+template<class P, class F> inline
+void setPromise(P& promise, F&& future)
+{
+  // Delegate to fulfillPromise with a callable which extracts the result:
+  fulfillPromise(promise, [&]{ return std::forward<F>(future).get(); });
+}
+
 }
 }
 
index e731708..e91ebb8 100644 (file)
 
 #include <xbt/functional.hpp>
 
+#include <simgrid/simix/blocking_simcall.hpp>
+
 #include "src/mc/mc_replay.h"
 #include "smx_private.h"
 #include "src/mc/mc_forward.hpp"
 #include "xbt/ex.h"
 #include "mc/mc.h"
 #include "src/simix/smx_host_private.h"
-
 #include "src/simix/SynchroComm.hpp"
 
 #include <simgrid/simix.hpp>
@@ -1093,6 +1094,11 @@ void simcall_run_kernel(std::function<void()> const& code)
   return simcall_BODY_run_kernel(&code);
 }
 
+/** Issue the `run_blocking` simcall, passing `code` by address */
+void simcall_run_blocking(std::function<void()> const& code)
+{
+  std::function<void()> const* code_ptr = &code;
+  simcall_BODY_run_blocking(code_ptr);
+}
+
 int simcall_mc_random(int min, int max) {
   return simcall_BODY_mc_random(min, max);
 }
@@ -1103,3 +1109,15 @@ int simcall_mc_random(int min, int max) {
 const char *SIMIX_simcall_name(e_smx_simcall_t kind) {
   return simcall_names[kind];
 }
+
+namespace simgrid {
+namespace simix {
+
+/** Answer the simcall of `process`; must be called from maestro */
+void unblock(smx_process_t process)
+{
+  xbt_assert(SIMIX_is_maestro());
+  auto* pending = &process->simcall;
+  SIMIX_simcall_answer(pending);
+}
+
+}
+}
\ No newline at end of file
index 8e93873..d5cabef 100644 (file)
@@ -44,3 +44,16 @@ void SIMIX_run_kernel(std::function<void()> const* code)
 {
   (*code)();
 }
+
+/** Kernel code for run_blocking
+ *
+ * The body is the same as SIMIX_run_kernel: it only invokes `code`.
+ *
+ * However, `run_blocking` is blocking, so the process will not be woken
+ * up until `SIMIX_simcall_answer(simcall)` is called by the kernel.
+ * This means that `code` is responsible for arranging this.
+ */
+void SIMIX_run_blocking(std::function<void()> const* code)
+{
+  std::function<void()> const& callback = *code;
+  callback();
+}
index 5e913d9..32e906e 100644 (file)
@@ -1150,6 +1150,13 @@ static inline void simcall_run_kernel__set__code(smx_simcall_t simcall, std::fun
     simgrid::simix::marshal<std::function<void()> const*>(simcall->args[0], arg);
 }
 
+/* Accessors for the `code` argument (args[0]) of the run_blocking simcall */
+static inline std::function<void()> const* simcall_run_blocking__get__code(smx_simcall_t simcall) {
+  return simgrid::simix::unmarshal<std::function<void()> const*>(simcall->args[0]);
+}
+static inline void simcall_run_blocking__set__code(smx_simcall_t simcall, std::function<void()> const* arg) {
+    simgrid::simix::marshal<std::function<void()> const*>(simcall->args[0], arg);
+}
+
 /* The prototype of all simcall handlers, automatically generated for you */
 
 XBT_PRIVATE void simcall_HANDLER_vm_suspend(smx_simcall_t simcall, sg_host_t ind_vm);
index 74b4dcf..ba83d70 100644 (file)
@@ -441,4 +441,10 @@ inline static void simcall_BODY_run_kernel(std::function<void()> const* code) {
     /* Go to that function to follow the code flow through the simcall barrier */
     if (0) SIMIX_run_kernel(code);
     return simcall<void, std::function<void()> const*>(SIMCALL_RUN_KERNEL, code);
+  }
+  
+inline static void simcall_BODY_run_blocking(std::function<void()> const* code) {
+    /* Go to that function to follow the code flow through the simcall barrier */
+    if (0) SIMIX_run_blocking(code);
+    return simcall<void, std::function<void()> const*>(SIMCALL_RUN_BLOCKING, code);
   }/** @endcond */
index f7590f8..4f631eb 100644 (file)
@@ -85,5 +85,6 @@ typedef enum {
   SIMCALL_ASR_GET_PROPERTIES,
   SIMCALL_MC_RANDOM,
   SIMCALL_SET_CATEGORY,
-  SIMCALL_RUN_KERNEL,  NUM_SIMCALLS
+  SIMCALL_RUN_KERNEL,
+  SIMCALL_RUN_BLOCKING,  NUM_SIMCALLS
 } e_smx_simcall_t;
index f7b5259..1ba8754 100644 (file)
@@ -90,7 +90,8 @@ const char* simcall_names[] = {
   "SIMCALL_ASR_GET_PROPERTIES",
   "SIMCALL_MC_RANDOM",
   "SIMCALL_SET_CATEGORY",
-  "SIMCALL_RUN_KERNEL",};
+  "SIMCALL_RUN_KERNEL",
+  "SIMCALL_RUN_BLOCKING",};
 
 /** @private
  * @brief (in kernel mode) unpack the simcall and activate the handler
@@ -423,6 +424,10 @@ case SIMCALL_RUN_KERNEL:
       SIMIX_run_kernel(simgrid::simix::unmarshal<std::function<void()> const*>(simcall->args[0]));
       SIMIX_simcall_answer(simcall);
       break;
+
+case SIMCALL_RUN_BLOCKING:
+      SIMIX_run_blocking(simgrid::simix::unmarshal<std::function<void()> const*>(simcall->args[0]));
+      break;
     case NUM_SIMCALLS:
       break;
     case SIMCALL_NONE:
index 3c33b23..d6a3a57 100644 (file)
@@ -60,6 +60,7 @@ XBT_PRIVATE void SIMIX_simcall_handle(smx_simcall_t simcall, int value);
 XBT_PRIVATE void SIMIX_simcall_exit(smx_synchro_t synchro);
 XBT_PRIVATE const char *SIMIX_simcall_name(e_smx_simcall_t kind);
 XBT_PRIVATE void SIMIX_run_kernel(std::function<void()> const* code);
+XBT_PRIVATE void SIMIX_run_blocking(std::function<void()> const* code);
 
 SG_END_DECL()
 
index c8cf973..55858b2 100644 (file)
@@ -116,3 +116,4 @@ int        mc_random(int min, int max);
 void       set_category(smx_synchro_t synchro, const char* category) [[nohandler]];
 
 void       run_kernel(std::function<void()> const* code) [[nohandler]];
+void       run_blocking(std::function<void()> const* code) [[block,nohandler]];
index 7643bdd..ab8d595 100644 (file)
@@ -50,10 +50,10 @@ static xbt_heap_t simix_timers = nullptr;
 /** @brief Timer datatype */
 typedef struct s_smx_timer {
   double date = 0.0;
-  std::function<void()> callback;
+  std::packaged_task<void()> callback;
 
   s_smx_timer() {}
-  s_smx_timer(double date, std::function<void()> callback)
+  s_smx_timer(double date, std::packaged_task<void()> callback)
     : date(date), callback(std::move(callback)) {}
 } s_smx_timer_t;
 
@@ -474,13 +474,11 @@ void SIMIX_run(void)
        //FIXME: make the timers being real callbacks
        // (i.e. provide dispatchers that read and expand the args)
        timer = (smx_timer_t) xbt_heap_pop(simix_timers);
-       if (timer->callback) {
-         try {
-           timer->callback();
-         }
-         catch(...) {
-           xbt_die("Exception throwed ouf of timer callback");
-         }
+       try {
+         timer->callback();
+       }
+       catch(...) {
+         xbt_die("Exception throwed ouf of timer callback");
        }
        delete timer;
     }
@@ -539,12 +537,13 @@ void SIMIX_run(void)
  */
 smx_timer_t SIMIX_timer_set(double date, void (*function)(void*), void *arg)
 {
-  smx_timer_t timer = new s_smx_timer_t(date, std::bind(function, arg));
+  smx_timer_t timer = new s_smx_timer_t(date,
+    std::packaged_task<void()>(std::bind(function, arg)));
   xbt_heap_push(simix_timers, timer, date);
   return timer;
 }
 
-smx_timer_t SIMIX_timer_set(double date, std::function<void()> callback)
+smx_timer_t SIMIX_timer_set(double date, std::packaged_task<void()> callback)
 {
   smx_timer_t timer = new s_smx_timer_t(date, std::move(callback));
   xbt_heap_push(simix_timers, timer, date);
index 980b9e2..0fbec1d 100644 (file)
@@ -6,12 +6,20 @@ foreach(x check_defaults stack_overflow)
   set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c)
 endforeach()
 
+# C++ tests: one executable per listed name, built from <name>/<name>.cpp,
+# linked against simgrid and output into its own binary subdirectory.
+foreach(x generic_simcalls)
+  add_executable       (${x}  ${x}/${x}.cpp)
+  target_link_libraries(${x}  simgrid)
+  set_target_properties(${x}  PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${x})
+  set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.cpp)
+endforeach()
+
 foreach (factory raw thread boost ucontext)
   set(tesh_files    ${tesh_files}    ${CMAKE_CURRENT_SOURCE_DIR}/check_defaults/factory_${factory}.tesh)
 endforeach()
 
 set(teshsuite_src  ${teshsuite_src}                                                                        PARENT_SCOPE)
 set(tesh_files     ${tesh_files}     ${CMAKE_CURRENT_SOURCE_DIR}/stack_overflow/stack_overflow.tesh        PARENT_SCOPE)
+set(tesh_files     ${tesh_files}     ${CMAKE_CURRENT_SOURCE_DIR}/generic_simcalls/generic_simcalls.tesh    PARENT_SCOPE)
 
 IF(HAVE_RAW_CONTEXTS)
   ADD_TESH(tesh-simix-factory-default --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/check_defaults --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/check_defaults factory_raw.tesh)
@@ -25,6 +33,7 @@ ENDIF()
 
 if (NOT enable_memcheck)
 ADD_TESH_FACTORIES(stack-overflow "thread;ucontext;boost;raw" --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/stack_overflow --setenv srcdir=${CMAKE_HOME_DIRECTORY} --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/stack_overflow stack_overflow.tesh)
+ADD_TESH_FACTORIES(generic-simcalls "thread;ucontext;boost;raw" --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/generic_simcalls --setenv srcdir=${CMAKE_HOME_DIRECTORY} --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/generic_simcalls generic_simcalls.tesh)
 endif()
 
 foreach (factory raw thread boost ucontext)
diff --git a/teshsuite/simix/generic_simcalls/generic_simcalls.cpp b/teshsuite/simix/generic_simcalls/generic_simcalls.cpp
new file mode 100644 (file)
index 0000000..f1743b5
--- /dev/null
@@ -0,0 +1,92 @@
+/* Copyright (c) 2016. The SimGrid Team.
+ * All rights reserved.                                                     */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <future>
+#include <list>
+
+#include <xbt/future.hpp>
+
+#include <simgrid/simix.hpp>
+#include <simgrid/simix/blocking_simcall.hpp>
+#include <simgrid/kernel/future.hpp>
+#include <xbt/log.h>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(test, "my log messages");
+
+namespace example {
+
+/** Schedule some code for execution in the kernel at a given date
+ *
+ *  A timer is registered for `date`; when it fires, `code` is run and its
+ *  result (or exception) is stored in the promise backing the returned future.
+ *
+ *  @param date  when we should execute the code
+ *  @param code  code to execute
+ *  @return      future with the result of the call
+ */
+template<class F>
+auto kernel_defer(double date, F code) -> simgrid::kernel::Future<decltype(code())>
+{
+  using T = decltype(code());
+  auto producer = std::make_shared<simgrid::kernel::Promise<T>>();
+  simgrid::kernel::Future<T> consumer = producer->get_future();
+  auto task = [producer, code] {
+    simgrid::xbt::fulfillPromise(*producer, std::move(code));
+  };
+  SIMIX_timer_set(date, std::move(task));
+  return consumer;
+}
+
+/** Test process: exercises `simix::kernel` and `simix::blocking_simcall`
+ *
+ *  The messages logged here (and their order) are the expected output
+ *  listed in generic_simcalls.tesh.
+ */
+static int master(int argc, char *argv[])
+{
+  // Test the simple immediate execution:
+  XBT_INFO("Start");
+  simgrid::simix::kernel([] {
+    XBT_INFO("kernel");
+  });
+  XBT_INFO("kernel, returned");
+
+  // Synchronize on a successful Future<void>:
+  simgrid::simix::blocking_simcall([&] {
+    return kernel_defer(10, [] {
+      XBT_INFO("blocking_simcall with void");
+    });
+  });
+  XBT_INFO("blocking_simcall with void, returned");
+
+  // Synchronize on a failing Future<void>:
+  try {
+    simgrid::simix::blocking_simcall([&] {
+      return kernel_defer(20, [] {
+        throw std::runtime_error("Exception throwed from kernel_defer");
+      });
+    });
+    // Reaching this line means the exception was not propagated:
+    XBT_ERROR("No exception caught!");
+  }
+  catch(std::runtime_error& e) {
+    XBT_INFO("Exception caught: %s", e.what());
+  }
+
+  // Synchronize on a successful Future<int> and get the value:
+  int res = simgrid::simix::blocking_simcall([&] {
+    return kernel_defer(30, [] {
+      XBT_INFO("blocking_simcall with value");
+      return 42;
+    });
+  });
+  XBT_INFO("blocking_simcall with value returned with %i", res);
+
+  return 0;
+}
+
+}
+
+/** Entry point: set up SIMIX, load the platform file given as the only
+ *  argument, start the `master` process on host "Tremblay" and run. */
+int main(int argc, char *argv[])
+{
+  SIMIX_global_init(&argc, argv);
+  xbt_assert(argc == 2, "Usage: %s platform.xml\n", argv[0]);
+  SIMIX_function_register("master", example::master);
+  SIMIX_create_environment(argv[1]);
+  simcall_process_create("master", example::master, NULL, "Tremblay", -1, 0, NULL, NULL, 0);
+  SIMIX_run();
+  return 0;
+}
diff --git a/teshsuite/simix/generic_simcalls/generic_simcalls.tesh b/teshsuite/simix/generic_simcalls/generic_simcalls.tesh
new file mode 100644 (file)
index 0000000..1c10ecb
--- /dev/null
@@ -0,0 +1,9 @@
+$ ${bindir:=.}/generic_simcalls --cfg=contexts/stack-size:96 ${srcdir:=.}/examples/platforms/small_platform.xml
+> [Tremblay:master:(0) 0.000000] [test/INFO] Start
+> [0.000000] [test/INFO] kernel
+> [Tremblay:master:(0) 0.000000] [test/INFO] kernel, returned
+> [10.000000] [test/INFO] blocking_simcall with void
+> [Tremblay:master:(0) 10.000000] [test/INFO] blocking_simcall with void, returned
+> [Tremblay:master:(0) 20.000000] [test/INFO] Exception caught: Exception throwed from kernel_defer
+> [30.000000] [test/INFO] blocking_simcall with value
+> [Tremblay:master:(0) 30.000000] [test/INFO] blocking_simcall with value returned with 42
index 1416671..d46f4b2 100644 (file)
@@ -634,6 +634,8 @@ set(headers_to_install
   include/simgrid/forward.h
   include/simgrid/simix.h
   include/simgrid/simix.hpp
+  include/simgrid/simix/sync.hpp
+  include/simgrid/kernel/future.hpp
   include/simgrid/host.h
   include/simgrid/link.h
   include/simgrid/s4u/forward.hpp