From c6eb984ee08e3c7eec012130fbad9c875862e7cc Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Tue, 24 Oct 2017 14:43:04 +0200 Subject: [PATCH] ContextThread: define {Serial,Parallel}ThreadContext, and reorganize. --- src/kernel/context/ContextThread.cpp | 143 ++++++++++++++++----------- src/kernel/context/ContextThread.hpp | 71 ++++++++++--- 2 files changed, 145 insertions(+), 69 deletions(-) diff --git a/src/kernel/context/ContextThread.cpp b/src/kernel/context/ContextThread.cpp index 8be0edf754..8679b200dc 100644 --- a/src/kernel/context/ContextThread.cpp +++ b/src/kernel/context/ContextThread.cpp @@ -17,77 +17,46 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context); -static xbt_os_sem_t smx_ctx_thread_sem = nullptr; - namespace simgrid { namespace kernel { namespace context { -XBT_PRIVATE ContextFactory* thread_factory() -{ - XBT_VERB("Activating thread context factory"); - return new ThreadContextFactory(); -} +// ThreadContextFactory ThreadContextFactory::ThreadContextFactory() - : ContextFactory("ThreadContextFactory") + : ContextFactory("ThreadContextFactory"), parallel_(SIMIX_context_is_parallel()) { - if (SIMIX_context_is_parallel()) { - smx_ctx_thread_sem = xbt_os_sem_init(SIMIX_context_get_nthreads()); - } else { - smx_ctx_thread_sem = nullptr; - } + if (parallel_) + ParallelThreadContext::initialize(); } ThreadContextFactory::~ThreadContextFactory() { - if (smx_ctx_thread_sem) { - xbt_os_sem_destroy(smx_ctx_thread_sem); - smx_ctx_thread_sem = nullptr; - } + if (parallel_) + ParallelThreadContext::finalize(); } -ThreadContext* ThreadContextFactory::create_context( - std::function code, - void_pfn_smxprocess_t cleanup, smx_actor_t process) +ThreadContext* ThreadContextFactory::create_context(std::function code, void_pfn_smxprocess_t cleanup, + smx_actor_t process, bool maestro) { - return this->new_context(std::move(code), cleanup, process, not code); + if (parallel_) + return this->new_context(std::move(code), cleanup, process, maestro); + else + return this->new_context(std::move(code), cleanup, process, maestro); } void ThreadContextFactory::run_all() { - if (smx_ctx_thread_sem == nullptr) { - // Serial execution - for (smx_actor_t const& process : simix_global->process_to_run) { - XBT_DEBUG("Handling %p",process); - ThreadContext* context = static_cast(process->context); - xbt_os_sem_release(context->begin_); - xbt_os_sem_acquire(context->end_); - } - } else { + if (parallel_) { // Parallel execution - for (smx_actor_t const& process : simix_global->process_to_run) - xbt_os_sem_release(static_cast(process->context)->begin_); - for (smx_actor_t const& process : simix_global->process_to_run) - xbt_os_sem_acquire(static_cast(process->context)->end_); + ParallelThreadContext::run_all(); + } else { + // Serial execution + SerialThreadContext::run_all(); } } -ThreadContext* ThreadContextFactory::self() -{ - return static_cast(xbt_os_thread_get_extra_data()); -} - -ThreadContext* ThreadContextFactory::attach(void_pfn_smxprocess_t cleanup_func, smx_actor_t process) -{ - return this->new_context( - std::function(), cleanup_func, process, false); -} - -ThreadContext* ThreadContextFactory::create_maestro(std::function code, smx_actor_t process) -{ - return this->new_context(std::move(code), nullptr, process, true); -} +// ThreadContext ThreadContext::ThreadContext(std::function code, void_pfn_smxprocess_t cleanup, smx_actor_t process, bool maestro) @@ -147,11 +116,11 @@ void *ThreadContext::wrapper(void *param) try { (*context)(); - if (not context->is_maestro_) // really? + if (not context->isMaestro()) // really? context->Context::stop(); } catch (StopRequest const&) { XBT_DEBUG("Caught a StopRequest"); - xbt_assert(not context->is_maestro_, "I'm not supposed to be maestro here."); + xbt_assert(not context->isMaestro(), "I'm not supposed to be maestro here."); } // Signal to the caller (normally the maestro) that we have finished: @@ -164,17 +133,25 @@ void *ThreadContext::wrapper(void *param) return nullptr; } +void ThreadContext::release() +{ + xbt_os_sem_release(this->begin_); +} + +void ThreadContext::wait() +{ + xbt_os_sem_acquire(this->end_); +} + void ThreadContext::start() { xbt_os_sem_acquire(this->begin_); - if (not is_maestro_ && smx_ctx_thread_sem) /* parallel run */ - xbt_os_sem_acquire(smx_ctx_thread_sem); + this->start_hook(); } void ThreadContext::yield() { - if (not is_maestro_ && smx_ctx_thread_sem) /* parallel run */ - xbt_os_sem_release(smx_ctx_thread_sem); + this->yield_hook(); xbt_os_sem_release(this->end_); } @@ -195,13 +172,13 @@ void ThreadContext::attach_start() // We're breaking the layers here by depending on the upper layer: ThreadContext* maestro = (ThreadContext*) simix_global->maestro_process->context; xbt_os_sem_release(maestro->begin_); - xbt_assert(not this->is_maestro_); + xbt_assert(not this->isMaestro()); this->start(); } void ThreadContext::attach_stop() { - xbt_assert(not this->is_maestro_); + xbt_assert(not this->isMaestro()); this->yield(); ThreadContext* maestro = (ThreadContext*) simix_global->maestro_process->context; @@ -210,4 +187,56 @@ void ThreadContext::attach_stop() xbt_os_thread_set_extra_data(nullptr); } +// SerialThreadContext + +void SerialThreadContext::run_all() +{ + for (smx_actor_t const& process : simix_global->process_to_run) { + XBT_DEBUG("Handling %p", process); + ThreadContext* context = static_cast(process->context); + context->release(); + context->wait(); + } +} + +// ParallelThreadContext + +xbt_os_sem_t ParallelThreadContext::thread_sem_ = nullptr; + +void ParallelThreadContext::initialize() +{ + thread_sem_ = xbt_os_sem_init(SIMIX_context_get_nthreads()); +} + +void ParallelThreadContext::finalize() +{ + xbt_os_sem_destroy(thread_sem_); + thread_sem_ = nullptr; +} + +void ParallelThreadContext::run_all() +{ + for (smx_actor_t const& process : simix_global->process_to_run) + static_cast(process->context)->release(); + for (smx_actor_t const& process : simix_global->process_to_run) + static_cast(process->context)->wait(); +} + +void ParallelThreadContext::start_hook() +{ + if (not isMaestro()) /* parallel run */ + xbt_os_sem_acquire(thread_sem_); +} + +void ParallelThreadContext::yield_hook() +{ + if (not isMaestro()) /* parallel run */ + xbt_os_sem_release(thread_sem_); +} + +XBT_PRIVATE ContextFactory* thread_factory() +{ + XBT_VERB("Activating thread context factory"); + return new ThreadContextFactory(); +} }}} // namespace diff --git a/src/kernel/context/ContextThread.hpp b/src/kernel/context/ContextThread.hpp index 7ce8de5fc6..f47f59f3af 100644 --- a/src/kernel/context/ContextThread.hpp +++ b/src/kernel/context/ContextThread.hpp @@ -16,18 +16,19 @@ namespace simgrid { namespace kernel { namespace context { -class ThreadContext; -class ThreadContextFactory; - class ThreadContext : public AttachContext { public: - friend ThreadContextFactory; ThreadContext(std::function code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process, bool maestro); ~ThreadContext() override; void stop() override; void suspend() override; void attach_start() override; void attach_stop() override; + + bool isMaestro() const { return is_maestro_; } + void release(); // unblock context's start() + void wait(); // wait for context's yield() + private: /** A portable thread */ xbt_os_thread_t thread_ = nullptr; @@ -37,26 +38,72 @@ private: xbt_os_sem_t end_ = nullptr; bool is_maestro_; + void start(); // match a call to release() + void yield(); // match a call to yield() + virtual void start_hook() {} // called after start() + virtual void yield_hook() {} // called before yield() + static void* wrapper(void *param); +}; + +class SerialThreadContext : public ThreadContext { +public: + SerialThreadContext(std::function code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process, bool maestro) + : ThreadContext(std::move(code), cleanup_func, process, maestro) + { + } + + static void run_all(); +}; + +class ParallelThreadContext : public ThreadContext { public: - void start(); - void yield(); + ParallelThreadContext(std::function code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process, + bool maestro) + : ThreadContext(std::move(code), cleanup_func, process, maestro) + { + } + + static void initialize(); + static void finalize(); + static void run_all(); + +private: + static xbt_os_sem_t thread_sem_; + + void start_hook() override; + void yield_hook() override; }; class ThreadContextFactory : public ContextFactory { public: ThreadContextFactory(); ~ThreadContextFactory() override; - ThreadContext* create_context(std::function code, - void_pfn_smxprocess_t cleanup_func, smx_actor_t process) override; + ThreadContext* create_context(std::function code, void_pfn_smxprocess_t cleanup_func, + smx_actor_t process) override + { + bool maestro = not code; + return create_context(std::move(code), cleanup_func, process, maestro); + } void run_all() override; - ThreadContext* self() override; + ThreadContext* self() override { return static_cast(xbt_os_thread_get_extra_data()); } // Optional methods: - ThreadContext* attach(void_pfn_smxprocess_t cleanup_func, smx_actor_t process) override; - ThreadContext* create_maestro(std::function code, smx_actor_t process) override; -}; + ThreadContext* attach(void_pfn_smxprocess_t cleanup_func, smx_actor_t process) override + { + return create_context(std::function(), cleanup_func, process, false); + } + ThreadContext* create_maestro(std::function code, smx_actor_t process) override + { + return create_context(std::move(code), nullptr, process, true); + } + +private: + bool parallel_; + ThreadContext* create_context(std::function code, void_pfn_smxprocess_t cleanup_func, smx_actor_t process, + bool maestro); +}; }}} // namespace #endif -- 2.20.1