From a2c3c96044c07288f1e1a2c3245f8a1fb62c22d6 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Guillaume=20Serri=C3=A8re?= Date: Tue, 14 May 2013 16:59:06 +0200 Subject: [PATCH 1/1] Add of context creation in parmap. MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Remove parmap from unitTesting because we need a context factory for parmap creation now. Signed-off-by: Guillaume Serrière --- buildtools/Cmake/UnitTesting.cmake | 2 - src/simix/smx_context.c | 13 +-- src/simix/smx_context_raw.c | 22 ++-- src/simix/smx_context_sysv.c | 18 +-- src/xbt/parmap.c | 177 ++++------------------------- 5 files changed, 54 insertions(+), 178 deletions(-) diff --git a/buildtools/Cmake/UnitTesting.cmake b/buildtools/Cmake/UnitTesting.cmake index 4351f1b60e..7800769b06 100644 --- a/buildtools/Cmake/UnitTesting.cmake +++ b/buildtools/Cmake/UnitTesting.cmake @@ -12,7 +12,6 @@ set(TEST_CFILES src/xbt/xbt_strbuff.c src/xbt/xbt_sha.c src/xbt/config.c - src/xbt/parmap.c ) set(TEST_UNITS ${CMAKE_CURRENT_BINARY_DIR}/src/cunit_unit.c @@ -25,7 +24,6 @@ set(TEST_UNITS ${CMAKE_CURRENT_BINARY_DIR}/src/xbt_strbuff_unit.c ${CMAKE_CURRENT_BINARY_DIR}/src/xbt_sha_unit.c ${CMAKE_CURRENT_BINARY_DIR}/src/config_unit.c - ${CMAKE_CURRENT_BINARY_DIR}/src/parmap_unit.c ${CMAKE_CURRENT_BINARY_DIR}/src/simgrid_units_main.c ) diff --git a/src/simix/smx_context.c b/src/simix/smx_context.c index ba6d03f6aa..64891fd82a 100644 --- a/src/simix/smx_context.c +++ b/src/simix/smx_context.c @@ -36,6 +36,11 @@ static e_xbt_parmap_mode_t smx_parallel_synchronization_mode = XBT_PARMAP_DEFAUL */ void SIMIX_context_mod_init(void) { +#if defined(CONTEXT_THREADS) && !defined(HAVE_THREAD_LOCAL_STORAGE) + /* the __thread storage class is not available on this platform: + * use getspecific/setspecific instead to store the current context in each thread */ + xbt_os_thread_key_create(&smx_current_context_key); +#endif if (!simix_global->context_factory) { /* select the context factory to use to create the contexts */ if (smx_factory_initializer_to_use) { @@ -86,12 +91,6 @@ void SIMIX_context_mod_init(void) } } } - -#if defined(CONTEXT_THREADS) && !defined(HAVE_THREAD_LOCAL_STORAGE) - /* the __thread storage class is not available on this platform: - * use getspecific/setspecific instead to store the current context in each thread */ - xbt_os_thread_key_create(&smx_current_context_key); -#endif } /** @@ -138,7 +137,6 @@ XBT_INLINE int SIMIX_context_get_nthreads(void) { * \param nb_threads the number of threads to use */ XBT_INLINE void SIMIX_context_set_nthreads(int nb_threads) { - if (nb_threads<=0) { nb_threads = xbt_os_get_numcores(); XBT_INFO("Auto-setting contexts/nthreads to %d",nb_threads); @@ -149,7 +147,6 @@ XBT_INLINE void SIMIX_context_set_nthreads(int nb_threads) { THROWF(arg_error, 0, "The thread factory cannot be run in parallel"); #endif } - smx_parallel_contexts = nb_threads; } diff --git a/src/simix/smx_context_raw.c b/src/simix/smx_context_raw.c index fd9058e7ce..f2f05446a2 100644 --- a/src/simix/smx_context_raw.c +++ b/src/simix/smx_context_raw.c @@ -31,7 +31,7 @@ typedef struct s_smx_ctx_raw { #ifdef CONTEXT_THREADS static xbt_parmap_t raw_parmap; -static raw_stack_t* raw_workers_stacks; /* space to save the worker stack in each thread */ +static smx_ctx_raw_t* raw_workers_context; /* space to save the worker context in each thread */ static unsigned long raw_threads_working; /* number of threads that have started their work */ static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the thread id */ #endif @@ -241,9 +241,11 @@ void SIMIX_ctx_raw_factory_init(smx_context_factory_t *factory) if (SIMIX_context_is_parallel()) { #ifdef CONTEXT_THREADS int nthreads = SIMIX_context_get_nthreads(); - raw_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode()); - raw_workers_stacks = xbt_new(raw_stack_t, nthreads); xbt_os_thread_key_create(&raw_worker_id_key); + raw_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode()); + raw_workers_context = xbt_new(smx_ctx_raw_t, nthreads); + raw_maestro_context=NULL; + #endif if (SIMIX_context_get_parallel_threshold() > 1) { /* choose dynamically */ @@ -280,7 +282,7 @@ static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory) #ifdef CONTEXT_THREADS if (raw_parmap) xbt_parmap_destroy(raw_parmap); - xbt_free(raw_workers_stacks); + xbt_free(raw_workers_context); #endif return smx_ctx_base_factory_finalize(factory); } @@ -325,7 +327,8 @@ smx_ctx_raw_create_context(xbt_main_func_t code, int argc, char **argv, #endif /* HAVE_VALGRIND_VALGRIND_H */ } else { - raw_maestro_context = context; + if(data != NULL && raw_maestro_context==NULL) + raw_maestro_context = context; if(MC_is_active()) MC_ignore_heap(&(raw_maestro_context->stack_top), sizeof(raw_maestro_context->stack_top)); @@ -521,12 +524,12 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context) else { /* all processes were run, go to the barrier */ XBT_DEBUG("No more processes to run"); - next_context = (smx_context_t) raw_maestro_context; unsigned long worker_id = (unsigned long) xbt_os_thread_get_specific(raw_worker_id_key); + next_context = (smx_context_t)raw_workers_context[worker_id]; XBT_DEBUG("Restoring worker stack %lu (working threads = %lu)", worker_id, raw_threads_working); - next_stack = raw_workers_stacks[worker_id]; + next_stack = ((smx_ctx_raw_t)next_context)->stack_top; } SIMIX_context_set_current(next_context); @@ -544,8 +547,11 @@ static void smx_ctx_raw_resume_parallel(smx_process_t first_process) #ifdef CONTEXT_THREADS unsigned long worker_id = __sync_fetch_and_add(&raw_threads_working, 1); xbt_os_thread_set_specific(raw_worker_id_key, (void*) worker_id); + smx_ctx_raw_t worker_context = (smx_ctx_raw_t)SIMIX_context_self(); + raw_workers_context[worker_id] = worker_context; XBT_DEBUG("Saving worker stack %lu", worker_id); - raw_stack_t* worker_stack = &raw_workers_stacks[worker_id]; + raw_stack_t* worker_stack = &(worker_context)->stack_top; + smx_context_t context = first_process->context; SIMIX_context_set_current(context); diff --git a/src/simix/smx_context_sysv.c b/src/simix/smx_context_sysv.c index 90e6b84df3..6a19200e9b 100644 --- a/src/simix/smx_context_sysv.c +++ b/src/simix/smx_context_sysv.c @@ -37,7 +37,7 @@ typedef struct s_smx_ctx_sysv { #ifdef CONTEXT_THREADS static xbt_parmap_t sysv_parmap; -static ucontext_t* sysv_workers_stacks; /* space to save the worker's stack in each thread */ +static smx_ctx_sysv_t* sysv_workers_context; /* space to save the worker's context in each thread */ static unsigned long sysv_threads_working; /* number of threads that have started their work */ static xbt_os_thread_key_t sysv_worker_id_key; /* thread-specific storage for the thread id */ #endif @@ -90,7 +90,8 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory) #ifdef CONTEXT_THREADS /* To use parallel ucontexts a thread pool is needed */ int nthreads = SIMIX_context_get_nthreads(); sysv_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode()); - sysv_workers_stacks = xbt_new(ucontext_t, nthreads); + sysv_workers_context = xbt_new(smx_ctx_sysv_t, nthreads); + sysv_maestro_context = NULL; xbt_os_thread_key_create(&sysv_worker_id_key); (*factory)->stop = smx_ctx_sysv_stop_parallel; (*factory)->suspend = smx_ctx_sysv_suspend_parallel; @@ -110,7 +111,7 @@ static int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory) #ifdef CONTEXT_THREADS if (sysv_parmap) xbt_parmap_destroy(sysv_parmap); - xbt_free(sysv_workers_stacks); + xbt_free(sysv_workers_context); #endif return smx_ctx_base_factory_finalize(factory); } @@ -165,7 +166,8 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code, sizeof(smx_ctx_sysv_t), sizeof(int), CTX_ADDR_LEN); } } else { - sysv_maestro_context = context; + if(data != NULL && sysv_maestro_context == NULL) + sysv_maestro_context = context; } if(MC_is_active() && code) @@ -289,10 +291,10 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context) else { /* all processes were run, go to the barrier */ XBT_DEBUG("No more processes to run"); - next_context = (smx_context_t) sysv_maestro_context; unsigned long worker_id = (unsigned long) xbt_os_thread_get_specific(sysv_worker_id_key); - next_stack = &sysv_workers_stacks[worker_id]; + next_context = (smx_context_t)sysv_workers_context[worker_id]; + next_stack = &((smx_ctx_sysv_t)next_context)->uc; } SIMIX_context_set_current(next_context); @@ -305,7 +307,9 @@ static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) #ifdef CONTEXT_THREADS unsigned long worker_id = __sync_fetch_and_add(&sysv_threads_working, 1); xbt_os_thread_set_specific(sysv_worker_id_key, (void*) worker_id); - ucontext_t* worker_stack = &sysv_workers_stacks[worker_id]; + smx_ctx_sysv_t worker_context = (smx_ctx_sysv_t)SIMIX_context_self(); + sysv_workers_context[worker_id] = worker_context; + ucontext_t* worker_stack = &worker_context->uc; smx_context_t context = first_process->context; SIMIX_context_set_current(context); diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index 6e0a7286ac..084a113758 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -21,6 +21,7 @@ #include "xbt/dynar.h" #include "xbt/xbt_os_thread.h" #include "xbt/sysdep.h" +#include "simix/smx_private.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map"); XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing"); @@ -53,6 +54,7 @@ static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap); static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap); static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round); + /** * \brief Parallel map structure */ @@ -80,6 +82,16 @@ typedef struct s_xbt_parmap { void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */ } s_xbt_parmap_t; +/** + * \brief Thread data transmission structure + */ +typedef struct s_xbt_parmap_thread_data{ + xbt_parmap_t parmap; + int worker_id; +} s_xbt_parmap_thread_data_t; + +typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t; + /** * \brief Creates a parallel map object * \param num_workers number of worker threads to create @@ -101,8 +113,12 @@ xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode) xbt_parmap_set_mode(parmap, mode); /* Create the pool of worker threads */ + xbt_parmap_thread_data_t data; for (i = 1; i < num_workers; i++) { - worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL); + data = xbt_new0(s_xbt_parmap_thread_data_t, 1); + data->parmap = parmap; + data->worker_id = i; + worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL); xbt_os_thread_detach(worker); } return parmap; @@ -243,8 +259,11 @@ static void xbt_parmap_work(xbt_parmap_t parmap) */ static void *xbt_parmap_worker_main(void *arg) { - xbt_parmap_t parmap = (xbt_parmap_t) arg; + xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg; + xbt_parmap_t parmap = data->parmap; unsigned round = 0; + smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL); + SIMIX_context_set_current(context); XBT_DEBUG("New worker thread created"); @@ -253,15 +272,16 @@ static void *xbt_parmap_worker_main(void *arg) parmap->worker_wait_f(parmap, ++round); if (parmap->status == XBT_PARMAP_WORK) { - XBT_DEBUG("Worker got a job"); + XBT_DEBUG("Worker %d got a job", data->worker_id); xbt_parmap_work(parmap); parmap->worker_signal_f(parmap); - XBT_DEBUG("Worker has finished"); + XBT_DEBUG("Worker %d has finished", data->worker_id); /* We are destroying the parmap */ } else { + xbt_free(data); parmap->worker_signal_f(parmap); return NULL; } @@ -478,152 +498,3 @@ static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round) xbt_os_thread_yield(); } } - -#ifdef SIMGRID_TEST -#include "xbt.h" -#include "xbt/ex.h" -#include "xbt/xbt_os_thread.h" -#include "xbt/xbt_os_time.h" -#include "internal_config.h" /* HAVE_FUTEX_H */ - -XBT_TEST_SUITE("parmap", "Parallel Map"); -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit); - -#ifdef HAVE_FUTEX_H -#define TEST_PARMAP_SKIP_TEST(mode) 0 -#else -#define TEST_PARMAP_SKIP_TEST(mode) ((mode) == XBT_PARMAP_FUTEX) -#endif - -#define TEST_PARMAP_VALIDATE_MODE(mode) \ - if (TEST_PARMAP_SKIP_TEST(mode)) { xbt_test_skip(); return; } else ((void)0) - -static void fun_double(void *arg) -{ - unsigned *u = arg; - *u = 2 * *u + 1; -} - -/* Check that the computations are correctly done. */ -static void test_parmap_basic(e_xbt_parmap_mode_t mode) -{ - unsigned num_workers; - - for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) { - const unsigned len = 1033; - const unsigned num = 5; - unsigned *a; - xbt_dynar_t data; - xbt_parmap_t parmap; - unsigned i; - - xbt_test_add("Basic parmap usage (%u workers)", num_workers); - - TEST_PARMAP_VALIDATE_MODE(mode); - parmap = xbt_parmap_new(num_workers, mode); - - a = xbt_malloc(len * sizeof *a); - data = xbt_dynar_new(sizeof a, NULL); - for (i = 0; i < len; i++) { - a[i] = i; - xbt_dynar_push_as(data, void *, &a[i]); - } - - for (i = 0; i < num; i++) - xbt_parmap_apply(parmap, fun_double, data); - - for (i = 0; i < len; i++) { - unsigned expected = (1U << num) * (i + 1) - 1; - xbt_test_assert(a[i] == expected, - "a[%u]: expected %u, got %u", i, expected, a[i]); - } - - xbt_dynar_free(&data); - xbt_free(a); - xbt_parmap_destroy(parmap); - } -} - -XBT_TEST_UNIT("basic_posix", test_parmap_basic_posix, "Basic usage: posix") -{ - test_parmap_basic(XBT_PARMAP_POSIX); -} - -XBT_TEST_UNIT("basic_futex", test_parmap_basic_futex, "Basic usage: futex") -{ - test_parmap_basic(XBT_PARMAP_FUTEX); -} - -XBT_TEST_UNIT("basic_busy_wait", test_parmap_basic_busy_wait, "Basic usage: busy_wait") -{ - test_parmap_basic(XBT_PARMAP_BUSY_WAIT); -} - -static void fun_get_id(void *arg) -{ - *(uintptr_t *)arg = (uintptr_t)xbt_os_thread_self(); - xbt_os_sleep(0.5); -} - -static int fun_compare(const void *pa, const void *pb) -{ - uintptr_t a = *(uintptr_t *)pa; - uintptr_t b = *(uintptr_t *)pb; - return a < b ? -1 : a > b ? 1 : 0; -} - -/* Check that all threads are working. */ -static void test_parmap_extended(e_xbt_parmap_mode_t mode) -{ - unsigned num_workers; - - for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) { - const unsigned len = 2 * num_workers; - uintptr_t *a; - xbt_parmap_t parmap; - xbt_dynar_t data; - unsigned i; - unsigned count; - - xbt_test_add("Extended parmap usage (%u workers)", num_workers); - - TEST_PARMAP_VALIDATE_MODE(mode); - parmap = xbt_parmap_new(num_workers, mode); - - a = xbt_malloc(len * sizeof *a); - data = xbt_dynar_new(sizeof a, NULL); - for (i = 0; i < len; i++) - xbt_dynar_push_as(data, void *, &a[i]); - - xbt_parmap_apply(parmap, fun_get_id, data); - - qsort(a, len, sizeof a[0], fun_compare); - count = 1; - for (i = 1; i < len; i++) - if (a[i] != a[i - 1]) - count++; - xbt_test_assert(count == num_workers, - "only %u/%u threads did some work", count, num_workers); - - xbt_dynar_free(&data); - xbt_free(a); - xbt_parmap_destroy(parmap); - } -} - -XBT_TEST_UNIT("extended_posix", test_parmap_extended_posix, "Extended usage: posix") -{ - test_parmap_extended(XBT_PARMAP_POSIX); -} - -XBT_TEST_UNIT("extended_futex", test_parmap_extended_futex, "Extended usage: futex") -{ - test_parmap_extended(XBT_PARMAP_FUTEX); -} - -XBT_TEST_UNIT("extended_busy_wait", test_parmap_extended_busy_wait, "Extended usage: busy_wait") -{ - test_parmap_extended(XBT_PARMAP_BUSY_WAIT); -} - -#endif /* SIMGRID_TEST */ -- 2.20.1