From 3fe150481a6802850ebb47a7f4a1b77c53f2cd99 Mon Sep 17 00:00:00 2001 From: Gabriel Corona Date: Thu, 18 Jun 2015 14:23:01 +0200 Subject: [PATCH] Parallel implementation for Boost.Context context This is really a copy of the code of smx_context_sysv.c. (The same logic is duplicated without mushc difference between most implementations.) --- src/simix/smx_context_boost.cpp | 101 ++++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 5 deletions(-) diff --git a/src/simix/smx_context_boost.cpp b/src/simix/smx_context_boost.cpp index 6dbcac6c19..71d34f1872 100644 --- a/src/simix/smx_context_boost.cpp +++ b/src/simix/smx_context_boost.cpp @@ -35,8 +35,21 @@ static void smx_ctx_boost_suspend_serial(smx_context_t context); static void smx_ctx_boost_resume_serial(smx_process_t first_process); static void smx_ctx_boost_runall_serial(void); +#ifdef CONTEXT_THREADS +static void smx_ctx_boost_stop_parallel(smx_context_t context); +static void smx_ctx_boost_suspend_parallel(smx_context_t context); +static void smx_ctx_boost_resume_parallel(smx_process_t first_process); +static void smx_ctx_boost_runall_parallel(void); +#endif + +#ifdef CONTEXT_THREADS +static xbt_parmap_t boost_parmap; +static smx_ctx_boost_t* boost_workers_context; +static unsigned long boost_threads_working; +static xbt_os_thread_key_t boost_worker_id_key; +#endif + static unsigned long boost_process_index = 0; -static boost::context::fcontext_t boost_maestro_fcontext; static smx_ctx_boost_t boost_maestro_context; void SIMIX_ctx_boost_factory_init(smx_context_factory_t *factory) @@ -51,7 +64,20 @@ void SIMIX_ctx_boost_factory_init(smx_context_factory_t *factory) (*factory)->name = "smx_boost_context_factory"; if (SIMIX_context_is_parallel()) { +#ifndef CONTEXT_THREADS THROWF(arg_error, 0, "No thread support for parallel context execution"); +#else + int nthreads = SIMIX_context_get_nthreads(); + boost_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode()); + boost_workers_context = xbt_new(smx_ctx_boost_t, nthreads); + boost_maestro_context = NULL; + + xbt_os_thread_key_create(&boost_worker_id_key); + + (*factory)->stop = smx_ctx_boost_stop_parallel; + (*factory)->suspend = smx_ctx_boost_suspend_parallel; + (*factory)->runall = smx_ctx_boost_runall_parallel; +#endif } else { (*factory)->stop = smx_ctx_boost_stop_serial; (*factory)->suspend = smx_ctx_boost_suspend_serial; @@ -63,6 +89,14 @@ void SIMIX_ctx_boost_factory_init(smx_context_factory_t *factory) static int smx_ctx_boost_factory_finalize(smx_context_factory_t *factory) { +#ifdef CONTEXT_THREADS + if (boost_parmap) { + xbt_parmap_destroy(boost_parmap); + boost_parmap = nullptr; + } + xbt_free(boost_workers_context); + boost_workers_context = nullptr; +#endif return smx_ctx_base_factory_finalize(factory); } @@ -90,14 +124,15 @@ smx_ctx_boost_create_context(xbt_main_func_t code, int argc, char **argv, #else void* stack = context->stack; #endif - context->fc = boost::context::make_fcontext( + context->fc = boost::context::make_fcontext( stack, smx_context_usable_stack_size, smx_ctx_boost_wrapper); - } else if (process != nullptr && boost_maestro_context == nullptr) { + } else { context->stack = nullptr; - context->fc = &boost_maestro_fcontext; - boost_maestro_context = context; + context->fc = new boost::context::fcontext_t(); + if (boost_maestro_context == nullptr) + boost_maestro_context = context; } return (smx_context_t) context; @@ -108,6 +143,8 @@ static void smx_ctx_boost_free(smx_context_t c) smx_ctx_boost_t context = (smx_ctx_boost_t) c; if (!context) return; + if (!context->stack) + delete context->fc; if ((smx_ctx_boost_t) c == boost_maestro_context) boost_maestro_context = nullptr; SIMIX_context_stack_delete(context->stack); @@ -166,3 +203,57 @@ static void smx_ctx_boost_runall_serial(void) /* execute the first process */ smx_ctx_boost_resume_serial(first_process); } + +// **** Parallel code + +#ifdef CONTEXT_THREADS + +static void smx_ctx_boost_stop_parallel(smx_context_t context) +{ + smx_ctx_base_stop(context); + smx_ctx_boost_suspend_parallel(context); +} + +static void smx_ctx_boost_suspend_parallel(smx_context_t context) +{ + smx_process_t next_work = (smx_process_t) xbt_parmap_next(boost_parmap); + smx_ctx_boost_t next_context; + + if (next_work != NULL) { + XBT_DEBUG("Run next process"); + next_context = (smx_ctx_boost_t) next_work->context; + } + else { + XBT_DEBUG("No more processes to run"); + unsigned long worker_id = + (unsigned long) xbt_os_thread_get_specific(boost_worker_id_key); + next_context = boost_workers_context[worker_id]; + } + + SIMIX_context_set_current((smx_context_t) next_context); + boost::context::jump_fcontext( + ((smx_ctx_boost_t)context)->fc, next_context->fc, (intptr_t)next_context); +} + +static void smx_ctx_boost_resume_parallel(smx_process_t process) +{ + unsigned long worker_id = __sync_fetch_and_add(&boost_threads_working, 1); + xbt_os_thread_set_specific(boost_worker_id_key, (void*) worker_id); + + smx_ctx_boost_t worker_context = (smx_ctx_boost_t)SIMIX_context_self(); + boost_workers_context[worker_id] = worker_context; + smx_ctx_boost_t context = (smx_ctx_boost_t) process->context; + + SIMIX_context_set_current((smx_context_t) context); + boost::context::jump_fcontext(worker_context->fc, context->fc, + (intptr_t)context); +} + +static void smx_ctx_boost_runall_parallel(void) +{ + boost_threads_working = 0; + xbt_parmap_apply(boost_parmap, (void_f_pvoid_t) smx_ctx_boost_resume_parallel, + simix_global->process_to_run); +} + +#endif -- 2.20.1