From 866550c0a0ea154753da12952a154cb71dfb8264 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christophe=20Thi=C3=A9ry?= Date: Wed, 7 Dec 2011 16:34:28 +0100 Subject: [PATCH] Remove xbt_parmap_get_worker_id to simplify the parmap interface --- include/xbt/parmap.h | 11 +++++------ src/simix/smx_context_raw.c | 15 ++++++++++++--- src/simix/smx_context_sysv.c | 25 +++++++++++++++---------- src/xbt/parmap.c | 10 ---------- 4 files changed, 32 insertions(+), 29 deletions(-) diff --git a/include/xbt/parmap.h b/include/xbt/parmap.h index 94125959b4..172ef6be61 100644 --- a/include/xbt/parmap.h +++ b/include/xbt/parmap.h @@ -15,8 +15,8 @@ SG_BEGIN_DECL() -/** @addtogroup XBT_parmap - * @brief Parallel map. +/** \addtogroup XBT_parmap + * \brief Parallel map. * * A function is applied to the n first elements of a dynar in parallel, * where n is the number of threads. The threads are persistent until the @@ -25,10 +25,10 @@ SG_BEGIN_DECL() * If there are more than n elements in the dynar, the worker threads should * fetch themselves remaining work with xbt_parmap_next() and execute it. * - * @{ + * \{ */ - /** \brief Queue data type (opaque type) */ +/** \brief Parallel map data type (opaque type) */ typedef struct s_xbt_parmap *xbt_parmap_t; XBT_PUBLIC(xbt_parmap_t) xbt_parmap_new(unsigned int num_workers); @@ -38,9 +38,8 @@ XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data); XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap); -XBT_PUBLIC(unsigned long) xbt_parmap_get_worker_id(xbt_parmap_t parmap); -/** @} */ +/** \} */ SG_END_DECL() diff --git a/src/simix/smx_context_raw.c b/src/simix/smx_context_raw.c index c5aec4a9bd..432c972873 100644 --- a/src/simix/smx_context_raw.c +++ b/src/simix/smx_context_raw.c @@ -30,7 +30,9 @@ typedef struct s_smx_ctx_raw { #ifdef CONTEXT_THREADS static xbt_parmap_t raw_parmap; -static raw_stack_t* raw_workers_stacks; /* space to save the worker stack in each thread */ +static raw_stack_t* raw_workers_stacks; /* space to save the worker stack in each thread */ +static unsigned long raw_threads_working; /* number of threads that have started their work */ +static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the thread id */ #endif static unsigned long raw_process_index = 0; /* index of the next process to run in the @@ -227,6 +229,7 @@ void SIMIX_ctx_raw_factory_init(smx_context_factory_t *factory) int nthreads = SIMIX_context_get_nthreads(); raw_parmap = xbt_parmap_new(nthreads); raw_workers_stacks = xbt_new(raw_stack_t, nthreads); + xbt_os_thread_key_create(&raw_worker_id_key); #endif if (SIMIX_context_get_parallel_threshold() > 1) { /* choose dynamically */ @@ -463,7 +466,10 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context) /* all processes were run, go to the barrier */ XBT_DEBUG("No more processes to run"); next_context = (smx_context_t) raw_maestro_context; - unsigned long worker_id = xbt_parmap_get_worker_id(raw_parmap); + unsigned long worker_id = + (unsigned long) xbt_os_thread_get_specific(raw_worker_id_key); + XBT_DEBUG("Restoring worker stack %lu (working threads = %lu)", + worker_id, raw_threads_working); next_stack = raw_workers_stacks[worker_id]; } @@ -475,7 +481,9 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context) static void smx_ctx_raw_resume_parallel(smx_process_t first_process) { #ifdef CONTEXT_THREADS - unsigned long worker_id = xbt_parmap_get_worker_id(raw_parmap); + unsigned long worker_id = __sync_fetch_and_add(&raw_threads_working, 1); + xbt_os_thread_set_specific(raw_worker_id_key, (void*) worker_id); + XBT_DEBUG("Saving worker stack %lu", worker_id); raw_stack_t* worker_stack = &raw_workers_stacks[worker_id]; smx_context_t context = first_process->context; @@ -487,6 +495,7 @@ static void smx_ctx_raw_resume_parallel(smx_process_t first_process) static void smx_ctx_raw_runall_parallel(void) { #ifdef CONTEXT_THREADS + raw_threads_working = 0; xbt_parmap_apply(raw_parmap, (void_f_pvoid_t) smx_ctx_raw_resume_parallel, simix_global->process_to_run); #endif diff --git a/src/simix/smx_context_sysv.c b/src/simix/smx_context_sysv.c index ffd96262cc..7f2c5c34c7 100644 --- a/src/simix/smx_context_sysv.c +++ b/src/simix/smx_context_sysv.c @@ -36,10 +36,12 @@ typedef struct s_smx_ctx_sysv { #ifdef CONTEXT_THREADS static xbt_parmap_t sysv_parmap; -static ucontext_t* sysv_workers_stacks; /* space to save the worker's stack in each thread */ +static ucontext_t* sysv_workers_stacks; /* space to save the worker's stack in each thread */ +static unsigned long sysv_threads_working; /* number of threads that have started their work */ +static xbt_os_thread_key_t sysv_worker_id_key; /* thread-specific storage for the thread id */ #endif -static unsigned long sysv_process_index = 0; /* index of the next process to run in the - * list of runnable processes */ +static unsigned long sysv_process_index = 0; /* index of the next process to run in the + * list of runnable processes */ static smx_ctx_sysv_t sysv_maestro_context; static int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory); @@ -50,7 +52,6 @@ smx_ctx_sysv_create_context_sized(size_t structure_size, void_pfn_smxprocess_t cleanup_func, void *data); static void smx_ctx_sysv_free(smx_context_t context); -static int smx_ctx_sysv_get_thread_id(void); static smx_context_t smx_ctx_sysv_self_parallel(void); static smx_context_t smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv, @@ -99,6 +100,7 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory) int nthreads = SIMIX_context_get_nthreads(); sysv_parmap = xbt_parmap_new(nthreads); sysv_workers_stacks = xbt_new(ucontext_t, nthreads); + xbt_os_thread_key_create(&sysv_worker_id_key); (*factory)->stop = smx_ctx_sysv_stop_parallel; (*factory)->suspend = smx_ctx_sysv_suspend_parallel; (*factory)->runall = smx_ctx_sysv_runall_parallel; @@ -273,31 +275,33 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context) /* determine the next context */ smx_process_t next_work = xbt_parmap_next(sysv_parmap); smx_context_t next_context; - ucontext_t* next_uc; + ucontext_t* next_stack; if (next_work != NULL) { /* there is a next process to resume */ XBT_DEBUG("Run next process"); next_context = next_work->context; - next_uc = &((smx_ctx_sysv_t) next_context)->uc; + next_stack = &((smx_ctx_sysv_t) next_context)->uc; } else { /* all processes were run, go to the barrier */ XBT_DEBUG("No more processes to run"); next_context = (smx_context_t) sysv_maestro_context; - unsigned long worker_id = xbt_parmap_get_worker_id(sysv_parmap); - next_uc = &sysv_workers_stacks[worker_id]; + unsigned long worker_id = + (unsigned long) xbt_os_thread_get_specific(sysv_worker_id_key); + next_stack = &sysv_workers_stacks[worker_id]; } SIMIX_context_set_current(next_context); - swapcontext(&((smx_ctx_sysv_t) context)->uc, next_uc); + swapcontext(&((smx_ctx_sysv_t) context)->uc, next_stack); #endif } static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) { #ifdef CONTEXT_THREADS - unsigned long worker_id = xbt_parmap_get_worker_id(sysv_parmap); + unsigned long worker_id = __sync_fetch_and_add(&sysv_threads_working, 1); + xbt_os_thread_set_specific(sysv_worker_id_key, (void*) worker_id); ucontext_t* worker_stack = &sysv_workers_stacks[worker_id]; smx_context_t context = first_process->context; @@ -309,6 +313,7 @@ static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) static void smx_ctx_sysv_runall_parallel(void) { #ifdef CONTEXT_THREADS + sysv_threads_working = 0; xbt_parmap_apply(sysv_parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel, simix_global->process_to_run); #endif diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index c40fa0d941..7bb119be4f 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -126,16 +126,6 @@ void* xbt_parmap_next(xbt_parmap_t parmap) return NULL; } -/** - * \brief Returns the worker id of the current thread. - * \param parmap a parmap - * \return the worker id - */ -unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) -{ - return (unsigned long) xbt_os_thread_get_extra_data(); -} - /** * \brief Main function of a worker thread. * \param arg the parmap -- 2.20.1