+/* Copyright (c) 2016. The SimGrid Team.
+ * All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_FUTURE_HPP
+#define SIMGRID_KERNEL_FUTURE_HPP
+
+#include <boost/optional.hpp>
+
+#include <xbt/base.h>
+
+#include <functional>
+#include <future>
+#include <memory>
+#include <utility>
+#include <type_traits>
+
+namespace simgrid {
+namespace kernel {
+
+// There are the public classes:
+template<class T> class Future;
+template<class T> class Promise;
+
+// Those are implementation details:
+enum class FutureStatus;
+template<class T> class FutureState;
+class FutureContinuation;
+template<class T, class F> class FutureContinuationImpl;
+
+enum class FutureStatus {
+ not_ready,
+ ready,
+ done,
+};
+
+/** A continuation attached to a future to be executed when it is ready */
+XBT_PUBLIC_CLASS FutureContinuation {
+public:
+ FutureContinuation() {}
+
+ // No copy:
+ FutureContinuation(FutureContinuation&) = delete;
+ FutureContinuation& operator=(FutureContinuation&) = delete;
+
+ virtual ~FutureContinuation() {}
+ virtual void operator()() = 0;
+};
+
+/** Default implementation of `FutureContinuation`
+ *
+ * @param T value type of the future
+ * @param F type of the wrapped code/callback/continuation
+ */
+template<class T, class F>
+class FutureContinuationImpl : public FutureContinuation {
+public:
+ FutureContinuationImpl(std::shared_ptr<FutureState<T>> ptr, F callback)
+ : ptr_(std::move(ptr)), callback_(std::move(callback)) {}
+ ~FutureContinuationImpl() override {}
+ void operator()() override
+ {
+ try {
+ callback_(Future<T>(ptr_));
+ }
+ // Those exceptions are lost.
+ // If we want to implement callback chaining, we'll have to catch them and
+ // foward them to the next future.
+ catch (...) {
+ // We could log this.
+ }
+ }
+private:
+ std::shared_ptr<FutureState<T>> ptr_;
+ F callback_;
+};
+
+/** Bases stuff for all @ref simgrid::kernel::FutureState<T> */
+class FutureStateBase {
+public:
+ // No copy/move:
+ FutureStateBase(FutureStateBase const&) = delete;
+ FutureStateBase& operator=(FutureStateBase const&) = delete;
+
+ void set_exception(std::exception_ptr exception)
+ {
+ xbt_assert(exception_ == nullptr);
+ if (status_ != FutureStatus::not_ready)
+ throw std::future_error(std::future_errc::promise_already_satisfied);
+ exception_ = std::move(exception);
+ this->set_ready();
+ }
+
+ void set_continuation(std::unique_ptr<FutureContinuation> continuation)
+ {
+ xbt_assert(!continuation_);
+ switch (status_) {
+ case FutureStatus::done:
+ // This is not supposed to happen if continuation is set
+ // via the Promise:
+ xbt_die("Set continuation on finished future");
+ break;
+ case FutureStatus::ready:
+ // The future is ready, execute the continuation directly.
+ // We might execute it from the event loop instead:
+ (*continuation)();
+ break;
+ case FutureStatus::not_ready:
+ // The future is not ready so we mast keep the continuation for
+ // executing it later:
+ continuation_ = std::move(continuation);
+ break;
+ }
+ }
+
+ FutureStatus get_status() const
+ {
+ return status_;
+ }
+
+ bool is_ready() const
+ {
+ return status_ == FutureStatus::ready;
+ }
+
+protected:
+ FutureStateBase() {}
+ ~FutureStateBase() {};
+
+ /** Set the future as ready and trigger the continuation */
+ void set_ready()
+ {
+ status_ = FutureStatus::ready;
+ if (continuation_) {
+ // We unregister the continuation before executing it.
+ // We need to do this becase the current implementation of the
+ // continuation has a shared_ptr to the FutureState.
+ auto continuation = std::move(continuation_);
+ (*continuation)();
+ }
+ }
+
+ /** Set the future as done and raise an exception if any
+ *
+ * This does half the job of `.get()`.
+ **/
+ void resolve()
+ {
+ if (status_ != FutureStatus::ready)
+ xbt_die("Deadlock: this future is not ready");
+ status_ = FutureStatus::done;
+ if (exception_) {
+ std::exception_ptr exception = std::move(exception_);
+ std::rethrow_exception(std::move(exception));
+ }
+ }
+
+private:
+ FutureStatus status_ = FutureStatus::not_ready;
+ std::exception_ptr exception_;
+ std::unique_ptr<FutureContinuation> continuation_;
+};
+
+/** Shared state for future and promises
+ *
+ * You are not expected to use them directly but to create them
+ * implicitely through a @ref simgrid::kernel::Promise.
+ * Alternatively kernel operations could inherit or contain FutureState
+ * if they are managed with @ref std::shared_ptr.
+ **/
+template<class T>
+class FutureState : public FutureStateBase {
+public:
+
+ void set_value(T value)
+ {
+ if (this->get_status() != FutureStatus::not_ready)
+ throw std::future_error(std::future_errc::promise_already_satisfied);
+ value_ = std::move(value);
+ this->set_ready();
+ }
+
+ T get()
+ {
+ this->resolve();
+ xbt_assert(this->value_);
+ auto result = std::move(this->value_.get());
+ this->value_ = boost::optional<T>();
+ return std::move(result);
+ }
+
+private:
+ boost::optional<T> value_;
+};
+
+template<class T>
+class FutureState<T&> : public FutureStateBase {
+public:
+ void set_value(T& value)
+ {
+ if (this->get_status() != FutureStatus::not_ready)
+ throw std::future_error(std::future_errc::promise_already_satisfied);
+ value_ = &value;
+ this->set_ready();
+ }
+
+ T& get()
+ {
+ this->resolve();
+ xbt_assert(this->value_);
+ T* result = value_;
+ value_ = nullptr;
+ return *value_;
+ }
+
+private:
+ T* value_ = nullptr;
+};
+
+template<>
+class FutureState<void> : public FutureStateBase {
+public:
+ void set_value()
+ {
+ if (this->get_status() != FutureStatus::not_ready)
+ throw std::future_error(std::future_errc::promise_already_satisfied);
+ this->set_ready();
+ }
+
+ void get()
+ {
+ this->resolve();
+ }
+};
+
+/** Result of some (possibly ongoing, asynchronous) operation in the SimGrid kernel
+ *
+ * As the operation may not be completed yet, the result might be an exception.
+ *
+ * Example of the API (`simgrid::kernel::createProcess` does not exist):
+ * <pre>
+ * // Create a new process using the Worker code, this process returns
+ * // a std::string:
+ * simgrid::kernel::Future<std::string> future =
+ * simgrid::kernel::createProcess("worker42", host, Worker(42));
+ * // At this point, we just created the process so the result is not available.
+ * // However, we can attach some work do be done with this result:
+ * future.then([](simgrid::kernel::Future<std::string> result) {
+ * // This code is called when the operation is completed so the result is
+ * // available:
+ * try {
+ * // Try to get value, this might throw an exception if the operation
+ * // failed (such as an exception throwed by the worker process):
+ * std::string value = result.get();
+ * XBT_INFO("Value: %s", value.c_str());
+ * }
+ * catch(std::exception& e) {
+ * // This is an exception from the asynchronous operation:
+ * XBT_INFO("Error: %e", e.what());
+ * }
+ * );
+ * </pre>
+ *
+ * This is based on C++1z @ref std::future but with some differences:
+ *
+ * * there is no thread synchronization (atomic, mutex, condition variable,
+ * etc.) because everything happens in the SimGrid event loop;
+ *
+ * * it is purely asynchronous, you are expected to use `.then()`;
+ *
+ * * inside the `.then()`, `.get()` can be used;
+ *
+ * * `.get()` can only be used when `.is_ready()` (as everything happens in
+ * a single-thread, the future would be guaranted to deadlock if `.get()`
+ * is called when the future is not ready);
+ *
+ * * there is no future chaining support for now (`.then().then()`);
+ *
+ * * there is no sharing (`shared_future`) for now.
+ */
+template<class T>
+class Future {
+public:
+ Future() {}
+ Future(std::shared_ptr<FutureState<T>> state): state_(std::move(state)) {}
+
+ // Move type:
+ Future(Future&) = delete;
+ Future& operator=(Future&) = delete;
+ Future(Future&& that) : state_(std::move(that.state_)) {}
+ Future& operator=(Future&& that)
+ {
+ state_ = std::move(that.stat_);
+ }
+
+ /** Whether the future is valid:.
+ *
+ * A future which as been used (`.then` of `.get`) becomes invalid.
+ *
+ * We can use `.then` on a valid future.
+ */
+ bool valid() const
+ {
+ return state_ != nullptr;
+ }
+
+ /** Whether the future is ready
+ *
+ * A future is ready when it has an associated value or exception.
+ *
+ * We can use `.get()` on ready futures.
+ **/
+ bool is_ready() const
+ {
+ return state_ != nullptr && state_->is_ready();
+ }
+
+ /** Attach a continuation to this future
+ *
+ * The future must be valid in order to make this call.
+ * The continuation is executed when the future becomes ready.
+ * The future becomes invalid after this call.
+ *
+ * We don't support future chaining for now (`.then().then()`).
+ *
+ * @param continuation This function is called with a ready future
+ * the future is ready
+ * @exception std::future_error no state is associated with the future
+ */
+ template<class F>
+ void then(F continuation)
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ std::unique_ptr<FutureContinuation> ptr =
+ std::unique_ptr<FutureContinuation>(
+ new FutureContinuationImpl<T,F>(state_, std::move(continuation)));
+ state_->set_continuation(std::move(ptr));
+ state_ = nullptr;
+ }
+
+ /** Get the value from the future
+ *
+ * This is expected to be called
+ *
+ * The future must be valid and ready in order to make this call.
+ * @ref std::future blocks when the future is not ready but we are
+ * completely single-threaded so blocking would be a deadlock.
+ * After the call, the future becomes invalid.
+ *
+ * @return value of the future
+ * @exception any Exception from the future
+ * @exception std::future_error no state is associated with the future
+ */
+ T get()
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ std::shared_ptr<FutureState<T>> state = std::move(state_);
+ return state->get();
+ }
+
+private:
+ std::shared_ptr<FutureState<T>> state_;
+};
+
+/** Producer side of a @simgrid::kernel::Future
+ *
+ * A @ref Promise is connected to some `Future` and can be used to
+ * set its result.
+ *
+ * Similar to @ref std::promise
+ *
+ * <code>
+ * // Create a promise and a future:
+ * auto promise = std::make_shared<simgrid::kernel::Promise<T>>();
+ * auto future = promise->get_future();
+ *
+ * SIMIX_timer_set(date, [promise] {
+ * try {
+ * int value = compute_the_value();
+ * if (value < 0)
+ * throw std::logic_error("Bad value");
+ * // Whenever the operation is completed, we set the value
+ * // for the future:
+ * promise.set_value(value);
+ * }
+ * catch (...) {
+ * // If an error occured, we can set an exception which
+ * // will be throwed buy future.get():
+ * promise.set_exception(std::current_exception());
+ * }
+ * });
+ *
+ * // Return the future to the caller:
+ * return future;
+ * </code>
+ **/
+template<class T>
+class Promise {
+public:
+ Promise() : state_(std::make_shared<FutureState<T>>()) {}
+ Promise(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
+
+ // Move type
+ Promise(Promise&) = delete;
+ Promise& operator=(Promise&) = delete;
+ Promise(Promise&& that) :
+ state_(std::move(that.state_)), future_get_(that.future_set)
+ {
+ that.future_get_ = false;
+ }
+
+ Promise& operator=(Promise&& that)
+ {
+ this->state_ = std::move(that.state_);
+ this->future_get_ = that.future_get_;
+ that.future_get_ = false;
+ return *this;
+ }
+ Future<T> get_future()
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ if (future_get_)
+ throw std::future_error(std::future_errc::future_already_retrieved);
+ future_get_ = true;
+ return Future<T>(state_);
+ }
+ void set_value(T value)
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ state_->set_value(std::move(value));
+ }
+ void set_exception(std::exception_ptr exception)
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ state_->set_exception(std::move(exception));
+ }
+ ~Promise()
+ {
+ if (state_ && state_->get_status() == FutureStatus::not_ready)
+ state_->set_exception(std::make_exception_ptr(
+ std::future_error(std::future_errc::broken_promise)));
+ }
+
+private:
+ std::shared_ptr<FutureState<T>> state_;
+ bool future_get_ = false;
+};
+
+template<>
+class Promise<void> {
+public:
+ Promise() : state_(std::make_shared<FutureState<void>>()) {}
+ Promise(std::shared_ptr<FutureState<void>> state) : state_(std::move(state)) {}
+ ~Promise()
+ {
+ if (state_ && state_->get_status() == FutureStatus::not_ready)
+ state_->set_exception(std::make_exception_ptr(
+ std::future_error(std::future_errc::broken_promise)));
+ }
+
+ // Move type
+ Promise(Promise&) = delete;
+ Promise& operator=(Promise&) = delete;
+ Promise(Promise&& that) :
+ state_(std::move(that.state_)), future_get_(that.future_get_)
+ {
+ that.future_get_ = false;
+ }
+ Promise& operator=(Promise&& that)
+ {
+ this->state_ = std::move(that.state_);
+ this->future_get_ = that.future_get_;
+ that.future_get_ = false;
+ return *this;
+ }
+
+ Future<void> get_future()
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ if (future_get_)
+ throw std::future_error(std::future_errc::future_already_retrieved);
+ future_get_ = true;
+ return Future<void>(state_);
+ }
+ void set_value()
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ state_->set_value();
+ }
+ void set_exception(std::exception_ptr exception)
+ {
+ if (state_ == nullptr)
+ throw std::future_error(std::future_errc::no_state);
+ state_->set_exception(std::move(exception));
+ }
+
+private:
+ std::shared_ptr<FutureState<void>> state_;
+ bool future_get_ = false;
+};
+
+}
+}
+
+#endif