From b035a64cb01b6a8a72ee15d61d956fc2d8ebe6c4 Mon Sep 17 00:00:00 2001 From: Arnaud Giersch Date: Wed, 15 Feb 2012 21:38:26 +0100 Subject: [PATCH] Implement stride for parmap_apply. --- include/xbt/parmap.h | 6 +++-- src/simix/smx_context_raw.c | 24 ++++++++++++++------ src/simix/smx_context_sysv.c | 23 ++++++++++++++----- src/surf/surf.c | 4 ++-- src/xbt/parmap.c | 43 +++++++++++++++++++++++++++--------- teshsuite/xbt/parmap_test.c | 4 ++-- testsuite/xbt/parmap_bench.c | 38 +++++++++++++++++-------------- 7 files changed, 95 insertions(+), 47 deletions(-) diff --git a/include/xbt/parmap.h b/include/xbt/parmap.h index 624d2783cd..f57514931d 100644 --- a/include/xbt/parmap.h +++ b/include/xbt/parmap.h @@ -49,8 +49,10 @@ XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap); XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, - xbt_dynar_t data); -XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap); + xbt_dynar_t data, + unsigned stride, + int once); +XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap, unsigned *index); #ifdef HAVE_MC XBT_PUBLIC(xbt_parmap_t) xbt_parmap_mc_new(unsigned int num_workers, diff --git a/src/simix/smx_context_raw.c b/src/simix/smx_context_raw.c index 829f47e714..44a1278a47 100644 --- a/src/simix/smx_context_raw.c +++ b/src/simix/smx_context_raw.c @@ -21,6 +21,7 @@ typedef struct s_smx_ctx_raw { s_smx_ctx_base_t super; /* Fields of super implementation */ char *malloced_stack; /* malloced area containing the stack */ raw_stack_t stack_top; /* pointer to stack top (within previous area) */ + unsigned int parmap_index; #ifdef HAVE_VALGRIND_VALGRIND_H unsigned int valgrind_stack_id; /* the valgrind stack id */ #endif @@ -512,15 +513,19 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context) { #ifdef CONTEXT_THREADS /* determine the next context */ - smx_process_t next_work = xbt_parmap_next(raw_parmap); + smx_process_t next_work; smx_context_t next_context; raw_stack_t next_stack; + next_work = xbt_parmap_next(raw_parmap, + &((smx_ctx_raw_t) context)->parmap_index); if (next_work != NULL) { /* there is a next process to resume */ XBT_DEBUG("Run next process"); next_context = next_work->context; next_stack = ((smx_ctx_raw_t) next_context)->stack_top; + ((smx_ctx_raw_t) next_context)->parmap_index = + ((smx_ctx_raw_t) context)->parmap_index; } else { /* all processes were run, go to the barrier */ @@ -554,11 +559,15 @@ static void smx_ctx_raw_resume_parallel(smx_process_t first_process) raw_workers_context[worker_id] = worker_context; XBT_DEBUG("Saving worker stack %lu", worker_id); raw_stack_t* worker_stack = &(worker_context)->stack_top; - - - smx_context_t context = first_process->context; - SIMIX_context_set_current(context); - raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top); + unsigned int index = 0; + + first_process = xbt_parmap_next(raw_parmap, &index); + if (first_process != NULL) { + smx_context_t context = first_process->context; + ((smx_ctx_raw_t) context)->parmap_index = index; + SIMIX_context_set_current(context); + raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top); + } #endif } @@ -570,7 +579,8 @@ static void smx_ctx_raw_runall_parallel(void) #ifdef CONTEXT_THREADS raw_threads_working = 0; xbt_parmap_apply(raw_parmap, (void_f_pvoid_t) smx_ctx_raw_resume_parallel, - simix_global->process_to_run); + simix_global->process_to_run, + 1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1); #else xbt_die("You asked for a parallel execution, but you don't have any threads."); #endif diff --git a/src/simix/smx_context_sysv.c b/src/simix/smx_context_sysv.c index 3af7b3cef9..25ef8ec607 100644 --- a/src/simix/smx_context_sysv.c +++ b/src/simix/smx_context_sysv.c @@ -29,6 +29,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context); typedef struct s_smx_ctx_sysv { s_smx_ctx_base_t super; /* Fields of super implementation */ ucontext_t uc; /* the ucontext that executes the code */ + unsigned int parmap_index; #ifdef HAVE_VALGRIND_VALGRIND_H unsigned int valgrind_stack_id; /* the valgrind stack id */ #endif @@ -278,15 +279,19 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context) { #ifdef CONTEXT_THREADS /* determine the next context */ - smx_process_t next_work = xbt_parmap_next(sysv_parmap); + smx_process_t next_work; smx_context_t next_context; ucontext_t* next_stack; + next_work = xbt_parmap_next(sysv_parmap, + &((smx_ctx_sysv_t) context)->parmap_index); if (next_work != NULL) { /* there is a next process to resume */ XBT_DEBUG("Run next process"); next_context = next_work->context; next_stack = &((smx_ctx_sysv_t) next_context)->uc; + ((smx_ctx_sysv_t) next_context)->parmap_index = + ((smx_ctx_sysv_t) context)->parmap_index; } else { /* all processes were run, go to the barrier */ @@ -310,10 +315,15 @@ static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) smx_ctx_sysv_t worker_context = (smx_ctx_sysv_t)SIMIX_context_self(); sysv_workers_context[worker_id] = worker_context; ucontext_t* worker_stack = &worker_context->uc; - - smx_context_t context = first_process->context; - SIMIX_context_set_current(context); - swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc); + unsigned int index = 0; + + first_process = xbt_parmap_next(sysv_parmap, &index); + if (first_process != NULL) { + smx_context_t context = first_process->context; + ((smx_ctx_sysv_t) context)->parmap_index = index; + SIMIX_context_set_current(context); + swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc); + } #endif } @@ -322,6 +332,7 @@ static void smx_ctx_sysv_runall_parallel(void) #ifdef CONTEXT_THREADS sysv_threads_working = 0; xbt_parmap_apply(sysv_parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel, - simix_global->process_to_run); + simix_global->process_to_run, + 1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1); #endif } diff --git a/src/surf/surf.c b/src/surf/surf.c index 72a754283d..d32f9e56f1 100644 --- a/src/surf/surf.c +++ b/src/surf/surf.c @@ -593,7 +593,7 @@ double surf_solve(double max_date) if (surf_get_nthreads() > 1 && surf_do_par()) { /* parallel version */ #ifdef CONTEXT_THREADS - xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list); + xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list, 1, 0); #else xbt_die("Asked to run in parallel, but no thread at hand..."); #endif @@ -684,7 +684,7 @@ double surf_solve(double max_date) if (surf_get_nthreads() > 1 && surf_do_par()) { /* parallel version */ #ifdef CONTEXT_THREADS - xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list); + xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list, 1, 0); #endif } else { diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index 3e56435315..fd685b03c8 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -71,6 +71,8 @@ typedef struct s_xbt_parmap { void_f_pvoid_t fun; /**< function to run in parallel on each element of data */ xbt_dynar_t data; /**< parameters to pass to fun in parallel */ unsigned int index; /**< index of the next element of data to pick */ + unsigned stride; + int apply_once; #ifdef HAVE_MC int finish; @@ -269,12 +271,15 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode) * \param fun the function to call in parallel * \param data each element of this dynar will be passed as an argument to fun */ -void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data) +void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, + xbt_dynar_t data, unsigned stride, int once) { /* Assign resources to worker threads */ parmap->fun = fun; parmap->data = data; parmap->index = 0; + parmap->stride = stride; + parmap->apply_once = once; parmap->master_signal_f(parmap); xbt_parmap_work(parmap); parmap->master_wait_f(parmap); @@ -288,21 +293,37 @@ void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data) * * \return the next task to process, or NULL if there is no more work */ -void* xbt_parmap_next(xbt_parmap_t parmap) +void* xbt_parmap_next(xbt_parmap_t parmap, unsigned *index) { - unsigned int index = __sync_fetch_and_add(&parmap->index, 1); - if (index < xbt_dynar_length(parmap->data)) { - return xbt_dynar_get_as(parmap->data, index, void*); - } - return NULL; + unsigned i = *index; + if (i % parmap->stride == 0) + i = __sync_fetch_and_add(&parmap->index, parmap->stride); + *index = i + 1; + return i < xbt_dynar_length(parmap->data) ? + xbt_dynar_get_as(parmap->data, i, void*) : NULL; } static void xbt_parmap_work(xbt_parmap_t parmap) { - unsigned index; - while ((index = __sync_fetch_and_add(&parmap->index, 1)) - < xbt_dynar_length(parmap->data)) - parmap->fun(xbt_dynar_get_as(parmap->data, index, void*)); + if (parmap->apply_once) { + parmap->fun(NULL); + } else { + int more = 1; + unsigned index; + unsigned next; + do { + index = __sync_fetch_and_add(&parmap->index, parmap->stride); + next = index + parmap->stride; + if (next >= xbt_dynar_length(parmap->data)) { + next = xbt_dynar_length(parmap->data); + more = 0; + } + while (index < next) { + parmap->fun(xbt_dynar_get_as(parmap->data, index, void*)); + index++; + } + } while (more); + } } /** diff --git a/teshsuite/xbt/parmap_test.c b/teshsuite/xbt/parmap_test.c index 52c4050925..bc687da267 100644 --- a/teshsuite/xbt/parmap_test.c +++ b/teshsuite/xbt/parmap_test.c @@ -42,7 +42,7 @@ static int test_parmap_basic(e_xbt_parmap_mode_t mode) } for (i = 0; i < num; i++) - xbt_parmap_apply(parmap, fun_double, data); + xbt_parmap_apply(parmap, fun_double, data, 10, 0); for (i = 0; i < len; i++) { unsigned expected = (1U << num) * (i + 1) - 1; @@ -94,7 +94,7 @@ static int test_parmap_extended(e_xbt_parmap_mode_t mode) for (i = 0; i < len; i++) xbt_dynar_push_as(data, void *, &a[i]); - xbt_parmap_apply(parmap, fun_get_id, data); + xbt_parmap_apply(parmap, fun_get_id, data, 1, 0); qsort(a, len, sizeof a[0], fun_compare); count = 1; diff --git a/testsuite/xbt/parmap_bench.c b/testsuite/xbt/parmap_bench.c index e95d77deae..1ce420a5da 100644 --- a/testsuite/xbt/parmap_bench.c +++ b/testsuite/xbt/parmap_bench.c @@ -82,7 +82,7 @@ static void array_new(unsigned **a, xbt_dynar_t *data) } } -static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode) +static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride) { unsigned *a; xbt_dynar_t data; @@ -102,7 +102,7 @@ static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode) start_time = xbt_os_time(); do { parmap = xbt_parmap_new(nthreads, mode); - xbt_parmap_apply(parmap, fun_to_apply, data); + xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0); xbt_parmap_destroy(parmap); elapsed_time = xbt_os_time() - start_time; i++; @@ -115,7 +115,7 @@ static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode) xbt_free(a); } -static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode) +static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride) { unsigned *a; xbt_dynar_t data; @@ -135,7 +135,7 @@ static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode) i = 0; start_time = xbt_os_time(); do { - xbt_parmap_apply(parmap, fun_to_apply, data); + xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0); elapsed_time = xbt_os_time() - start_time; i++; } while (elapsed_time < TIMEOUT); @@ -148,8 +148,8 @@ static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode) xbt_free(a); } -static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t), - int nthreads, unsigned modes) +static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t, unsigned), + int nthreads, unsigned modes, unsigned stride) { e_xbt_parmap_mode_t all_modes[] = { XBT_PARMAP_POSIX, XBT_PARMAP_FUTEX, @@ -158,7 +158,7 @@ static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t), unsigned i; for (i = 0 ; i < sizeof all_modes / sizeof all_modes[0] ; i++) { if (1U << i & modes) - bench_fun(nthreads, all_modes[i]); + bench_fun(nthreads, all_modes[i], stride); } } @@ -166,14 +166,16 @@ int main(int argc, char *argv[]) { int nthreads; unsigned modes = MODES_DEFAULT; + unsigned stride = 1; SIMIX_global_init(&argc, argv); - if (argc != 2 && argc != 3) { + if (argc < 2 || argc > 4) { fprintf(stderr, - "Usage: %s nthreads [modes]\n" + "Usage: %s nthreads [modes [stride]]\n" " nthreads - number of working threads\n" - " modes - bitmask of modes to test\n", + " modes - bitmask of modes to test\n" + " stride - parmap stride\n", argv[0]); return EXIT_FAILURE; } @@ -182,30 +184,32 @@ int main(int argc, char *argv[]) fprintf(stderr, "ERROR: invalid thread count: %d\n", nthreads); return EXIT_FAILURE; } - if (argc == 3) + if (argc >= 3) modes = strtol(argv[2], NULL, 0); + if (argc >= 4) + stride = atoi(argv[3]); - printf("Parmap benchmark with %d workers (modes = %#x)...\n\n", - nthreads, modes); + printf("Parmap benchmark with %d workers (modes = %#x; stride = %d)...\n\n", + nthreads, modes, stride); fun_to_apply = fun_small_comp; printf("Benchmark for parmap create+apply+destroy (small comp):\n"); - bench_all_modes(bench_parmap_full, nthreads, modes); + bench_all_modes(bench_parmap_full, nthreads, modes, stride); printf("\n"); printf("Benchmark for parmap apply only (small comp):\n"); - bench_all_modes(bench_parmap_apply, nthreads, modes); + bench_all_modes(bench_parmap_apply, nthreads, modes, stride); printf("\n"); fun_to_apply = fun_big_comp; printf("Benchmark for parmap create+apply+destroy (big comp):\n"); - bench_all_modes(bench_parmap_full, nthreads, modes); + bench_all_modes(bench_parmap_full, nthreads, modes, stride); printf("\n"); printf("Benchmark for parmap apply only (big comp):\n"); - bench_all_modes(bench_parmap_apply, nthreads, modes); + bench_all_modes(bench_parmap_apply, nthreads, modes, stride); printf("\n"); return EXIT_SUCCESS; -- 2.20.1