Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2022.
[simgrid.git] / include / simgrid / kernel / future.hpp
1 /* Copyright (c) 2016-2022. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #ifndef SIMGRID_KERNEL_FUTURE_HPP
8 #define SIMGRID_KERNEL_FUTURE_HPP
9
10 #include <functional>
11 #include <future>
12 #include <memory>
13 #include <utility>
14 #include <type_traits>
15
16 #include <boost/optional.hpp>
17
18 #include <xbt/base.h>
19 #include <xbt/functional.hpp>
20 #include <xbt/promise.hpp>
21
22 namespace simgrid {
23 namespace kernel {
24
25 // There are the public classes:
26 template<class T> class Future;
27 template<class T> class Promise;
28
29 // Those are implementation details:
30 enum class FutureStatus;
31 template<class T> class FutureState;
32
33 enum class FutureStatus {
34   not_ready,
35   ready,
36   done,
37 };
38
39 template<class T>
40 struct is_future : std::false_type {};
41 template<class T>
42 struct is_future<Future<T>> : std::true_type {};
43
44 /** Bases stuff for all @ref simgrid::kernel::FutureState<T> */
45 class FutureStateBase {
46 public:
47   // No copy/move:
48   FutureStateBase(FutureStateBase const&) = delete;
49   FutureStateBase& operator=(FutureStateBase const&) = delete;
50
51   XBT_PUBLIC void schedule(simgrid::xbt::Task<void()>&& job) const;
52
53   void set_exception(std::exception_ptr exception)
54   {
55     xbt_assert(exception_ == nullptr);
56     if (status_ != FutureStatus::not_ready)
57       throw std::future_error(std::future_errc::promise_already_satisfied);
58     exception_ = std::move(exception);
59     this->set_ready();
60   }
61
62   void set_continuation(simgrid::xbt::Task<void()>&& continuation)
63   {
64     xbt_assert(not continuation_);
65     switch (status_) {
66     case FutureStatus::done:
67       // This is not supposed to happen if continuation is set
68       // via the Promise:
69       xbt_die("Set continuation on finished future");
70       break;
71     case FutureStatus::ready:
72       // The future is ready, execute the continuation directly.
73       // We might execute it from the event loop instead:
74       schedule(std::move(continuation));
75       break;
76     case FutureStatus::not_ready:
77       // The future is not ready so we must keep the continuation for
78       // executing it later:
79       continuation_ = std::move(continuation);
80       break;
81     default:
82       DIE_IMPOSSIBLE;
83     }
84   }
85
86   FutureStatus get_status() const
87   {
88     return status_;
89   }
90
91   bool is_ready() const
92   {
93     return status_ == FutureStatus::ready;
94   }
95
96 protected:
97   FutureStateBase() = default;
98   ~FutureStateBase() = default;
99
100   /** Set the future as ready and trigger the continuation */
101   void set_ready()
102   {
103     status_ = FutureStatus::ready;
104     if (continuation_) {
105       // We unregister the continuation before executing it.
106       // We need to do this because the current implementation of the
107       // continuation has a shared_ptr to the FutureState.
108       auto continuation = std::move(continuation_);
109       this->schedule(std::move(continuation));
110     }
111   }
112
113   /** Set the future as done and raise an exception if any
114    *
115    *  This does half the job of `.get()`.
116    **/
117   void resolve()
118   {
119     xbt_assert(status_ == FutureStatus::ready, "Deadlock: this future is not ready");
120     status_ = FutureStatus::done;
121     if (exception_) {
122       std::exception_ptr exception = std::move(exception_);
123       exception_ = nullptr;
124       std::rethrow_exception(std::move(exception));
125     }
126   }
127
128 private:
129   FutureStatus status_ = FutureStatus::not_ready;
130   std::exception_ptr exception_;
131   simgrid::xbt::Task<void()> continuation_;
132 };
133
134 /** Shared state for future and promises
135  *
136  *  You are not expected to use them directly but to create them
137  *  implicitly through a @ref simgrid::kernel::Promise.
138  *  Alternatively kernel operations could inherit or contain FutureState
139  *  if they are managed with std::shared_ptr.
140  **/
141 template<class T>
142 class FutureState : public FutureStateBase {
143 public:
144   void set_value(T value)
145   {
146     if (this->get_status() != FutureStatus::not_ready)
147       throw std::future_error(std::future_errc::promise_already_satisfied);
148     value_ = std::move(value);
149     this->set_ready();
150   }
151
152   T get()
153   {
154     this->resolve();
155     xbt_assert(this->value_);
156     auto result = std::move(this->value_.get());
157     this->value_ = boost::optional<T>();
158     return result;
159   }
160
161 private:
162   boost::optional<T> value_;
163 };
164
165 template<class T>
166 class FutureState<T&> : public FutureStateBase {
167 public:
168   void set_value(T& value)
169   {
170     if (this->get_status() != FutureStatus::not_ready)
171       throw std::future_error(std::future_errc::promise_already_satisfied);
172     value_ = &value;
173     this->set_ready();
174   }
175
176   T& get()
177   {
178     this->resolve();
179     xbt_assert(this->value_);
180     T* result = value_;
181     value_ = nullptr;
182     return *result;
183   }
184
185 private:
186   T* value_ = nullptr;
187 };
188
189 template<>
190 class FutureState<void> : public FutureStateBase {
191 public:
192   void set_value()
193   {
194     if (this->get_status() != FutureStatus::not_ready)
195       throw std::future_error(std::future_errc::promise_already_satisfied);
196     this->set_ready();
197   }
198
199   void get()
200   {
201     this->resolve();
202   }
203 };
204
205 template <class T> void bind_promise(Promise<T>&& promise, Future<T> future)
206 {
207   class PromiseBinder {
208   public:
209     explicit PromiseBinder(Promise<T>&& promise) : promise_(std::move(promise)) {}
210     void operator()(Future<T> future) { simgrid::xbt::set_promise(promise_, future); }
211
212   private:
213     Promise<T> promise_;
214   };
215   future.then_(PromiseBinder(std::move(promise)));
216 }
217
218 template <class T> Future<T> unwrap_future(Future<Future<T>> future);
219
220 /** Result of some (probably) asynchronous operation in the SimGrid kernel
221  *
222  * @ref simgrid::simix::Future and @ref simgrid::simix::Future provide an
223  * abstraction for asynchronous stuff happening in the SimGrid kernel. They
224  * are based on C++1z futures.
225  *
226  * The future represents a value which will be available at some point when this
227  * asynchronous operation is finished. Alternatively, if this operations fails,
228  * the result of the operation might be an exception.
229  *
230  *  As the operation is possibly no terminated yet, we cannot get the result
231  *  yet. Moreover, as we cannot block in the SimGrid kernel we cannot wait for
232  *  it. However, we can attach some code/callback/continuation which will be
233  *  executed when the operation terminates.
234  *
235  *  Example of the API (`simgrid::kernel::createProcess` does not exist):
236  *  <pre>
237  *  // Create a new process using the Worker code, this process returns
238  *  // a std::string:
239  *  simgrid::kernel::Future<std::string> future =
240  *     simgrid::kernel::createProcess("worker42", host, Worker(42));
241  *  // At this point, we just created the process so the result is not available.
242  *  // However, we can attach some work do be done with this result:
243  *  future.then([](simgrid::kernel::Future<std::string> result) {
244  *    // This code is called when the operation is completed so the result is
245  *    // available:
246  *    try {
247  *      // Try to get value, this might throw an exception if the operation
248  *      // failed (such as an exception thrown by the worker process):
249  *      std::string value = result.get();
250  *      XBT_INFO("Value: %s", value.c_str());
251  *    }
252  *    catch(std::exception& e) {
253  *      // This is an exception from the asynchronous operation:
254  *      XBT_INFO("Error: %e", e.what());
255  *    }
256  *  );
257  *  </pre>
258  *
259  *  This is based on C++1z std::future but with some differences:
260  *
261  *  * there is no thread synchronization (atomic, mutex, condition variable,
262  *    etc.) because everything happens in the SimGrid event loop;
263  *
264  *  * it is purely asynchronous, you are expected to use `.then()`;
265  *
266  *  * inside the `.then()`, `.get()` can be used;
267  *
268  *  * `.get()` can only be used when `.is_ready()` (as everything happens in
269  *     a single-thread, the future would be guaranteed to deadlock if `.get()`
270  *     is called when the future is not ready);
271  *
272  *  * there is no future chaining support for now (`.then().then()`);
273  *
274  *  * there is no sharing (`shared_future`) for now.
275  */
276 template<class T>
277 class Future {
278 public:
279   Future() = default;
280   explicit Future(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
281
282   // Move type:
283   Future(Future&) = delete;
284   Future& operator=(const Future&) = delete;
285   Future(Future&&) noexcept        = default;
286   Future& operator=(Future&&) noexcept = default;
287
288   /** Whether the future is valid:.
289    *
290    *  A future which as been used (`.then` of `.get`) becomes invalid.
291    *
292    *  We can use `.then` on a valid future.
293    */
294   bool valid() const
295   {
296     return state_ != nullptr;
297   }
298
299   /** Whether the future is ready
300    *
301    *  A future is ready when it has an associated value or exception.
302    *
303    *  We can use `.get()` on ready futures.
304    **/
305   bool is_ready() const
306   {
307     return state_ != nullptr && state_->is_ready();
308   }
309
310   /** Attach a continuation to this future
311    *
312    *  This is like .then() but avoid the creation of a new future.
313    */
314   template<class F>
315   void then_(F continuation)
316   {
317     if (state_ == nullptr)
318       throw std::future_error(std::future_errc::no_state);
319     // Give shared-ownership to the continuation:
320     auto state = std::move(state_);
321     state->set_continuation(simgrid::xbt::make_task(std::move(continuation), state));
322   }
323
324   /** Attach a continuation to this future
325    *
326    *  This version never does future unwrapping.
327    */
328   template <class F> auto then_no_unwrap(F continuation) -> Future<decltype(continuation(std::move(*this)))>
329   {
330     using R = decltype(continuation(std::move(*this)));
331     if (state_ == nullptr)
332       throw std::future_error(std::future_errc::no_state);
333     auto state = std::move(state_);
334     // Create a new future...
335     Promise<R> promise;
336     Future<R> future = promise.get_future();
337     // ...and when the current future is ready...
338     state->set_continuation(simgrid::xbt::make_task(
339         [](Promise<R> promise, std::shared_ptr<FutureState<T>> state, F continuation) {
340           // ...set the new future value by running the continuation.
341           Future<T> future(std::move(state));
342           simgrid::xbt::fulfill_promise(promise, [&continuation, &future] { return continuation(std::move(future)); });
343         },
344         std::move(promise), state, std::move(continuation)));
345     return future;
346   }
347
348   /** Attach a continuation to this future
349    *
350    *  The future must be valid in order to make this call.
351    *  The continuation is executed when the future becomes ready.
352    *  The future becomes invalid after this call.
353    *
354    * @param continuation This function is called with a ready future
355    *                     the future is ready
356    * @exception std::future_error no state is associated with the future
357    */
358   template <class F>
359   auto then(F continuation) -> typename std::enable_if_t<not is_future<decltype(continuation(std::move(*this)))>::value,
360                                                          Future<decltype(continuation(std::move(*this)))>>
361   {
362     return this->then_no_unwrap(std::move(continuation));
363   }
364
365   /** Attach a continuation to this future (future chaining) */
366   template <class F>
367   auto then(F continuation) -> typename std::enable_if_t<is_future<decltype(continuation(std::move(*this)))>::value,
368                                                          decltype(continuation(std::move(*this)))>
369   {
370     return unwrap_future(this->then_no_unwrap(std::move(continuation)));
371   }
372
373   /** Get the value from the future
374    *
375    *  The future must be valid and ready in order to make this call.
376    *  std::future blocks when the future is not ready but we are
377    *  completely single-threaded so blocking would be a deadlock.
378    *  After the call, the future becomes invalid.
379    *
380    *  @return                      value of the future
381    *  @exception any               Exception from the future
382    *  @exception std::future_error no state is associated with the future
383    */
384   T get()
385   {
386     if (state_ == nullptr)
387       throw std::future_error(std::future_errc::no_state);
388     std::shared_ptr<FutureState<T>> state = std::move(state_);
389     return state->get();
390   }
391
392 private:
393   std::shared_ptr<FutureState<T>> state_;
394 };
395
396 template <class T> Future<T> unwrap_future(Future<Future<T>> future)
397 {
398   Promise<T> promise;
399   Future<T> result = promise.get_future();
400   bind_promise(std::move(promise), std::move(future));
401   return result;
402 }
403
404 /** Producer side of a @ref simgrid::kernel::Future
405  *
406  *  A @ref Promise is connected to some `Future` and can be used to
407  *  set its result.
408  *
409  *  Similar to std::promise
410  *
411  *  <code>
412  *  // Create a promise and a future:
413  *  auto promise = std::make_shared<simgrid::kernel::Promise<T>>();
414  *  auto future = promise->get_future();
415  *
416  *  simgrid::kernel::timer::Timer::set(date, [promise] {
417  *    try {
418  *      int value = compute_the_value();
419  *      if (value < 0)
420  *        throw std::logic_error("Bad value");
421  *      // Whenever the operation is completed, we set the value
422  *      // for the future:
423  *      promise.set_value(value);
424  *    }
425  *    catch (...) {
426  *      // If an error occurred, we can set an exception which
427  *      // will be thrown by future.get():
428  *      promise.set_exception(std::current_exception());
429  *    }
430  *  });
431  *
432  *  // Return the future to the caller:
433  *  return future;
434  *  </code>
435  **/
436 template<class T>
437 class Promise {
438 public:
439   Promise() = default;
440   explicit Promise(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
441
442   // Move type
443   Promise(Promise const&) = delete;
444   Promise& operator=(Promise const&) = delete;
445   Promise(Promise&& that) noexcept : state_(std::move(that.state_)) { std::swap(future_get_, that.future_get_); }
446
447   Promise& operator=(Promise&& that) noexcept
448   {
449     this->state_ = std::move(that.state_);
450     this->future_get_ = that.future_get_;
451     that.future_get_ = false;
452     return *this;
453   }
454   Future<T> get_future()
455   {
456     if (state_ == nullptr)
457       throw std::future_error(std::future_errc::no_state);
458     if (future_get_)
459       throw std::future_error(std::future_errc::future_already_retrieved);
460     future_get_ = true;
461     return Future<T>(state_);
462   }
463   void set_value(T value)
464   {
465     if (state_ == nullptr)
466       throw std::future_error(std::future_errc::no_state);
467     state_->set_value(std::move(value));
468   }
469   void set_exception(std::exception_ptr exception)
470   {
471     if (state_ == nullptr)
472       throw std::future_error(std::future_errc::no_state);
473     state_->set_exception(std::move(exception));
474   }
475   ~Promise()
476   {
477     if (state_ && state_->get_status() == FutureStatus::not_ready)
478       state_->set_exception(std::make_exception_ptr(
479         std::future_error(std::future_errc::broken_promise)));
480   }
481
482 private:
483   std::shared_ptr<FutureState<T>> state_ = std::make_shared<FutureState<T>>();
484   bool future_get_ = false;
485 };
486
487 template<>
488 class Promise<void> {
489 public:
490   Promise() = default;
491   explicit Promise(std::shared_ptr<FutureState<void>> state) : state_(std::move(state)) {}
492   ~Promise()
493   {
494     if (state_ && state_->get_status() == FutureStatus::not_ready)
495       state_->set_exception(std::make_exception_ptr(
496         std::future_error(std::future_errc::broken_promise)));
497   }
498
499   // Move type
500   Promise(Promise const&) = delete;
501   Promise& operator=(Promise const&) = delete;
502   Promise(Promise&& that) noexcept : state_(std::move(that.state_)) { std::swap(future_get_, that.future_get_); }
503   Promise& operator=(Promise&& that) noexcept
504   {
505     this->state_ = std::move(that.state_);
506     this->future_get_ = that.future_get_;
507     that.future_get_ = false;
508     return *this;
509   }
510
511   Future<void> get_future()
512   {
513     if (state_ == nullptr)
514       throw std::future_error(std::future_errc::no_state);
515     if (future_get_)
516       throw std::future_error(std::future_errc::future_already_retrieved);
517     future_get_ = true;
518     return Future<void>(state_);
519   }
520   void set_value() const
521   {
522     if (state_ == nullptr)
523       throw std::future_error(std::future_errc::no_state);
524     state_->set_value();
525   }
526   void set_exception(std::exception_ptr exception) const
527   {
528     if (state_ == nullptr)
529       throw std::future_error(std::future_errc::no_state);
530     state_->set_exception(std::move(exception));
531   }
532
533 private:
534   std::shared_ptr<FutureState<void>> state_ = std::make_shared<FutureState<void>>();
535   bool future_get_ = false;
536 };
537
538 }
539 }
540
541 #endif