#ifndef SIMGRID_KERNEL_FUTURE_HPP
#define SIMGRID_KERNEL_FUTURE_HPP
+#include <future>
+#include <type_traits>
+
#include <boost/optional.hpp>
#include <xbt/base.h>
done,
};
+/** Compile-time predicate: is the given type a `Future<...>`?
+ *
+ *  The primary template answers "no" for every type.
+ */
+template<class T>
+struct is_future : public std::false_type {};
+
+/** Partial specialization answering "yes" for any `Future<T>`. */
+template<class T>
+struct is_future<Future<T>> : public std::true_type {};
+
/** Bases stuff for all @ref simgrid::kernel::FutureState<T> */
class FutureStateBase {
public:
}
};
+/** Forward the outcome of a future to a promise
+ *
+ *  A continuation is registered on `future` through `then_()`. When it runs,
+ *  it passes the stored promise and the future it receives to
+ *  `simgrid::xbt::setPromise()`.
+ *
+ *  @param promise promise to settle; it is moved into the continuation
+ *  @param future  future whose outcome is forwarded to the promise
+ */
+template<class T>
+void bindPromise(Promise<T> promise, Future<T> future)
+{
+  // Function object owning the promise: the promise is moved in, not copied.
+  class Forwarder {
+    Promise<T> target_;
+  public:
+    Forwarder(Promise<T> target) : target_(std::move(target)) {}
+    void operator()(Future<T> ready)
+    {
+      simgrid::xbt::setPromise(target_, ready);
+    }
+  };
+  Forwarder forwarder(std::move(promise));
+  future.then_(std::move(forwarder));
+}
+
+template<class T> Future<T> unwrapFuture(Future<Future<T>> future);
+
/** Result of some (probably) asynchronous operation in the SimGrid kernel
*
* @ref simgrid::simix::Future and @ref simgrid::simix::Future provide an
return state_ != nullptr && state_->is_ready();
}
+  /** Register a continuation on this future without creating a new future
+   *
+   *  Same idea as .then(), except that nothing is returned: the continuation
+   *  is simply stored in the shared state.
+   *
+   *  @param continuation callable stored (wrapped by makeTask together with
+   *                      the shared state) as the continuation of this future
+   *  @exception std::future_error (no_state) no state is associated with the future
+   */
+  template<class F>
+  void then_(F continuation)
+  {
+    if (!state_)
+      throw std::future_error(std::future_errc::no_state);
+    // This future gives up its state; the task below keeps a shared copy of it.
+    auto shared = std::move(state_);
+    auto task = simgrid::xbt::makeTask(std::move(continuation), shared);
+    shared->set_continuation(std::move(task));
+  }
+
+  /** Attach a continuation to this future
+   *
+   *  This version never does future unwrapping: the returned future holds
+   *  exactly what the continuation returns, even if that is itself a Future.
+   *
+   *  This future gives up its shared state (`state_` is moved from).
+   *
+   *  @param continuation callable invoked with a `Future<T>` rebuilt from the
+   *                      shared state once the continuation task runs
+   *  @return future obtained from a fresh promise, which is fulfilled by
+   *          running the continuation
+   *  @exception std::future_error (no_state) no state is associated with the future
+   */
+  template<class F>
+  auto thenNoUnwrap(F continuation)
+  -> Future<decltype(continuation(std::move(*this)))>
+  {
+    typedef decltype(continuation(std::move(*this))) R;
+    if (state_ == nullptr)
+      throw std::future_error(std::future_errc::no_state);
+    auto state = std::move(state_);
+    // Create a new future...
+    Promise<R> promise;
+    Future<R> future = promise.get_future();
+    // ...and when the current future is ready...
+    state->set_continuation(simgrid::xbt::makeTask(
+      [](Promise<R> promise, std::shared_ptr<FutureState<T>> state, F continuation) {
+        // ...set the new future value by running the continuation.
+        Future<T> future(std::move(state));
+        simgrid::xbt::fulfillPromise(promise,[&]{
+          return continuation(std::move(future));
+        });
+      },
+      std::move(promise), state, std::move(continuation)));
+    // Return the local by name: it is implicitly moved (or elided), whereas
+    // an explicit std::move() would only prevent copy elision.
+    return future;
+  }
+
/** Attach a continuation to this future
*
* The future must be valid in order to make this call.
* The continuation is executed when the future becomes ready.
* The future becomes invalid after this call.
*
- * We don't support future chaining for now (`.then().then()`).
- *
+ * This overload only participates when the continuation does not return a
+ * Future (see the `!is_future<...>` condition in the return type); it simply
+ * forwards to thenNoUnwrap().
+ *
* @param continuation This function is called with a ready future
* the future is ready
+ * @return a Future of the type returned by the continuation
* @exception std::future_error no state is associated with the future
*/
template<class F>
- void then(F continuation)
+ auto then(F continuation)
+ -> typename std::enable_if<
+ !is_future<decltype(continuation(std::move(*this)))>::value,
+ Future<decltype(continuation(std::move(*this)))>
+ >::type
{
- if (state_ == nullptr)
- throw std::future_error(std::future_errc::no_state);
- // Give shared-ownership to the continuation:
- auto state = std::move(state_);
- state->set_continuation(simgrid::xbt::makeTask(
- std::move(continuation), state));
+ return this->thenNoUnwrap(std::move(continuation)); // no unwrapping needed for a non-Future result
+ }
+
+  /** Attach a continuation to this future (future chaining)
+   *
+   *  This overload only participates when the continuation itself returns a
+   *  Future (see the `is_future<...>` condition in the return type). The
+   *  nested future produced by thenNoUnwrap() is passed to unwrapFuture(),
+   *  so the caller gets the same Future type as the continuation returns.
+   *
+   *  @param continuation callable taking this future and returning a Future
+   *  @return result of unwrapFuture() on the nested future
+   */
+  template<class F>
+  auto then(F continuation)
+  -> typename std::enable_if<
+       is_future<decltype(continuation(std::move(*this)))>::value,
+       decltype(continuation(std::move(*this)))
+     >::type
+  {
+    // Fixed: the member is named thenNoUnwrap (was misspelled thenNoUnwap).
+    return unwrapFuture(this->thenNoUnwrap(std::move(continuation)));
}
/** Get the value from the future
std::shared_ptr<FutureState<T>> state_;
};
+/** Turn a `Future<Future<T>>` into a `Future<T>`
+ *
+ *  A fresh promise is created; its future is returned and the promise is
+ *  handed to bindPromise() together with the nested future.
+ *
+ *  @param future nested future to flatten (moved into bindPromise())
+ *  @return the future obtained from the freshly created promise
+ */
+template<class T>
+Future<T> unwrapFuture(Future<Future<T>> future)
+{
+  Promise<T> promise;
+  Future<T> result = promise.get_future();
+  // NOTE(review): bindPromise is declared as (Promise<T>, Future<T>) with a
+  // single T, while this call passes a Promise<T> and a Future<Future<T>>;
+  // confirm that this call actually instantiates.
+  bindPromise(std::move(promise), std::move(future));
+  // Return the local by name: it is implicitly moved (or elided), whereas
+  // an explicit std::move() would only prevent copy elision.
+  return result;
+}
+
/** Producer side of a @simgrid::kernel::Future
*
* A @ref Promise is connected to some `Future` and can be used to
Promise(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
// Move type
- Promise(Promise&) = delete;
- Promise& operator=(Promise&) = delete;
+ Promise(Promise const&) = delete;
+ Promise& operator=(Promise const&) = delete;
Promise(Promise&& that) :
- state_(std::move(that.state_)), future_get_(that.future_set)
+ state_(std::move(that.state_)), future_get_(that.future_get_)
{
that.future_get_ = false;
}
}
// Move type
- Promise(Promise&) = delete;
- Promise& operator=(Promise&) = delete;
+ Promise(Promise const&) = delete;
+ Promise& operator=(Promise const&) = delete;
Promise(Promise&& that) :
state_(std::move(that.state_)), future_get_(that.future_get_)
{
simcall_run_blocking([&result, self, &code]{
try {
auto future = code();
- future.then([&result, self](simgrid::kernel::Future<T> value) {
+ future.then_([&result, self](simgrid::kernel::Future<T> value) {
simgrid::xbt::setPromise(result, value);
simgrid::simix::unblock(self);
});
simcall_run_blocking([this, &result, self]{
try {
// When the kernel future is ready...
- this->future_.then([this, &result, self](simgrid::kernel::Future<T> value) {
+ this->future_.then_([this, &result, self](simgrid::kernel::Future<T> value) {
// ... wake up the process with the result of the kernel future.
simgrid::xbt::setPromise(result, value);
simgrid::simix::unblock(self);
simcall_run_blocking([this, &exception, self]{
try {
// When the kernel future is ready...
- this->future_.then([this, self](simgrid::kernel::Future<T> value) {
+ this->future_.then_([this, self](simgrid::kernel::Future<T> value) {
// ...store it the simix kernel and wake up.
this->future_ = std::move(value);
simgrid::simix::unblock(self);
}
};
+/** Nullary callable bundling a function object with its arguments
+ *
+ *  Invoking the object hands the stored callable and the stored argument
+ *  tuple, both as rvalues, to `simgrid::xbt::apply()` and returns its result.
+ */
+template<class F, class... Args>
+class TaskImpl {
+  F code_;                   // the callable to run
+  std::tuple<Args...> args_; // the arguments it is applied to
+  // What simgrid::xbt::apply() yields for the stored callable and arguments.
+  using result_type = decltype(simgrid::xbt::apply(std::move(code_), std::move(args_)));
+public:
+  /** Take ownership of the callable and of the tuple of arguments. */
+  TaskImpl(F code, std::tuple<Args...> args)
+    : code_(std::move(code)), args_(std::move(args))
+  {
+  }
+  /** Apply the stored callable to the stored arguments. */
+  result_type operator()()
+  {
+    return simgrid::xbt::apply(std::move(code_), std::move(args_));
+  }
+};
+
template<class F, class... Args> // F: callable type, Args: types of the arguments bound to it
auto makeTask(F code, Args... args) // returns a Task taking no argument (see the trailing return type)
-> Task< decltype(code(std::move(args)...))() >
{
- typedef decltype(code(std::move(args)...)) result_type;
-
- class Impl {
- private:
- F code_;
- std::tuple<Args...> args_;
- public:
- Impl(F code, std::tuple<Args...> args) :
- code_(std::move(code)),
- args_(std::move(args)) {}
- result_type operator()()
- {
- return simgrid::xbt::apply(std::move(code_), std::move(args_));
- }
- };
-
- return Impl(std::move(code), std::make_tuple(std::move(args)...));
+ TaskImpl<F, Args...> task(std::move(code), std::make_tuple(std::move(args)...)); // bundle the callable with a tuple of its arguments
+ return std::move(task); // the TaskImpl is converted to the declared Task return type, hence the explicit move
}
}
namespace example {
-/** Execute the code in the kernel at some time
- *
- * @param date when we should execute the code
- * @param code code to execute
- * @return future with the result of the call
- */
-template<class F>
-auto kernel_defer(double date, F code) -> simgrid::kernel::Future<decltype(code())>
+/** Create a future which becomes ready when the date is reached
+ *
+ * A Promise<void> is allocated with std::make_shared and captured by copy
+ * (as a shared pointer) in the callback registered with SIMIX_timer_set();
+ * that callback calls set_value() on the promise.
+ *
+ * @param date date handed to SIMIX_timer_set() (presumably simulated time -- TODO confirm)
+ * @return the future obtained from the promise
+ */
+static
+simgrid::kernel::Future<void> kernel_wait_until(double date)
{
- typedef decltype(code()) T;
- auto promise = std::make_shared<simgrid::kernel::Promise<T>>();
+ auto promise = std::make_shared<simgrid::kernel::Promise<void>>(); // shared so the timer callback can hold a copy of the pointer
auto future = promise->get_future(); // taken before the callback is registered
- SIMIX_timer_set(date, [promise, code] {
- simgrid::xbt::fulfillPromise(*promise, std::move(code));
+ SIMIX_timer_set(date, [promise] {
+ promise->set_value(); // no value to carry: Promise<void>
});
return future;
}
// Synchronize on a successful Future<void>:
simgrid::simix::kernelSync([&] {
- return kernel_defer(10, [] {
+ return kernel_wait_until(10).then([](simgrid::kernel::Future<void> future) {
+ future.get();
XBT_INFO("kernelSync with void");
});
});
// Synchronize on a failing Future<void>:
try {
simgrid::simix::kernelSync([&] {
- return kernel_defer(20, [] {
+ return kernel_wait_until(20).then([](simgrid::kernel::Future<void> future) {
+ future.get();
throw std::runtime_error("Exception throwed from kernel_defer");
});
});
// Synchronize on a successful Future<int> and get the value:
int res = simgrid::simix::kernelSync([&] {
- return kernel_defer(30, [] {
+ return kernel_wait_until(30).then([](simgrid::kernel::Future<void> future) {
+ future.get();
XBT_INFO("kernelSync with value");
return 42;
});
// Synchronize on a successful Future<int> and get the value:
simgrid::simix::Future<int> future = simgrid::simix::kernelAsync([&] {
- return kernel_defer(50, [] {
+ return kernel_wait_until(50).then([](simgrid::kernel::Future<void> future) {
+ future.get();
XBT_INFO("kernelAsync with value");
return 43;
});
// Synchronize on a successful Future<int> and get the value:
future = simgrid::simix::kernelAsync([&] {
- return kernel_defer(60, [] {
+ return kernel_wait_until(60).then([](simgrid::kernel::Future<void> future) {
+ future.get();
XBT_INFO("kernelAsync with value");
return 43;
});