From: Christophe Thiéry Date: Wed, 2 Nov 2011 11:10:40 +0000 (+0100) Subject: Direct context switching: clean the semantics of parmap X-Git-Tag: exp_20120216~528^2~6 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/f57edc1d5b4f497883b451fc85d2d653d27a8247 Direct context switching: clean the semantics of parmap --- diff --git a/include/xbt/parmap.h b/include/xbt/parmap.h index f661994a00..94125959b4 100644 --- a/include/xbt/parmap.h +++ b/include/xbt/parmap.h @@ -18,9 +18,13 @@ SG_BEGIN_DECL() /** @addtogroup XBT_parmap * @brief Parallel map. * - * A function is applied to all the elements of a dynar in parallel - * using threads. The threads are persistent until the destruction - * of the parmap object. + * A function is applied to the first n elements of a dynar in parallel, + * where n is the number of threads. The threads are persistent until the + * destruction of the parmap object. + * + * If there are more than n elements in the dynar, the worker threads should + * fetch the remaining work themselves with xbt_parmap_next() and execute it. 
+ * * @{ */ /** \brief Queue data type (opaque type) */ @@ -28,12 +32,13 @@ SG_BEGIN_DECL() typedef struct s_xbt_parmap *xbt_parmap_t; XBT_PUBLIC(xbt_parmap_t) xbt_parmap_new(unsigned int num_workers); +XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap); XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data); - -XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap); +XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap); +XBT_PUBLIC(unsigned long) xbt_parmap_get_worker_id(xbt_parmap_t parmap); /** @} */ diff --git a/src/simix/smx_context_sysv.c b/src/simix/smx_context_sysv.c index 984be819d7..626c9ecfea 100644 --- a/src/simix/smx_context_sysv.c +++ b/src/simix/smx_context_sysv.c @@ -10,7 +10,6 @@ #include "smx_context_sysv_private.h" #include "xbt/parmap.h" -#include "xbt/parmap_private.h" #include "simix/private.h" #include "gras_config.h" @@ -28,10 +27,12 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context); #ifdef CONTEXT_THREADS static xbt_parmap_t parmap; +static ucontext_t* smx_ctx_sysv_local_maestro_uc; /* space to save maestro's stack in each thread */ #endif static unsigned long smx_ctx_sysv_process_index = 0; /* index of the next process to run in the - * list of runnable processes */ -static smx_ctx_sysv_t maestro_context; + * list of runnable processes */ +static smx_ctx_sysv_t smx_ctx_sysv_maestro_context; + static smx_context_t smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv, @@ -41,13 +42,14 @@ static void smx_ctx_sysv_wrapper(int count, ...); static void smx_ctx_sysv_stop_serial(smx_context_t context); static void smx_ctx_sysv_suspend_serial(smx_context_t context); +static void smx_ctx_sysv_resume_serial(smx_process_t first_process); static void smx_ctx_sysv_runall_serial(void); static void smx_ctx_sysv_stop_parallel(smx_context_t context); static void smx_ctx_sysv_suspend_parallel(smx_context_t context); +static void smx_ctx_sysv_resume_parallel(smx_process_t first_process); 
static void smx_ctx_sysv_runall_parallel(void); -static void smx_ctx_sysv_resume(smx_process_t first_process); /* This is a bit paranoid about SIZEOF_VOIDP not being a multiple of SIZEOF_INT, * but it doesn't harm. */ @@ -77,7 +79,9 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory) if (SIMIX_context_is_parallel()) { #ifdef CONTEXT_THREADS /* To use parallel ucontexts a thread pool is needed */ - parmap = xbt_parmap_new(2); + int nthreads = SIMIX_context_get_nthreads(); + parmap = xbt_parmap_new(nthreads); + smx_ctx_sysv_local_maestro_uc = xbt_new(ucontext_t, nthreads); (*factory)->stop = smx_ctx_sysv_stop_parallel; (*factory)->suspend = smx_ctx_sysv_suspend_parallel; (*factory)->runall = smx_ctx_sysv_runall_parallel; @@ -97,6 +101,7 @@ int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory) #ifdef CONTEXT_THREADS if (parmap) xbt_parmap_destroy(parmap); + xbt_free(smx_ctx_sysv_local_maestro_uc); #endif return smx_ctx_base_factory_finalize(factory); } @@ -143,7 +148,7 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code, makecontext(&context->uc, (void (*)())smx_ctx_sysv_wrapper, CTX_ADDR_LEN, CTX_ADDR_SPLIT(ctx_addr)); } else { - maestro_context = context; + smx_ctx_sysv_maestro_context = context; } return (smx_context_t) context; @@ -208,28 +213,26 @@ static void smx_ctx_sysv_suspend_serial(smx_context_t context) unsigned long int i = smx_ctx_sysv_process_index++; if (i < xbt_dynar_length(simix_global->process_to_run)) { - XBT_DEBUG("Run next process"); - /* execute the next process */ + XBT_DEBUG("Run next process"); next_context = xbt_dynar_get_as( simix_global->process_to_run,i, smx_process_t)->context; } else { /* all processes were run, return to maestro */ XBT_DEBUG("No more process to run"); - - next_context = (smx_context_t) maestro_context; + next_context = (smx_context_t) smx_ctx_sysv_maestro_context; } SIMIX_context_set_current(next_context); swapcontext(&((smx_ctx_sysv_t) context)->uc, &((smx_ctx_sysv_t) 
next_context)->uc); } -static void smx_ctx_sysv_resume(smx_process_t first_process) +static void smx_ctx_sysv_resume_serial(smx_process_t first_process) { smx_context_t context = first_process->context; SIMIX_context_set_current(context); - swapcontext(&maestro_context->uc, + swapcontext(&smx_ctx_sysv_maestro_context->uc, &((smx_ctx_sysv_t) context)->uc); } @@ -241,7 +244,7 @@ static void smx_ctx_sysv_runall_serial(void) smx_ctx_sysv_process_index = 1; /* execute the first process */ - smx_ctx_sysv_resume(first_process); + smx_ctx_sysv_resume_serial(first_process); } } @@ -255,30 +258,45 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context) { #ifdef CONTEXT_THREADS /* determine the next context */ + smx_process_t next_work = xbt_parmap_next(parmap); smx_context_t next_context; - unsigned long int i = __sync_fetch_and_add(&parmap->index, 1); + ucontext_t* next_uc; - if (i < xbt_dynar_length(simix_global->process_to_run)) { - /* execute the next process */ + if (next_work != NULL) { + /* there is a next process to resume */ XBT_DEBUG("Run next process"); - next_context = xbt_dynar_get_as(simix_global->process_to_run, i, smx_process_t)->context; + next_context = next_work->context; + next_uc = &((smx_ctx_sysv_t) next_context)->uc; } else { /* all processes were run, go to the barrier */ XBT_DEBUG("No more processes to run"); - next_context = (smx_context_t) maestro_context; + next_context = (smx_context_t) smx_ctx_sysv_maestro_context; + unsigned long worker_id = (unsigned long) xbt_os_thread_get_extra_data(); + next_uc = &smx_ctx_sysv_local_maestro_uc[worker_id]; } SIMIX_context_set_current(next_context); - swapcontext(&((smx_ctx_sysv_t) context)->uc, - &((smx_ctx_sysv_t) next_context)->uc); + swapcontext(&((smx_ctx_sysv_t) context)->uc, next_uc); +#endif +} + +static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) +{ +#ifdef CONTEXT_THREADS + unsigned long worker_id = (unsigned long) xbt_os_thread_get_extra_data(); + ucontext_t* 
local_maestro_uc = &smx_ctx_sysv_local_maestro_uc[worker_id]; + + smx_context_t context = first_process->context; + SIMIX_context_set_current(context); + swapcontext(local_maestro_uc, &((smx_ctx_sysv_t) context)->uc); #endif } static void smx_ctx_sysv_runall_parallel(void) { #ifdef CONTEXT_THREADS - xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_sysv_resume, + xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel, simix_global->process_to_run); #endif } diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index dccc15d261..19dcb58027 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -55,7 +55,6 @@ xbt_parmap_t xbt_parmap_new(unsigned int num_workers) void xbt_parmap_destroy(xbt_parmap_t parmap) { - XBT_DEBUG("Destroy parmap %p", parmap); parmap->status = PARMAP_DESTROY; #ifdef HAVE_FUTEX_H xbt_event_signal(parmap->sync_event); @@ -76,14 +75,27 @@ void xbt_parmap_destroy(xbt_parmap_t parmap) XBT_DEBUG("Job done"); } +void* xbt_parmap_next(xbt_parmap_t parmap) { + + unsigned int index = __sync_fetch_and_add(&parmap->index, 1); + if (index < xbt_dynar_length(parmap->data)) { + return xbt_dynar_get_as(parmap->data, index, void*); + } + return NULL; +} + +unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) { + return (unsigned long) xbt_os_thread_get_extra_data(); +} + static void *_xbt_parmap_worker_main(void *arg) { unsigned int worker_id; - xbt_parmap_t parmap = (xbt_parmap_t)arg; + xbt_parmap_t parmap = (xbt_parmap_t) arg; /* Fetch a worker id */ worker_id = __sync_fetch_and_add(&parmap->workers_max_id, 1); - xbt_os_thread_set_extra_data((void *)(unsigned long)worker_id); + xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id); XBT_DEBUG("New worker thread created (%u)", worker_id); @@ -96,9 +108,9 @@ static void *_xbt_parmap_worker_main(void *arg) XBT_DEBUG("Worker %u got a job", worker_id); - unsigned int i = __sync_fetch_and_add(&parmap->index, 1); - if (i < xbt_dynar_length(parmap->data)) { - 
parmap->fun(xbt_dynar_get_as(parmap->data, i, void*)); + void* work = xbt_parmap_next(parmap); + if (work != NULL) { + parmap->fun(work); } XBT_DEBUG("Worker %u has finished", worker_id);