From da2ce25eaa2fbae84dd3308348b88dab9269211e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christophe=20Thi=C3=A9ry?= Date: Wed, 2 Nov 2011 09:52:01 +0100 Subject: [PATCH] Switch directly between runnable contextes (work in progress) --- include/simix/context.h | 2 +- src/simix/private.h | 7 +- src/simix/smx_context_raw.c | 24 ++--- src/simix/smx_context_sysv.c | 141 ++++++++++++++++++--------- src/simix/smx_context_sysv_private.h | 11 +-- src/simix/smx_context_thread.c | 14 +-- src/simix/smx_process.c | 5 +- src/xbt/parmap.c | 15 ++- 8 files changed, 129 insertions(+), 90 deletions(-) diff --git a/include/simix/context.h b/include/simix/context.h index 98512e234a..4b4d5b9eb3 100644 --- a/include/simix/context.h +++ b/include/simix/context.h @@ -33,7 +33,7 @@ typedef void (*smx_pfn_context_free_t) (smx_context_t); typedef void (*smx_pfn_context_start_t) (smx_context_t); typedef void (*smx_pfn_context_stop_t) (smx_context_t); typedef void (*smx_pfn_context_suspend_t) (smx_context_t context); -typedef void (*smx_pfn_context_runall_t) (xbt_dynar_t processes); +typedef void (*smx_pfn_context_runall_t) (void); typedef smx_context_t (*smx_pfn_context_self_t) (void); typedef void* (*smx_pfn_context_get_data_t) (smx_context_t context); diff --git a/src/simix/private.h b/src/simix/private.h index 7b2d98c3c3..2008f9c1eb 100644 --- a/src/simix/private.h +++ b/src/simix/private.h @@ -231,12 +231,11 @@ static XBT_INLINE void SIMIX_context_suspend(smx_context_t context) } /** - \brief executes all the processes (in parallel if possible) - \param processes the dynar of processes to execute + \brief Executes all the processes to run (in parallel if possible). */ -static XBT_INLINE void SIMIX_context_runall(xbt_dynar_t processes) +static XBT_INLINE void SIMIX_context_runall() { - (*(simix_global->context_factory->runall)) (processes); + (*(simix_global->context_factory->runall)) (); } /** diff --git a/src/simix/smx_context_raw.c b/src/simix/smx_context_raw.c index 059870eb84..66a92649e7 100644 --- a/src/simix/smx_context_raw.c +++ b/src/simix/smx_context_raw.c @@ -360,33 +360,35 @@ void smx_ctx_raw_new_sr(void) XBT_VERB("New scheduling round"); } #else -static void smx_ctx_raw_runall_serial(xbt_dynar_t processes) +static void smx_ctx_raw_runall_serial() { smx_process_t process; unsigned int cursor; - xbt_dynar_foreach(processes, cursor, process) { - XBT_DEBUG("Schedule item %u of %lu",cursor,xbt_dynar_length(processes)); + xbt_dynar_foreach(simix_global->process_to_run, cursor, process) { + XBT_DEBUG("Schedule item %u of %lu", cursor, xbt_dynar_length(simix_global->process_to_run)); smx_ctx_raw_resume(process); } } #endif -static void smx_ctx_raw_runall_parallel(xbt_dynar_t processes) +static void smx_ctx_raw_runall_parallel() { #ifdef CONTEXT_THREADS - xbt_parmap_apply(parmap, (void_f_pvoid_t)smx_ctx_raw_resume, processes); + xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_raw_resume, + simix_global->process_to_run); #endif } -static void smx_ctx_raw_runall(xbt_dynar_t processes) +static void smx_ctx_raw_runall() { - if (xbt_dynar_length(processes) >= SIMIX_context_get_parallel_threshold()) { - XBT_DEBUG("Runall // %lu", xbt_dynar_length(processes)); - smx_ctx_raw_runall_parallel(processes); + unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run); + if (nb_processes >= SIMIX_context_get_parallel_threshold()) { + XBT_DEBUG("Runall // %lu", nb_processes); + smx_ctx_raw_runall_parallel(); } else { - XBT_DEBUG("Runall serial %lu", xbt_dynar_length(processes)); - smx_ctx_raw_runall_serial(processes); + XBT_DEBUG("Runall serial %lu", nb_processes); + smx_ctx_raw_runall_serial(); } } diff --git a/src/simix/smx_context_sysv.c b/src/simix/smx_context_sysv.c index 4d30329226..984be819d7 100644 --- a/src/simix/smx_context_sysv.c +++ b/src/simix/smx_context_sysv.c @@ -10,6 +10,7 @@ #include "smx_context_sysv_private.h" #include "xbt/parmap.h" +#include "xbt/parmap_private.h" #include "simix/private.h" #include "gras_config.h" @@ -28,6 +29,9 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context); #ifdef CONTEXT_THREADS static xbt_parmap_t parmap; #endif +static unsigned long smx_ctx_sysv_process_index = 0; /* index of the next process to run in the + * list of runnable processes */ +static smx_ctx_sysv_t maestro_context; static smx_context_t smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv, @@ -35,6 +39,16 @@ smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv, static void smx_ctx_sysv_wrapper(int count, ...); +static void smx_ctx_sysv_stop_serial(smx_context_t context); +static void smx_ctx_sysv_suspend_serial(smx_context_t context); +static void smx_ctx_sysv_runall_serial(void); + +static void smx_ctx_sysv_stop_parallel(smx_context_t context); +static void smx_ctx_sysv_suspend_parallel(smx_context_t context); +static void smx_ctx_sysv_runall_parallel(void); + +static void smx_ctx_sysv_resume(smx_process_t first_process); + /* This is a bit paranoid about SIZEOF_VOIDP not being a multiple of SIZEOF_INT, * but it doesn't harm. */ #define CTX_ADDR_LEN (SIZEOF_VOIDP / SIZEOF_INT + !!(SIZEOF_VOIDP % SIZEOF_INT)) @@ -59,27 +73,29 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory) (*factory)->create_context = smx_ctx_sysv_create_context; /* Do not overload that method (*factory)->finalize */ (*factory)->free = smx_ctx_sysv_free; - (*factory)->stop = smx_ctx_sysv_stop; - (*factory)->suspend = smx_ctx_sysv_suspend; (*factory)->name = "smx_sysv_context_factory"; if (SIMIX_context_is_parallel()) { #ifdef CONTEXT_THREADS /* To use parallel ucontexts a thread pool is needed */ parmap = xbt_parmap_new(2); + (*factory)->stop = smx_ctx_sysv_stop_parallel; + (*factory)->suspend = smx_ctx_sysv_suspend_parallel; (*factory)->runall = smx_ctx_sysv_runall_parallel; (*factory)->self = smx_ctx_sysv_self_parallel; #else THROWF(arg_error, 0, "No thread support for parallel context execution"); #endif - }else{ - (*factory)->runall = smx_ctx_sysv_runall; + } else { + (*factory)->stop = smx_ctx_sysv_stop_serial; + (*factory)->suspend = smx_ctx_sysv_suspend_serial; + (*factory)->runall = smx_ctx_sysv_runall_serial; } } int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory) { #ifdef CONTEXT_THREADS - if(parmap) + if (parmap) xbt_parmap_destroy(parmap); #endif return smx_ctx_base_factory_finalize(factory); @@ -100,15 +116,14 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code, cleanup_func, data); - /* If the user provided a function for the process then use it - otherwise is the context for maestro */ + /* if the user provided a function for the process then use it, + otherwise it is the context for maestro */ if (code) { - int res; - res = getcontext(&(context->uc)); + int res = getcontext(&(context->uc)); xbt_assert(res == 0, - "Error in context saving: %d (%s)", errno, - strerror(errno)); + "Error in context saving: %d (%s)", errno, + strerror(errno)); context->uc.uc_link = NULL; @@ -127,12 +142,11 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code, ctx_addr.addr = context; makecontext(&context->uc, (void (*)())smx_ctx_sysv_wrapper, CTX_ADDR_LEN, CTX_ADDR_SPLIT(ctx_addr)); - }else{ + } else { maestro_context = context; } return (smx_context_t) context; - } static smx_context_t @@ -161,12 +175,6 @@ void smx_ctx_sysv_free(smx_context_t context) smx_ctx_base_free(context); } -void smx_ctx_sysv_stop(smx_context_t context) -{ - smx_ctx_base_stop(context); - smx_ctx_sysv_suspend(context); -} - void smx_ctx_sysv_wrapper(int first, ...) { union u_ctx_addr ctx_addr; @@ -177,60 +185,101 @@ void smx_ctx_sysv_wrapper(int first, ...) va_list ap; int i; va_start(ap, first); - for(i = 1; i < CTX_ADDR_LEN; i++) + for (i = 1; i < CTX_ADDR_LEN; i++) ctx_addr.intv[i] = va_arg(ap, int); va_end(ap); } context = ctx_addr.addr; (context->super.code) (context->super.argc, context->super.argv); - smx_ctx_sysv_stop((smx_context_t) context); + simix_global->context_factory->stop((smx_context_t) context); +} + +static void smx_ctx_sysv_stop_serial(smx_context_t context) +{ + smx_ctx_base_stop(context); + smx_ctx_sysv_suspend_serial(context); } -void smx_ctx_sysv_suspend(smx_context_t context) +static void smx_ctx_sysv_suspend_serial(smx_context_t context) { - SIMIX_context_set_current((smx_context_t) maestro_context); - int rv; - rv = swapcontext(&((smx_ctx_sysv_t) context)->uc, &((smx_ctx_sysv_t)context)->old_uc); + /* determine the next context */ + smx_context_t next_context; + unsigned long int i = smx_ctx_sysv_process_index++; + + if (i < xbt_dynar_length(simix_global->process_to_run)) { + XBT_DEBUG("Run next process"); - xbt_assert((rv == 0), "Context swapping failure"); + /* execute the next process */ + next_context = xbt_dynar_get_as( + simix_global->process_to_run,i, smx_process_t)->context; + } + else { + /* all processes were run, return to maestro */ + XBT_DEBUG("No more process to run"); + + next_context = (smx_context_t) maestro_context; + } + SIMIX_context_set_current(next_context); + swapcontext(&((smx_ctx_sysv_t) context)->uc, + &((smx_ctx_sysv_t) next_context)->uc); } -void smx_ctx_sysv_resume(smx_context_t context) +static void smx_ctx_sysv_resume(smx_process_t first_process) { + smx_context_t context = first_process->context; SIMIX_context_set_current(context); - int rv; - rv = swapcontext(&((smx_ctx_sysv_t)context)->old_uc, &((smx_ctx_sysv_t) context)->uc); - - xbt_assert((rv == 0), "Context swapping failure"); + swapcontext(&maestro_context->uc, + &((smx_ctx_sysv_t) context)->uc); } -void smx_ctx_sysv_runall(xbt_dynar_t processes) +static void smx_ctx_sysv_runall_serial(void) { - smx_process_t process; - unsigned int cursor; + if (xbt_dynar_length(simix_global->process_to_run) > 0) { + smx_process_t first_process = + xbt_dynar_get_as(simix_global->process_to_run, 0, smx_process_t); + smx_ctx_sysv_process_index = 1; - xbt_dynar_foreach(processes, cursor, process) { - XBT_DEBUG("Schedule item %u of %lu",cursor,xbt_dynar_length(processes)); - smx_ctx_sysv_resume(process->context); + /* execute the first process */ + smx_ctx_sysv_resume(first_process); } } -void smx_ctx_sysv_resume_parallel(smx_process_t process) +static void smx_ctx_sysv_stop_parallel(smx_context_t context) +{ + smx_ctx_base_stop(context); + smx_ctx_sysv_suspend_parallel(context); +} + +static void smx_ctx_sysv_suspend_parallel(smx_context_t context) { - smx_context_t context = process->context; - SIMIX_context_set_current((smx_context_t) context); - int rv; - rv = swapcontext(&((smx_ctx_sysv_t)context)->old_uc, &((smx_ctx_sysv_t) context)->uc); - SIMIX_context_set_current((smx_context_t) maestro_context); +#ifdef CONTEXT_THREADS + /* determine the next context */ + smx_context_t next_context; + unsigned long int i = __sync_fetch_and_add(&parmap->index, 1); + + if (i < xbt_dynar_length(simix_global->process_to_run)) { + /* execute the next process */ + XBT_DEBUG("Run next process"); + next_context = xbt_dynar_get_as(simix_global->process_to_run, i, smx_process_t)->context; + } + else { + /* all processes were run, go to the barrier */ + XBT_DEBUG("No more processes to run"); + next_context = (smx_context_t) maestro_context; + } - xbt_assert((rv == 0), "Context swapping failure"); + SIMIX_context_set_current(next_context); + swapcontext(&((smx_ctx_sysv_t) context)->uc, + &((smx_ctx_sysv_t) next_context)->uc); +#endif } -void smx_ctx_sysv_runall_parallel(xbt_dynar_t processes) +static void smx_ctx_sysv_runall_parallel(void) { #ifdef CONTEXT_THREADS - xbt_parmap_apply(parmap, (void_f_pvoid_t)smx_ctx_sysv_resume_parallel, processes); + xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_sysv_resume, + simix_global->process_to_run); #endif } diff --git a/src/simix/smx_context_sysv_private.h b/src/simix/smx_context_sysv_private.h index 168379b1cc..d829422898 100644 --- a/src/simix/smx_context_sysv_private.h +++ b/src/simix/smx_context_sysv_private.h @@ -25,16 +25,13 @@ SG_BEGIN_DECL() typedef struct s_smx_ctx_sysv { s_smx_ctx_base_t super; /* Fields of super implementation */ - ucontext_t uc; /* the thread that execute the code */ - ucontext_t old_uc; /* the context that was swapped with */ + ucontext_t uc; /* the ucontext that executes the code */ #ifdef HAVE_VALGRIND_VALGRIND_H unsigned int valgrind_stack_id; /* the valgrind stack id */ #endif char stack[0]; /* the thread stack (must remain the last element of the structure) */ } s_smx_ctx_sysv_t, *smx_ctx_sysv_t; -smx_ctx_sysv_t maestro_context; - void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory); int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory); @@ -45,12 +42,6 @@ smx_ctx_sysv_create_context_sized(size_t structure_size, void_pfn_smxprocess_t cleanup_func, void *data); void smx_ctx_sysv_free(smx_context_t context); -void smx_ctx_sysv_stop(smx_context_t context); -void smx_ctx_sysv_suspend(smx_context_t context); -void smx_ctx_sysv_resume(smx_context_t new_context); -void smx_ctx_sysv_runall(xbt_dynar_t processes); -void smx_ctx_sysv_resume_parallel(smx_process_t new_context); -void smx_ctx_sysv_runall_parallel(xbt_dynar_t processes); int smx_ctx_sysv_get_thread_id(void); smx_context_t smx_ctx_sysv_self_parallel(void); diff --git a/src/simix/smx_context_thread.c b/src/simix/smx_context_thread.c index a602a225f4..e3fbcf0f4b 100644 --- a/src/simix/smx_context_thread.c +++ b/src/simix/smx_context_thread.c @@ -35,8 +35,8 @@ static void smx_ctx_thread_free(smx_context_t context); static void smx_ctx_thread_stop(smx_context_t context); static void smx_ctx_thread_suspend(smx_context_t context); static void smx_ctx_thread_resume(smx_context_t new_context); -static void smx_ctx_thread_runall_serial(xbt_dynar_t processes); -static void smx_ctx_thread_runall_parallel(xbt_dynar_t processes); +static void smx_ctx_thread_runall_serial(void); +static void smx_ctx_thread_runall_parallel(void); static smx_context_t smx_ctx_thread_self(void); static int smx_ctx_thread_factory_finalize(smx_context_factory_t *factory); @@ -171,26 +171,26 @@ static void smx_ctx_thread_suspend(smx_context_t context) xbt_os_sem_acquire(smx_ctx_thread_sem); } -static void smx_ctx_thread_runall_serial(xbt_dynar_t processes) +static void smx_ctx_thread_runall_serial(void) { smx_process_t process; unsigned int cursor; - xbt_dynar_foreach(processes, cursor, process) { + xbt_dynar_foreach(simix_global->process_to_run, cursor, process) { xbt_os_sem_release(((smx_ctx_thread_t) process->context)->begin); xbt_os_sem_acquire(((smx_ctx_thread_t) process->context)->end); } } -static void smx_ctx_thread_runall_parallel(xbt_dynar_t processes) +static void smx_ctx_thread_runall_parallel(void) { unsigned int index; smx_process_t process; - xbt_dynar_foreach(processes, index, process) + xbt_dynar_foreach(simix_global->process_to_run, index, process) xbt_os_sem_release(((smx_ctx_thread_t) process->context)->begin); - xbt_dynar_foreach(processes, index, process) { + xbt_dynar_foreach(simix_global->process_to_run, index, process) { xbt_os_sem_acquire(((smx_ctx_thread_t) process->context)->end); } } diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index ac73df56c1..366cc77f8a 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -214,7 +214,8 @@ void SIMIX_process_create(smx_process_t *process, */ void SIMIX_process_runall(void) { - SIMIX_context_runall(simix_global->process_to_run); + SIMIX_context_runall(); + xbt_dynar_t tmp = simix_global->process_that_ran; simix_global->process_that_ran = simix_global->process_to_run; simix_global->process_to_run = tmp; @@ -572,7 +573,7 @@ void SIMIX_process_yield(void) SIMIX_context_suspend(self->context); /* Ok, maestro returned control to us */ - XBT_DEBUG("Maestro returned control to me: '%s'", self->name); + XBT_DEBUG("Control returned to me: '%s'", self->name); if (self->context->iwannadie){ XBT_DEBUG("I wanna die!"); diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index c88a26ba49..dccc15d261 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -88,26 +88,23 @@ static void *_xbt_parmap_worker_main(void *arg) XBT_DEBUG("New worker thread created (%u)", worker_id); /* Worker's main loop */ - while(1){ + while (1) { #ifdef HAVE_FUTEX_H xbt_event_wait(parmap->sync_event); #endif - if(parmap->status == PARMAP_WORK){ - unsigned int i; - unsigned int n = 0; + if (parmap->status == PARMAP_WORK) { XBT_DEBUG("Worker %u got a job", worker_id); - while ((i = __sync_fetch_and_add(&parmap->index, 1)) - < xbt_dynar_length(parmap->data)) { + unsigned int i = __sync_fetch_and_add(&parmap->index, 1); + if (i < xbt_dynar_length(parmap->data)) { parmap->fun(xbt_dynar_get_as(parmap->data, i, void*)); - n++; } - XBT_DEBUG("Worker %u processed %u tasks", worker_id, n); + XBT_DEBUG("Worker %u has finished", worker_id); /* We are destroying the parmap */ - }else{ + } else { #ifdef HAVE_FUTEX_H xbt_event_end(parmap->sync_event); #endif -- 2.20.1