From: Gabriel Corona Date: Tue, 21 Jun 2016 11:07:30 +0000 (+0200) Subject: Future chaining X-Git-Tag: v3_14~898 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/c3afdfa7a603897d1fbca56e7d1705983038405e?hp=b34a0fc8695b7a1556c9b933400bbe6efb068020 Future chaining --- diff --git a/include/simgrid/kernel/future.hpp b/include/simgrid/kernel/future.hpp index 9b6a5fc88b..cedfd39c96 100644 --- a/include/simgrid/kernel/future.hpp +++ b/include/simgrid/kernel/future.hpp @@ -7,6 +7,9 @@ #ifndef SIMGRID_KERNEL_FUTURE_HPP #define SIMGRID_KERNEL_FUTURE_HPP +#include +#include + #include #include @@ -35,6 +38,11 @@ enum class FutureStatus { done, }; +template +struct is_future : public std::integral_constant {}; +template +struct is_future> : public std::integral_constant {}; + /** Bases stuff for all @ref simgrid::kernel::FutureState */ class FutureStateBase { public: @@ -195,6 +203,24 @@ public: } }; +template +void bindPromise(Promise promise, Future future) +{ + struct PromiseBinder { + public: + PromiseBinder(Promise promise) : promise_(std::move(promise)) {} + void operator()(Future future) + { + simgrid::xbt::setPromise(promise_, future); + } + private: + Promise promise_; + }; + future.then_(PromiseBinder(std::move(promise))); +} + +template Future unwrapFuture(Future> future); + /** Result of some (probably) asynchronous operation in the SimGrid kernel * * @ref simgrid::simix::Future and @ref simgrid::simix::Future provide an @@ -289,27 +315,78 @@ public: return state_ != nullptr && state_->is_ready(); } + /** Attach a continuation to this future + * + * This is like .then() but avoid the creation of a new future. + */ + template + void then_(F continuation) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + // Give shared-ownership to the continuation: + auto state = std::move(state_); + state->set_continuation(simgrid::xbt::makeTask( + std::move(continuation), state)); + } + + /** Attach a continuation to this future + * + * This version never does future unwrapping. + */ + template + auto thenNoUnwrap(F continuation) + -> Future + { + typedef decltype(continuation(std::move(*this))) R; + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + auto state = std::move(state_); + // Create a new future... + Promise promise; + Future future = promise.get_future(); + // ...and when the current future is ready... + state->set_continuation(simgrid::xbt::makeTask( + [](Promise promise, std::shared_ptr> state, F continuation) { + // ...set the new future value by running the continuation. + Future future(std::move(state)); + simgrid::xbt::fulfillPromise(promise,[&]{ + return continuation(std::move(future)); + }); + }, + std::move(promise), state, std::move(continuation))); + return std::move(future); + } + /** Attach a continuation to this future * * The future must be valid in order to make this call. * The continuation is executed when the future becomes ready. * The future becomes invalid after this call. * - * We don't support future chaining for now (`.then().then()`). - * * @param continuation This function is called with a ready future * the future is ready * @exception std::future_error no state is associated with the future */ template - void then(F continuation) + auto then(F continuation) + -> typename std::enable_if< + !is_future::value, + Future + >::type { - if (state_ == nullptr) - throw std::future_error(std::future_errc::no_state); - // Give shared-ownership to the continuation: - auto state = std::move(state_); - state->set_continuation(simgrid::xbt::makeTask( - std::move(continuation), state)); + return this->thenNoUnwrap(std::move(continuation)); + } + + /** Attach a continuation to this future (future chaining) */ + template + auto then(F continuation) + -> typename std::enable_if< + is_future::value, + decltype(continuation(std::move(*this))) + >::type + { + return unwrapFuture(this->thenNoUnwap(std::move(continuation))); } /** Get the value from the future @@ -337,6 +414,15 @@ private: std::shared_ptr> state_; }; +template +Future unwrapFuture(Future> future) +{ + Promise promise; + Future result = promise.get_future(); + bindPromise(std::move(promise), std::move(future)); + return std::move(result); +} + /** Producer side of a @simgrid::kernel::Future * * A @ref Promise is connected to some `Future` and can be used to @@ -376,10 +462,10 @@ public: Promise(std::shared_ptr> state) : state_(std::move(state)) {} // Move type - Promise(Promise&) = delete; - Promise& operator=(Promise&) = delete; + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; Promise(Promise&& that) : - state_(std::move(that.state_)), future_get_(that.future_set) + state_(std::move(that.state_)), future_get_(that.future_get_) { that.future_get_ = false; } @@ -437,8 +523,8 @@ public: } // Move type - Promise(Promise&) = delete; - Promise& operator=(Promise&) = delete; + Promise(Promise const&) = delete; + Promise& operator=(Promise const&) = delete; Promise(Promise&& that) : state_(std::move(that.state_)), future_get_(that.future_get_) { diff --git a/include/simgrid/simix/blocking_simcall.hpp b/include/simgrid/simix/blocking_simcall.hpp index f7f233c046..94774659ca 100644 --- a/include/simgrid/simix/blocking_simcall.hpp +++ b/include/simgrid/simix/blocking_simcall.hpp @@ -57,7 +57,7 @@ auto kernelSync(F code) -> decltype(code().get()) simcall_run_blocking([&result, self, &code]{ try { auto future = code(); - future.then([&result, self](simgrid::kernel::Future value) { + future.then_([&result, self](simgrid::kernel::Future value) { simgrid::xbt::setPromise(result, value); simgrid::simix::unblock(self); }); @@ -92,7 +92,7 @@ public: simcall_run_blocking([this, &result, self]{ try { // When the kernel future is ready... - this->future_.then([this, &result, self](simgrid::kernel::Future value) { + this->future_.then_([this, &result, self](simgrid::kernel::Future value) { // ... wake up the process with the result of the kernel future. simgrid::xbt::setPromise(result, value); simgrid::simix::unblock(self); @@ -122,7 +122,7 @@ public: simcall_run_blocking([this, &exception, self]{ try { // When the kernel future is ready... - this->future_.then([this, self](simgrid::kernel::Future value) { + this->future_.then_([this, self](simgrid::kernel::Future value) { // ...store it the simix kernel and wake up. this->future_ = std::move(value); simgrid::simix::unblock(self); diff --git a/include/xbt/functional.hpp b/include/xbt/functional.hpp index d9146a3f0a..e4ab77636a 100644 --- a/include/xbt/functional.hpp +++ b/include/xbt/functional.hpp @@ -205,27 +205,29 @@ public: } }; +template +class TaskImpl { +private: + F code_; + std::tuple args_; + typedef decltype(simgrid::xbt::apply(std::move(code_), std::move(args_))) result_type; +public: + TaskImpl(F code, std::tuple args) : + code_(std::move(code)), + args_(std::move(args)) + {} + result_type operator()() + { + return simgrid::xbt::apply(std::move(code_), std::move(args_)); + } +}; + template auto makeTask(F code, Args... args) -> Task< decltype(code(std::move(args)...))() > { - typedef decltype(code(std::move(args)...)) result_type; - - class Impl { - private: - F code_; - std::tuple args_; - public: - Impl(F code, std::tuple args) : - code_(std::move(code)), - args_(std::move(args)) {} - result_type operator()() - { - return simgrid::xbt::apply(std::move(code_), std::move(args_)); - } - }; - - return Impl(std::move(code), std::make_tuple(std::move(args)...)); + TaskImpl task(std::move(code), std::make_tuple(std::move(args)...)); + return std::move(task); } } diff --git a/teshsuite/simix/generic_simcalls/generic_simcalls.cpp b/teshsuite/simix/generic_simcalls/generic_simcalls.cpp index 1156e9b9c8..cf42eb708d 100644 --- a/teshsuite/simix/generic_simcalls/generic_simcalls.cpp +++ b/teshsuite/simix/generic_simcalls/generic_simcalls.cpp @@ -18,20 +18,14 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(test, "my log messages"); namespace example { -/** Execute the code in the kernel at some time - * - * @param date when we should execute the code - * @param code code to execute - * @return future with the result of the call - */ -template -auto kernel_defer(double date, F code) -> simgrid::kernel::Future +/** Create a future which becomes ready when the date is reached */ +static +simgrid::kernel::Future kernel_wait_until(double date) { - typedef decltype(code()) T; - auto promise = std::make_shared>(); + auto promise = std::make_shared>(); auto future = promise->get_future(); - SIMIX_timer_set(date, [promise, code] { - simgrid::xbt::fulfillPromise(*promise, std::move(code)); + SIMIX_timer_set(date, [promise] { + promise->set_value(); }); return future; } @@ -47,7 +41,8 @@ static int master(int argc, char *argv[]) // Synchronize on a successful Future: simgrid::simix::kernelSync([&] { - return kernel_defer(10, [] { + return kernel_wait_until(10).then([](simgrid::kernel::Future future) { + future.get(); XBT_INFO("kernelSync with void"); }); }); @@ -56,7 +51,8 @@ static int master(int argc, char *argv[]) // Synchronize on a failing Future: try { simgrid::simix::kernelSync([&] { - return kernel_defer(20, [] { + return kernel_wait_until(20).then([](simgrid::kernel::Future future) { + future.get(); throw std::runtime_error("Exception throwed from kernel_defer"); }); }); @@ -68,7 +64,8 @@ static int master(int argc, char *argv[]) // Synchronize on a successul Future and get the value: int res = simgrid::simix::kernelSync([&] { - return kernel_defer(30, [] { + return kernel_wait_until(30).then([](simgrid::kernel::Future future) { + future.get(); XBT_INFO("kernelSync with value"); return 42; }); @@ -77,7 +74,8 @@ static int master(int argc, char *argv[]) // Synchronize on a successul Future and get the value: simgrid::simix::Future future = simgrid::simix::kernelAsync([&] { - return kernel_defer(50, [] { + return kernel_wait_until(50).then([](simgrid::kernel::Future future) { + future.get(); XBT_INFO("kernelAsync with value"); return 43; }); @@ -87,7 +85,8 @@ static int master(int argc, char *argv[]) // Synchronize on a successul Future and get the value: future = simgrid::simix::kernelAsync([&] { - return kernel_defer(60, [] { + return kernel_wait_until(60).then([](simgrid::kernel::Future future) { + future.get(); XBT_INFO("kernelAsync with value"); return 43; });