X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/198b09ec16ca1b8fc05053bcae9e75c0ad689711..1d0056178d75e1a88c82c6657833f24153f5ad16:/include/simgrid/kernel/future.hpp diff --git a/include/simgrid/kernel/future.hpp b/include/simgrid/kernel/future.hpp index c58f46b164..7b2941e583 100644 --- a/include/simgrid/kernel/future.hpp +++ b/include/simgrid/kernel/future.hpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2016. The SimGrid Team. +/* Copyright (c) 2016-2019. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it @@ -7,16 +7,18 @@ #ifndef SIMGRID_KERNEL_FUTURE_HPP #define SIMGRID_KERNEL_FUTURE_HPP -#include - -#include - #include #include #include #include #include +#include + +#include +#include +#include + namespace simgrid { namespace kernel { @@ -27,8 +29,6 @@ template class Promise; // Those are implementation details: enum class FutureStatus; template class FutureState; -class FutureContinuation; -template class FutureContinuationImpl; enum class FutureStatus { not_ready, @@ -36,46 +36,10 @@ enum class FutureStatus { done, }; -/** A continuation attached to a future to be executed when it is ready */ -XBT_PUBLIC_CLASS FutureContinuation { -public: - FutureContinuation() {} - - // No copy: - FutureContinuation(FutureContinuation&) = delete; - FutureContinuation& operator=(FutureContinuation&) = delete; - - virtual ~FutureContinuation() {} - virtual void operator()() = 0; -}; - -/** Default implementation of `FutureContinuation` - * - * @param T value type of the future - * @param F type of the wrapped code/callback/continuation - */ -template -class FutureContinuationImpl : public FutureContinuation { -public: - FutureContinuationImpl(std::shared_ptr> ptr, F callback) - : ptr_(std::move(ptr)), callback_(std::move(callback)) {} - ~FutureContinuationImpl() override {} - void operator()() override - { - try { - callback_(Future(ptr_)); - } - // Those exceptions are lost. - // If we want to implement callback chaining, we'll have to catch them and - // foward them to the next future. - catch (...) { - // We could log this. - } - } -private: - std::shared_ptr> ptr_; - F callback_; -}; +template +struct is_future : std::false_type {}; +template +struct is_future> : std::true_type {}; /** Bases stuff for all @ref simgrid::kernel::FutureState */ class FutureStateBase { @@ -84,6 +48,8 @@ public: FutureStateBase(FutureStateBase const&) = delete; FutureStateBase& operator=(FutureStateBase const&) = delete; + XBT_PUBLIC void schedule(simgrid::xbt::Task&& job); + void set_exception(std::exception_ptr exception) { xbt_assert(exception_ == nullptr); @@ -93,9 +59,9 @@ public: this->set_ready(); } - void set_continuation(std::unique_ptr continuation) + void set_continuation(simgrid::xbt::Task&& continuation) { - xbt_assert(!continuation_); + xbt_assert(not continuation_); switch (status_) { case FutureStatus::done: // This is not supposed to happen if continuation is set @@ -105,13 +71,15 @@ public: case FutureStatus::ready: // The future is ready, execute the continuation directly. // We might execute it from the event loop instead: - (*continuation)(); + schedule(std::move(continuation)); break; case FutureStatus::not_ready: - // The future is not ready so we mast keep the continuation for + // The future is not ready so we must keep the continuation for // executing it later: continuation_ = std::move(continuation); break; + default: + DIE_IMPOSSIBLE; } } @@ -126,8 +94,8 @@ public: } protected: - FutureStateBase() {} - ~FutureStateBase() {}; + FutureStateBase() = default; + ~FutureStateBase() = default; /** Set the future as ready and trigger the continuation */ void set_ready() @@ -135,10 +103,10 @@ protected: status_ = FutureStatus::ready; if (continuation_) { // We unregister the continuation before executing it. - // We need to do this becase the current implementation of the + // We need to do this because the current implementation of the // continuation has a shared_ptr to the FutureState. auto continuation = std::move(continuation_); - (*continuation)(); + this->schedule(std::move(continuation)); } } @@ -153,6 +121,7 @@ protected: status_ = FutureStatus::done; if (exception_) { std::exception_ptr exception = std::move(exception_); + exception_ = nullptr; std::rethrow_exception(std::move(exception)); } } @@ -160,15 +129,15 @@ protected: private: FutureStatus status_ = FutureStatus::not_ready; std::exception_ptr exception_; - std::unique_ptr continuation_; + simgrid::xbt::Task continuation_; }; /** Shared state for future and promises * * You are not expected to use them directly but to create them - * implicitely through a @ref simgrid::kernel::Promise. + * implicitly through a @ref simgrid::kernel::Promise. * Alternatively kernel operations could inherit or contain FutureState - * if they are managed with @ref std::shared_ptr. + * if they are managed with std::shared_ptr. **/ template class FutureState : public FutureStateBase { @@ -188,7 +157,7 @@ public: xbt_assert(this->value_); auto result = std::move(this->value_.get()); this->value_ = boost::optional(); - return std::move(result); + return result; } private: @@ -212,7 +181,7 @@ public: xbt_assert(this->value_); T* result = value_; value_ = nullptr; - return *value_; + return *result; } private: @@ -235,14 +204,29 @@ public: } }; +template void bind_promise(Promise&& promise, Future future) +{ + class PromiseBinder { + public: + explicit PromiseBinder(Promise&& promise) : promise_(std::move(promise)) {} + void operator()(Future future) { simgrid::xbt::set_promise(promise_, future); } + + private: + Promise promise_; + }; + future.then_(PromiseBinder(std::move(promise))); +} + +template Future unwrap_future(Future> future); + /** Result of some (probably) asynchronous operation in the SimGrid kernel * * @ref simgrid::simix::Future and @ref simgrid::simix::Future provide an - * abstration for asynchronous stuff happening in the SimGrid kernel. They + * abstraction for asynchronous stuff happening in the SimGrid kernel. They * are based on C++1z futures. * * The future represents a value which will be available at some point when this - * asynchronous operaiont is finished. Alternatively, if this operations fails, + * asynchronous operation is finished. Alternatively, if this operations fails, * the result of the operation might be an exception. * * As the operation is possibly no terminated yet, we cannot get the result @@ -263,7 +247,7 @@ public: * // available: * try { * // Try to get value, this might throw an exception if the operation - * // failed (such as an exception throwed by the worker process): + * // failed (such as an exception thrown by the worker process): * std::string value = result.get(); * XBT_INFO("Value: %s", value.c_str()); * } @@ -274,7 +258,7 @@ public: * ); * * - * This is based on C++1z @ref std::future but with some differences: + * This is based on C++1z std::future but with some differences: * * * there is no thread synchronization (atomic, mutex, condition variable, * etc.) because everything happens in the SimGrid event loop; @@ -284,7 +268,7 @@ public: * * inside the `.then()`, `.get()` can be used; * * * `.get()` can only be used when `.is_ready()` (as everything happens in - * a single-thread, the future would be guaranted to deadlock if `.get()` + * a single-thread, the future would be guaranteed to deadlock if `.get()` * is called when the future is not ready); * * * there is no future chaining support for now (`.then().then()`); @@ -294,8 +278,9 @@ public: template class Future { public: - Future() {} - Future(std::shared_ptr> state): state_(std::move(state)) {} + Future() = default; + explicit Future(std::shared_ptr> state) : state_(std::move(state)) {} + ~Future() = default; // Move type: Future(Future&) = delete; @@ -303,7 +288,8 @@ public: Future(Future&& that) : state_(std::move(that.state_)) {} Future& operator=(Future&& that) { - state_ = std::move(that.stat_); + state_ = std::move(that.state_); + return *this; } /** Whether the future is valid:. @@ -328,36 +314,76 @@ public: return state_ != nullptr && state_->is_ready(); } + /** Attach a continuation to this future + * + * This is like .then() but avoid the creation of a new future. + */ + template + void then_(F continuation) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + // Give shared-ownership to the continuation: + auto state = std::move(state_); + state->set_continuation(simgrid::xbt::make_task(std::move(continuation), state)); + } + + /** Attach a continuation to this future + * + * This version never does future unwrapping. + */ + template auto then_no_unwrap(F continuation) -> Future + { + typedef decltype(continuation(std::move(*this))) R; + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + auto state = std::move(state_); + // Create a new future... + Promise promise; + Future future = promise.get_future(); + // ...and when the current future is ready... + state->set_continuation(simgrid::xbt::make_task( + [](Promise promise, std::shared_ptr> state, F continuation) { + // ...set the new future value by running the continuation. + Future future(std::move(state)); + simgrid::xbt::fulfill_promise(promise, [&continuation, &future] { return continuation(std::move(future)); }); + }, + std::move(promise), state, std::move(continuation))); + return future; + } + /** Attach a continuation to this future * * The future must be valid in order to make this call. * The continuation is executed when the future becomes ready. * The future becomes invalid after this call. * - * We don't support future chaining for now (`.then().then()`). - * * @param continuation This function is called with a ready future * the future is ready * @exception std::future_error no state is associated with the future */ + template + auto then(F continuation) -> typename std::enable_if::value, + Future>::type + { + return this->then_no_unwrap(std::move(continuation)); + } + + /** Attach a continuation to this future (future chaining) */ template - void then(F continuation) + auto then(F continuation) + -> typename std::enable_if< + is_future::value, + decltype(continuation(std::move(*this))) + >::type { - if (state_ == nullptr) - throw std::future_error(std::future_errc::no_state); - std::unique_ptr ptr = - std::unique_ptr( - new FutureContinuationImpl(state_, std::move(continuation))); - state_->set_continuation(std::move(ptr)); - state_ = nullptr; + return unwrap_future(this->then_no_unwrap(std::move(continuation))); } /** Get the value from the future - * - * This is expected to be called * * The future must be valid and ready in order to make this call. - * @ref std::future blocks when the future is not ready but we are + * std::future blocks when the future is not ready but we are * completely single-threaded so blocking would be a deadlock. * After the call, the future becomes invalid. * @@ -377,19 +403,27 @@ private: std::shared_ptr> state_; }; -/** Producer side of a @simgrid::kernel::Future +template Future unwrap_future(Future> future) +{ + Promise promise; + Future result = promise.get_future(); + bind_promise(std::move(promise), std::move(future)); + return result; +} + +/** Producer side of a @ref simgrid::kernel::Future * * A @ref Promise is connected to some `Future` and can be used to * set its result. * - * Similar to @ref std::promise + * Similar to std::promise * * * // Create a promise and a future: * auto promise = std::make_shared>(); * auto future = promise->get_future(); * - * SIMIX_timer_set(date, [promise] { + * simgrid::simix::Timer::set(date, [promise] { * try { * int value = compute_the_value(); * if (value < 0) @@ -399,8 +433,8 @@ private: * promise.set_value(value); * } * catch (...) { - * // If an error occured, we can set an exception which - * // will be throwed buy future.get(): + * // If an error occurred, we can set an exception which + * // will be thrown by future.get(): * promise.set_exception(std::current_exception()); * } * }); @@ -413,13 +447,13 @@ template class Promise { public: Promise() : state_(std::make_shared>()) {} - Promise(std::shared_ptr> state) : state_(std::move(state)) {} + explicit Promise(std::shared_ptr> state) : state_(std::move(state)) {} // Move type - Promise(Promise&) = delete; - Promise& operator=(Promise&) = delete; + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; Promise(Promise&& that) : - state_(std::move(that.state_)), future_get_(that.future_set) + state_(std::move(that.state_)), future_get_(that.future_get_) { that.future_get_ = false; } @@ -468,7 +502,7 @@ template<> class Promise { public: Promise() : state_(std::make_shared>()) {} - Promise(std::shared_ptr> state) : state_(std::move(state)) {} + explicit Promise(std::shared_ptr> state) : state_(std::move(state)) {} ~Promise() { if (state_ && state_->get_status() == FutureStatus::not_ready) @@ -477,8 +511,8 @@ public: } // Move type - Promise(Promise&) = delete; - Promise& operator=(Promise&) = delete; + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; Promise(Promise&& that) : state_(std::move(that.state_)), future_get_(that.future_get_) {