From: Gabriel Corona Date: Tue, 31 May 2016 14:36:51 +0000 (+0200) Subject: [simix] Add a `run_blocking` simcall and simix::kernelSync X-Git-Tag: v3_14~971 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/467a0c53018ee489de1dd7ae61a083d52048b8e8 [simix] Add a `run_blocking` simcall and simix::kernelSync * run_blocking() is a generic blocking simcall. It is given a callback which is executed immediately in the SimGrid kernel. The callback is responsible for setting the suitable logic for waking up the process when needed. * simix::kernelSync() is a higher level wrapper for this. It is given a callback which is executed in the kernel SimGrid and returns a simgrid::kernel::Future. The kernel blocks the process until the Future is ready and either the value wrapped in the future to the process or raises the exception stored in the Future in the process. * simgrid::simix::{Future,Promise} provide an abstration for asynchronous stuff happening in the SimGrid kernel. They are based on C++1z futures. --- diff --git a/.gitignore b/.gitignore index 1e766e4c94..c1bdf8aa4f 100644 --- a/.gitignore +++ b/.gitignore @@ -269,6 +269,7 @@ teshsuite/simdag/evaluate-get-route-time/evaluate-get-route-time teshsuite/simdag/evaluate-parse-time/evaluate-parse-time teshsuite/simix/check_defaults/check_defaults teshsuite/simix/stack_overflow/stack_overflow +teshsuite/simix/generic_simcalls/generic_simcalls teshsuite/smpi/bug-17132/bug-17132 teshsuite/smpi/coll-allgather/coll-allgather teshsuite/smpi/coll-allgatherv/coll-allgatherv diff --git a/include/simgrid/kernel/future.hpp b/include/simgrid/kernel/future.hpp new file mode 100644 index 0000000000..a99a155aac --- /dev/null +++ b/include/simgrid/kernel/future.hpp @@ -0,0 +1,514 @@ +/* Copyright (c) 2016. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_KERNEL_FUTURE_HPP +#define SIMGRID_KERNEL_FUTURE_HPP + +#include + +#include + +#include +#include +#include +#include +#include + +namespace simgrid { +namespace kernel { + +// There are the public classes: +template class Future; +template class Promise; + +// Those are implementation details: +enum class FutureStatus; +template class FutureState; +class FutureContinuation; +template class FutureContinuationImpl; + +enum class FutureStatus { + not_ready, + ready, + done, +}; + +/** A continuation attached to a future to be executed when it is ready */ +XBT_PUBLIC_CLASS FutureContinuation { +public: + FutureContinuation() {} + + // No copy: + FutureContinuation(FutureContinuation&) = delete; + FutureContinuation& operator=(FutureContinuation&) = delete; + + virtual ~FutureContinuation() {} + virtual void operator()() = 0; +}; + +/** Default implementation of `FutureContinuation` + * + * @param T value type of the future + * @param F type of the wrapped code/callback/continuation + */ +template +class FutureContinuationImpl : public FutureContinuation { +public: + FutureContinuationImpl(std::shared_ptr> ptr, F callback) + : ptr_(std::move(ptr)), callback_(std::move(callback)) {} + ~FutureContinuationImpl() override {} + void operator()() override + { + try { + callback_(Future(ptr_)); + } + // Those exceptions are lost. + // If we want to implement callback chaining, we'll have to catch them and + // foward them to the next future. + catch (...) { + // We could log this. + } + } +private: + std::shared_ptr> ptr_; + F callback_; +}; + +/** Bases stuff for all @ref simgrid::kernel::FutureState */ +class FutureStateBase { +public: + // No copy/move: + FutureStateBase(FutureStateBase const&) = delete; + FutureStateBase& operator=(FutureStateBase const&) = delete; + + void set_exception(std::exception_ptr exception) + { + xbt_assert(exception_ == nullptr); + if (status_ != FutureStatus::not_ready) + throw std::future_error(std::future_errc::promise_already_satisfied); + exception_ = std::move(exception); + this->set_ready(); + } + + void set_continuation(std::unique_ptr continuation) + { + xbt_assert(!continuation_); + switch (status_) { + case FutureStatus::done: + // This is not supposed to happen if continuation is set + // via the Promise: + xbt_die("Set continuation on finished future"); + break; + case FutureStatus::ready: + // The future is ready, execute the continuation directly. + // We might execute it from the event loop instead: + (*continuation)(); + break; + case FutureStatus::not_ready: + // The future is not ready so we mast keep the continuation for + // executing it later: + continuation_ = std::move(continuation); + break; + } + } + + FutureStatus get_status() const + { + return status_; + } + + bool is_ready() const + { + return status_ == FutureStatus::ready; + } + +protected: + FutureStateBase() {} + ~FutureStateBase() {}; + + /** Set the future as ready and trigger the continuation */ + void set_ready() + { + status_ = FutureStatus::ready; + if (continuation_) { + // We unregister the continuation before executing it. + // We need to do this becase the current implementation of the + // continuation has a shared_ptr to the FutureState. + auto continuation = std::move(continuation_); + (*continuation)(); + } + } + + /** Set the future as done and raise an exception if any + * + * This does half the job of `.get()`. + **/ + void resolve() + { + if (status_ != FutureStatus::ready) + xbt_die("Deadlock: this future is not ready"); + status_ = FutureStatus::done; + if (exception_) { + std::exception_ptr exception = std::move(exception_); + std::rethrow_exception(std::move(exception)); + } + } + +private: + FutureStatus status_ = FutureStatus::not_ready; + std::exception_ptr exception_; + std::unique_ptr continuation_; +}; + +/** Shared state for future and promises + * + * You are not expected to use them directly but to create them + * implicitely through a @ref simgrid::kernel::Promise. + * Alternatively kernel operations could inherit or contain FutureState + * if they are managed with @ref std::shared_ptr. + **/ +template +class FutureState : public FutureStateBase { +public: + + void set_value(T value) + { + if (this->get_status() != FutureStatus::not_ready) + throw std::future_error(std::future_errc::promise_already_satisfied); + value_ = std::move(value); + this->set_ready(); + } + + T get() + { + this->resolve(); + xbt_assert(this->value_); + auto result = std::move(this->value_.get()); + this->value_ = boost::optional(); + return std::move(result); + } + +private: + boost::optional value_; +}; + +template +class FutureState : public FutureStateBase { +public: + void set_value(T& value) + { + if (this->get_status() != FutureStatus::not_ready) + throw std::future_error(std::future_errc::promise_already_satisfied); + value_ = &value; + this->set_ready(); + } + + T& get() + { + this->resolve(); + xbt_assert(this->value_); + T* result = value_; + value_ = nullptr; + return *value_; + } + +private: + T* value_ = nullptr; +}; + +template<> +class FutureState : public FutureStateBase { +public: + void set_value() + { + if (this->get_status() != FutureStatus::not_ready) + throw std::future_error(std::future_errc::promise_already_satisfied); + this->set_ready(); + } + + void get() + { + this->resolve(); + } +}; + +/** Result of some (possibly ongoing, asynchronous) operation in the SimGrid kernel + * + * As the operation may not be completed yet, the result might be an exception. + * + * Example of the API (`simgrid::kernel::createProcess` does not exist): + *
+ *  // Create a new process using the Worker code, this process returns
+ *  // a std::string:
+ *  simgrid::kernel::Future future =
+ *     simgrid::kernel::createProcess("worker42", host, Worker(42));
+ *  // At this point, we just created the process so the result is not available.
+ *  // However, we can attach some work do be done with this result:
+ *  future.then([](simgrid::kernel::Future result) {
+ *    // This code is called when the operation is completed so the result is
+ *    // available:
+ *    try {
+ *      // Try to get value, this might throw an exception if the operation
+ *      // failed (such as an exception throwed by the worker process):
+ *      std::string value = result.get();
+ *      XBT_INFO("Value: %s", value.c_str());
+ *    }
+ *    catch(std::exception& e) {
+ *      // This is an exception from the asynchronous operation:
+ *      XBT_INFO("Error: %e", e.what());
+ *    }
+ *  );
+ *  
+ * + * This is based on C++1z @ref std::future but with some differences: + * + * * there is no thread synchronization (atomic, mutex, condition variable, + * etc.) because everything happens in the SimGrid event loop; + * + * * it is purely asynchronous, you are expected to use `.then()`; + * + * * inside the `.then()`, `.get()` can be used; + * + * * `.get()` can only be used when `.is_ready()` (as everything happens in + * a single-thread, the future would be guaranted to deadlock if `.get()` + * is called when the future is not ready); + * + * * there is no future chaining support for now (`.then().then()`); + * + * * there is no sharing (`shared_future`) for now. + */ +template +class Future { +public: + Future() {} + Future(std::shared_ptr> state): state_(std::move(state)) {} + + // Move type: + Future(Future&) = delete; + Future& operator=(Future&) = delete; + Future(Future&& that) : state_(std::move(that.state_)) {} + Future& operator=(Future&& that) + { + state_ = std::move(that.stat_); + } + + /** Whether the future is valid:. + * + * A future which as been used (`.then` of `.get`) becomes invalid. + * + * We can use `.then` on a valid future. + */ + bool valid() const + { + return state_ != nullptr; + } + + /** Whether the future is ready + * + * A future is ready when it has an associated value or exception. + * + * We can use `.get()` on ready futures. + **/ + bool is_ready() const + { + return state_ != nullptr && state_->is_ready(); + } + + /** Attach a continuation to this future + * + * The future must be valid in order to make this call. + * The continuation is executed when the future becomes ready. + * The future becomes invalid after this call. + * + * We don't support future chaining for now (`.then().then()`). + * + * @param continuation This function is called with a ready future + * the future is ready + * @exception std::future_error no state is associated with the future + */ + template + void then(F continuation) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + std::unique_ptr ptr = + std::unique_ptr( + new FutureContinuationImpl(state_, std::move(continuation))); + state_->set_continuation(std::move(ptr)); + state_ = nullptr; + } + + /** Get the value from the future + * + * This is expected to be called + * + * The future must be valid and ready in order to make this call. + * @ref std::future blocks when the future is not ready but we are + * completely single-threaded so blocking would be a deadlock. + * After the call, the future becomes invalid. + * + * @return value of the future + * @exception any Exception from the future + * @exception std::future_error no state is associated with the future + */ + T get() + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + std::shared_ptr> state = std::move(state_); + return state->get(); + } + +private: + std::shared_ptr> state_; +}; + +/** Producer side of a @simgrid::kernel::Future + * + * A @ref Promise is connected to some `Future` and can be used to + * set its result. + * + * Similar to @ref std::promise + * + * + * // Create a promise and a future: + * auto promise = std::make_shared>(); + * auto future = promise->get_future(); + * + * SIMIX_timer_set(date, [promise] { + * try { + * int value = compute_the_value(); + * if (value < 0) + * throw std::logic_error("Bad value"); + * // Whenever the operation is completed, we set the value + * // for the future: + * promise.set_value(value); + * } + * catch (...) { + * // If an error occured, we can set an exception which + * // will be throwed buy future.get(): + * promise.set_exception(std::current_exception()); + * } + * }); + * + * // Return the future to the caller: + * return future; + * + **/ +template +class Promise { +public: + Promise() : state_(std::make_shared>()) {} + Promise(std::shared_ptr> state) : state_(std::move(state)) {} + + // Move type + Promise(Promise&) = delete; + Promise& operator=(Promise&) = delete; + Promise(Promise&& that) : + state_(std::move(that.state_)), future_get_(that.future_set) + { + that.future_get_ = false; + } + + Promise& operator=(Promise&& that) + { + this->state_ = std::move(that.state_); + this->future_get_ = that.future_get_; + that.future_get_ = false; + return *this; + } + Future get_future() + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + if (future_get_) + throw std::future_error(std::future_errc::future_already_retrieved); + future_get_ = true; + return Future(state_); + } + void set_value(T value) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + state_->set_value(std::move(value)); + } + void set_exception(std::exception_ptr exception) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + state_->set_exception(std::move(exception)); + } + ~Promise() + { + if (state_ && state_->get_status() == FutureStatus::not_ready) + state_->set_exception(std::make_exception_ptr( + std::future_error(std::future_errc::broken_promise))); + } + +private: + std::shared_ptr> state_; + bool future_get_ = false; +}; + +template<> +class Promise { +public: + Promise() : state_(std::make_shared>()) {} + Promise(std::shared_ptr> state) : state_(std::move(state)) {} + ~Promise() + { + if (state_ && state_->get_status() == FutureStatus::not_ready) + state_->set_exception(std::make_exception_ptr( + std::future_error(std::future_errc::broken_promise))); + } + + // Move type + Promise(Promise&) = delete; + Promise& operator=(Promise&) = delete; + Promise(Promise&& that) : + state_(std::move(that.state_)), future_get_(that.future_get_) + { + that.future_get_ = false; + } + Promise& operator=(Promise&& that) + { + this->state_ = std::move(that.state_); + this->future_get_ = that.future_get_; + that.future_get_ = false; + return *this; + } + + Future get_future() + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + if (future_get_) + throw std::future_error(std::future_errc::future_already_retrieved); + future_get_ = true; + return Future(state_); + } + void set_value() + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + state_->set_value(); + } + void set_exception(std::exception_ptr exception) + { + if (state_ == nullptr) + throw std::future_error(std::future_errc::no_state); + state_->set_exception(std::move(exception)); + } + +private: + std::shared_ptr> state_; + bool future_get_ = false; +}; + +} +} + +#endif diff --git a/include/simgrid/simix.hpp b/include/simgrid/simix.hpp index 901081aa1b..ad48794a4b 100644 --- a/include/simgrid/simix.hpp +++ b/include/simgrid/simix.hpp @@ -9,13 +9,11 @@ #include -#include #include #include #include #include #include -#include #include #include @@ -23,14 +21,21 @@ #include XBT_PUBLIC(void) simcall_run_kernel(std::function const& code); +XBT_PUBLIC(void) simcall_run_blocking(std::function const& code); template inline void simcall_run_kernel(F& f) { simcall_run_kernel(std::function(std::ref(f))); } +template inline +void simcall_run_blocking(F& f) +{ + simcall_run_blocking(std::function(std::ref(f))); +} namespace simgrid { + namespace simix { /** Execute some code in the kernel/maestro @@ -191,7 +196,13 @@ XBT_PUBLIC(smx_process_t) simcall_process_create(const char *name, xbt_dict_t properties, int auto_restart); -XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, std::function callback); +XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, std::packaged_task callback); + +template inline +XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, F callback) +{ + return SIMIX_timer_set(date, std::packaged_task(std::move(callback))); +} template inline XBT_PUBLIC(smx_timer_t) SIMIX_timer_set(double date, R(*callback)(T*), T* arg) diff --git a/include/simgrid/simix/blocking_simcall.hpp b/include/simgrid/simix/blocking_simcall.hpp new file mode 100644 index 0000000000..aa85620fdb --- /dev/null +++ b/include/simgrid/simix/blocking_simcall.hpp @@ -0,0 +1,74 @@ +/* Copyright (c) 2016. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_SIMIX_BLOCKING_SIMCALL_HPP +#define SIMGRID_SIMIX_BLOCKING_SIMCALL_HPP + +#include + +#include + +#include + +#include + +#include +#include +#include + +XBT_PUBLIC(void) simcall_run_blocking(std::function const& code); + +namespace simgrid { +namespace simix { + +XBT_PUBLIC(void) unblock(smx_process_t process); + +/** Execute some code in kernel mode and wakes up the process when + * the result is available. + * + * The code given is executed in SimGrid kernel and expected to return + * a `simgrid::kernel::Future`. The current process is resumed whenever + * the Future becomes ready and gets the value or exception of the future: + * + * This can be used to implement blocking calls in without adding new simcalls. + * One downside of this approach is that we don't have any semantic on what + * the process is waiting. This might be a problem for the model-checker and + * we'll have to device a way to make it work. + * + * @param code Kernel code returning a `simgrid::kernel::Future` + * @return Value of the kernel future + * @exception Exception from the kernel future + */ +template +auto blocking_simcall(F code) -> decltype(code().get()) +{ + typedef decltype(code().get()) T; + if (SIMIX_is_maestro()) + xbt_die("Can't execute blocking call in kernel mode"); + + smx_process_t self = SIMIX_process_self(); + simgrid::xbt::Result result; + + simcall_run_blocking([&result, self, &code]{ + try { + auto future = code(); + future.then([&result, self](simgrid::kernel::Future value) { + simgrid::xbt::setPromise(result, value); + simgrid::simix::unblock(self); + }); + } + catch (...) { + result.set_exception(std::current_exception()); + simgrid::simix::unblock(self); + } + }); + return result.get(); +} + +} +} + +#endif diff --git a/include/xbt/future.hpp b/include/xbt/future.hpp index 81c88453e6..b7e52e0a8c 100644 --- a/include/xbt/future.hpp +++ b/include/xbt/future.hpp @@ -11,6 +11,9 @@ #include #include +#include + +#include namespace simgrid { namespace xbt { @@ -172,9 +175,17 @@ public: }; /** Execute some code and set a promise or result accordingly + * + * Roughly this does: + * + *
+ *  promise.set_value(code());
+ *  
+ * + * but it takes care of exceptions and works with void. * * We might need this when working with generic code because - * the trivial implementation does not work with void (before C++1z). + * the trivial implementation does not work with `void` (before C++1z). * * @param code What we want to do * @param promise Where to want to store the result @@ -184,7 +195,7 @@ auto fulfillPromise(R& promise, F&& code) -> decltype(promise.set_value(code())) { try { - promise.set_value(code()); + promise.set_value(std::forward(code)()); } catch(...) { promise.set_exception(std::current_exception()); @@ -196,7 +207,7 @@ auto fulfillPromise(P& promise, F&& code) -> decltype(promise.set_value()) { try { - (code)(); + std::forward(code)(); promise.set_value(); } catch(...) { @@ -204,6 +215,26 @@ auto fulfillPromise(P& promise, F&& code) } } +/** Set a promise/result from a future/resul + * + * Roughly this does: + * + *
promise.set_value(future);
+ * + * but it takes care of exceptions and works with `void`. + * + * We might need this when working with generic code because + * the trivial implementation does not work with `void` (before C++1z). + * + * @param promise output (a valid future or a result) + * @param future input (a ready/waitable future or a valid result) + */ +template inline +void setPromise(P& promise, F&& future) +{ + fulfillPromise(promise, [&]{ return std::forward(future).get(); }); +} + } } diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index e7317084f5..e91ebb83db 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -17,13 +17,14 @@ #include +#include + #include "src/mc/mc_replay.h" #include "smx_private.h" #include "src/mc/mc_forward.hpp" #include "xbt/ex.h" #include "mc/mc.h" #include "src/simix/smx_host_private.h" - #include "src/simix/SynchroComm.hpp" #include @@ -1093,6 +1094,11 @@ void simcall_run_kernel(std::function const& code) return simcall_BODY_run_kernel(&code); } +void simcall_run_blocking(std::function const& code) +{ + return simcall_BODY_run_blocking(&code); +} + int simcall_mc_random(int min, int max) { return simcall_BODY_mc_random(min, max); } @@ -1103,3 +1109,15 @@ int simcall_mc_random(int min, int max) { const char *SIMIX_simcall_name(e_smx_simcall_t kind) { return simcall_names[kind]; } + +namespace simgrid { +namespace simix { + +void unblock(smx_process_t process) +{ + xbt_assert(SIMIX_is_maestro()); + SIMIX_simcall_answer(&process->simcall); +} + +} +} \ No newline at end of file diff --git a/src/simix/popping.cpp b/src/simix/popping.cpp index 8e938734d8..d5cabef667 100644 --- a/src/simix/popping.cpp +++ b/src/simix/popping.cpp @@ -44,3 +44,16 @@ void SIMIX_run_kernel(std::function const* code) { (*code)(); } + +/** Kernel code for run_blocking + * + * This looks a lot like SIMIX_run_kernel ^^ + * + * However, this `run_blocking` is blocking so the process will not be woken + * up until `SIMIX_simcall_answer(simcall)`` is called by the kernel. + * This means that `code` is responsible for doing this. + */ +void SIMIX_run_blocking(std::function const* code) +{ + (*code)(); +} diff --git a/src/simix/popping_accessors.h b/src/simix/popping_accessors.h index 5e913d9bf4..32e906ec8f 100644 --- a/src/simix/popping_accessors.h +++ b/src/simix/popping_accessors.h @@ -1150,6 +1150,13 @@ static inline void simcall_run_kernel__set__code(smx_simcall_t simcall, std::fun simgrid::simix::marshal const*>(simcall->args[0], arg); } +static inline std::function const* simcall_run_blocking__get__code(smx_simcall_t simcall) { + return simgrid::simix::unmarshal const*>(simcall->args[0]); +} +static inline void simcall_run_blocking__set__code(smx_simcall_t simcall, std::function const* arg) { + simgrid::simix::marshal const*>(simcall->args[0], arg); +} + /* The prototype of all simcall handlers, automatically generated for you */ XBT_PRIVATE void simcall_HANDLER_vm_suspend(smx_simcall_t simcall, sg_host_t ind_vm); diff --git a/src/simix/popping_bodies.cpp b/src/simix/popping_bodies.cpp index 74b4dcfec0..ba83d704e2 100644 --- a/src/simix/popping_bodies.cpp +++ b/src/simix/popping_bodies.cpp @@ -441,4 +441,10 @@ inline static void simcall_BODY_run_kernel(std::function const* code) { /* Go to that function to follow the code flow through the simcall barrier */ if (0) SIMIX_run_kernel(code); return simcall const*>(SIMCALL_RUN_KERNEL, code); + } + +inline static void simcall_BODY_run_blocking(std::function const* code) { + /* Go to that function to follow the code flow through the simcall barrier */ + if (0) SIMIX_run_blocking(code); + return simcall const*>(SIMCALL_RUN_BLOCKING, code); }/** @endcond */ diff --git a/src/simix/popping_enum.h b/src/simix/popping_enum.h index f7590f898d..4f631eb282 100644 --- a/src/simix/popping_enum.h +++ b/src/simix/popping_enum.h @@ -85,5 +85,6 @@ typedef enum { SIMCALL_ASR_GET_PROPERTIES, SIMCALL_MC_RANDOM, SIMCALL_SET_CATEGORY, - SIMCALL_RUN_KERNEL, NUM_SIMCALLS + SIMCALL_RUN_KERNEL, + SIMCALL_RUN_BLOCKING, NUM_SIMCALLS } e_smx_simcall_t; diff --git a/src/simix/popping_generated.cpp b/src/simix/popping_generated.cpp index f7b5259cc1..1ba87540ed 100644 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@ -90,7 +90,8 @@ const char* simcall_names[] = { "SIMCALL_ASR_GET_PROPERTIES", "SIMCALL_MC_RANDOM", "SIMCALL_SET_CATEGORY", - "SIMCALL_RUN_KERNEL",}; + "SIMCALL_RUN_KERNEL", + "SIMCALL_RUN_BLOCKING",}; /** @private * @brief (in kernel mode) unpack the simcall and activate the handler @@ -423,6 +424,10 @@ case SIMCALL_RUN_KERNEL: SIMIX_run_kernel(simgrid::simix::unmarshal const*>(simcall->args[0])); SIMIX_simcall_answer(simcall); break; + +case SIMCALL_RUN_BLOCKING: + SIMIX_run_blocking(simgrid::simix::unmarshal const*>(simcall->args[0])); + break; case NUM_SIMCALLS: break; case SIMCALL_NONE: diff --git a/src/simix/popping_private.h b/src/simix/popping_private.h index 3c33b23087..d6a3a57115 100644 --- a/src/simix/popping_private.h +++ b/src/simix/popping_private.h @@ -60,6 +60,7 @@ XBT_PRIVATE void SIMIX_simcall_handle(smx_simcall_t simcall, int value); XBT_PRIVATE void SIMIX_simcall_exit(smx_synchro_t synchro); XBT_PRIVATE const char *SIMIX_simcall_name(e_smx_simcall_t kind); XBT_PRIVATE void SIMIX_run_kernel(std::function const* code); +XBT_PRIVATE void SIMIX_run_blocking(std::function const* code); SG_END_DECL() diff --git a/src/simix/simcalls.in b/src/simix/simcalls.in index c8cf97346f..55858b21f1 100644 --- a/src/simix/simcalls.in +++ b/src/simix/simcalls.in @@ -116,3 +116,4 @@ int mc_random(int min, int max); void set_category(smx_synchro_t synchro, const char* category) [[nohandler]]; void run_kernel(std::function const* code) [[nohandler]]; +void run_blocking(std::function const* code) [[block,nohandler]]; diff --git a/src/simix/smx_global.cpp b/src/simix/smx_global.cpp index 7643bddd11..ab8d5956ad 100644 --- a/src/simix/smx_global.cpp +++ b/src/simix/smx_global.cpp @@ -50,10 +50,10 @@ static xbt_heap_t simix_timers = nullptr; /** @brief Timer datatype */ typedef struct s_smx_timer { double date = 0.0; - std::function callback; + std::packaged_task callback; s_smx_timer() {} - s_smx_timer(double date, std::function callback) + s_smx_timer(double date, std::packaged_task callback) : date(date), callback(std::move(callback)) {} } s_smx_timer_t; @@ -474,13 +474,11 @@ void SIMIX_run(void) //FIXME: make the timers being real callbacks // (i.e. provide dispatchers that read and expand the args) timer = (smx_timer_t) xbt_heap_pop(simix_timers); - if (timer->callback) { - try { - timer->callback(); - } - catch(...) { - xbt_die("Exception throwed ouf of timer callback"); - } + try { + timer->callback(); + } + catch(...) { + xbt_die("Exception throwed ouf of timer callback"); } delete timer; } @@ -539,12 +537,13 @@ void SIMIX_run(void) */ smx_timer_t SIMIX_timer_set(double date, void (*function)(void*), void *arg) { - smx_timer_t timer = new s_smx_timer_t(date, std::bind(function, arg)); + smx_timer_t timer = new s_smx_timer_t(date, + std::packaged_task(std::bind(function, arg))); xbt_heap_push(simix_timers, timer, date); return timer; } -smx_timer_t SIMIX_timer_set(double date, std::function callback) +smx_timer_t SIMIX_timer_set(double date, std::packaged_task callback) { smx_timer_t timer = new s_smx_timer_t(date, std::move(callback)); xbt_heap_push(simix_timers, timer, date); diff --git a/teshsuite/simix/CMakeLists.txt b/teshsuite/simix/CMakeLists.txt index 980b9e2950..0fbec1dd02 100644 --- a/teshsuite/simix/CMakeLists.txt +++ b/teshsuite/simix/CMakeLists.txt @@ -6,12 +6,20 @@ foreach(x check_defaults stack_overflow) set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.c) endforeach() +foreach(x generic_simcalls) + add_executable (${x} ${x}/${x}.cpp) + target_link_libraries(${x} simgrid) + set_target_properties(${x} PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${x}) + set(teshsuite_src ${teshsuite_src} ${CMAKE_CURRENT_SOURCE_DIR}/${x}/${x}.cpp) +endforeach() + foreach (factory raw thread boost ucontext) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/check_defaults/factory_${factory}.tesh) endforeach() set(teshsuite_src ${teshsuite_src} PARENT_SCOPE) set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/stack_overflow/stack_overflow.tesh PARENT_SCOPE) +set(tesh_files ${tesh_files} ${CMAKE_CURRENT_SOURCE_DIR}/generic_simcalls/generic_simcalls.tesh PARENT_SCOPE) IF(HAVE_RAW_CONTEXTS) ADD_TESH(tesh-simix-factory-default --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/check_defaults --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/check_defaults factory_raw.tesh) @@ -25,6 +33,7 @@ ENDIF() if (NOT enable_memcheck) ADD_TESH_FACTORIES(stack-overflow "thread;ucontext;boost;raw" --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/stack_overflow --setenv srcdir=${CMAKE_HOME_DIRECTORY} --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/stack_overflow stack_overflow.tesh) +ADD_TESH_FACTORIES(generic-simcalls "thread;ucontext;boost;raw" --setenv bindir=${CMAKE_BINARY_DIR}/teshsuite/simix/generic_simcalls --setenv srcdir=${CMAKE_HOME_DIRECTORY} --cd ${CMAKE_HOME_DIRECTORY}/teshsuite/simix/generic_simcalls generic_simcalls.tesh) endif() foreach (factory raw thread boost ucontext) diff --git a/teshsuite/simix/generic_simcalls/generic_simcalls.cpp b/teshsuite/simix/generic_simcalls/generic_simcalls.cpp new file mode 100644 index 0000000000..f1743b585f --- /dev/null +++ b/teshsuite/simix/generic_simcalls/generic_simcalls.cpp @@ -0,0 +1,92 @@ +/* Copyright (c) 2016. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include + +#include + +#include +#include +#include +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(test, "my log messages"); + +namespace example { + +/** Execute the code in the kernel at some time + * + * @param date when we should execute the code + * @param code code to execute + * @return future with the result of the call + */ +template +auto kernel_defer(double date, F code) -> simgrid::kernel::Future +{ + typedef decltype(code()) T; + auto promise = std::make_shared>(); + auto future = promise->get_future(); + SIMIX_timer_set(date, [promise, code] { + simgrid::xbt::fulfillPromise(*promise, std::move(code)); + }); + return future; +} + +static int master(int argc, char *argv[]) +{ + // Test the simple immediate execution: + XBT_INFO("Start"); + simgrid::simix::kernel([] { + XBT_INFO("kernel"); + }); + XBT_INFO("kernel, returned"); + + // Synchronize on a successful Future: + simgrid::simix::blocking_simcall([&] { + return kernel_defer(10, [] { + XBT_INFO("blocking_simcall with void"); + }); + }); + XBT_INFO("blocking_simcall with void, returned"); + + // Synchronize on a failing Future: + try { + simgrid::simix::blocking_simcall([&] { + return kernel_defer(20, [] { + throw std::runtime_error("Exception throwed from kernel_defer"); + }); + }); + XBT_ERROR("No exception caught!"); + } + catch(std::runtime_error& e) { + XBT_INFO("Exception caught: %s", e.what()); + } + + // Synchronize on a successul Future and get the value: + int res = simgrid::simix::blocking_simcall([&] { + return kernel_defer(30, [] { + XBT_INFO("blocking_simcall with value"); + return 42; + }); + }); + XBT_INFO("blocking_simcall with value returned with %i", res); + + return 0; +} + +} + +int main(int argc, char *argv[]) +{ + SIMIX_global_init(&argc, argv); + xbt_assert(argc == 2, "Usage: %s platform.xml\n", argv[0]); + SIMIX_function_register("master", example::master); + SIMIX_create_environment(argv[1]); + simcall_process_create("master", example::master, NULL, "Tremblay", -1, 0, NULL, NULL, 0); + SIMIX_run(); + return 0; +} diff --git a/teshsuite/simix/generic_simcalls/generic_simcalls.tesh b/teshsuite/simix/generic_simcalls/generic_simcalls.tesh new file mode 100644 index 0000000000..1c10ecb064 --- /dev/null +++ b/teshsuite/simix/generic_simcalls/generic_simcalls.tesh @@ -0,0 +1,9 @@ +$ ${bindir:=.}/generic_simcalls --cfg=contexts/stack-size:96 ${srcdir:=.}/examples/platforms/small_platform.xml +> [Tremblay:master:(0) 0.000000] [test/INFO] Start +> [0.000000] [test/INFO] kernel +> [Tremblay:master:(0) 0.000000] [test/INFO] kernel, returned +> [10.000000] [test/INFO] blocking_simcall with void +> [Tremblay:master:(0) 10.000000] [test/INFO] blocking_simcall with void, returned +> [Tremblay:master:(0) 20.000000] [test/INFO] Exception caught: Exception throwed from kernel_defer +> [30.000000] [test/INFO] blocking_simcall with value +> [Tremblay:master:(0) 30.000000] [test/INFO] blocking_simcall with value returned with 42 diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 1416671408..d46f4b2d29 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -634,6 +634,8 @@ set(headers_to_install include/simgrid/forward.h include/simgrid/simix.h include/simgrid/simix.hpp + include/simgrid/simix/sync.hpp + include/simgrid/kernel/future.hpp include/simgrid/host.h include/simgrid/link.h include/simgrid/s4u/forward.hpp