Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add context creation in parmap.
author Guillaume Serrière <guillaume.serriere@esial.net>
Tue, 14 May 2013 14:59:06 +0000 (16:59 +0200)
committer Guillaume Serrière <guillaume.serriere@esial.net>
Thu, 16 May 2013 14:08:13 +0000 (16:08 +0200)
Remove parmap from the unit tests, because creating a parmap now
requires a context factory.

Signed-off-by: Guillaume Serrière <guillaume.serriere@esial.net>
buildtools/Cmake/UnitTesting.cmake
src/simix/smx_context.c
src/simix/smx_context_raw.c
src/simix/smx_context_sysv.c
src/xbt/parmap.c

index 4351f1b..7800769 100644 (file)
@@ -12,7 +12,6 @@ set(TEST_CFILES
   src/xbt/xbt_strbuff.c
   src/xbt/xbt_sha.c
   src/xbt/config.c
-  src/xbt/parmap.c
   )
 set(TEST_UNITS
   ${CMAKE_CURRENT_BINARY_DIR}/src/cunit_unit.c
@@ -25,7 +24,6 @@ set(TEST_UNITS
   ${CMAKE_CURRENT_BINARY_DIR}/src/xbt_strbuff_unit.c
   ${CMAKE_CURRENT_BINARY_DIR}/src/xbt_sha_unit.c
   ${CMAKE_CURRENT_BINARY_DIR}/src/config_unit.c
-  ${CMAKE_CURRENT_BINARY_DIR}/src/parmap_unit.c
 
   ${CMAKE_CURRENT_BINARY_DIR}/src/simgrid_units_main.c
   )
index ba6d03f..64891fd 100644 (file)
@@ -36,6 +36,11 @@ static e_xbt_parmap_mode_t smx_parallel_synchronization_mode = XBT_PARMAP_DEFAUL
  */
 void SIMIX_context_mod_init(void)
 {
+#if defined(CONTEXT_THREADS) && !defined(HAVE_THREAD_LOCAL_STORAGE)
+  /* the __thread storage class is not available on this platform:
+   * use getspecific/setspecific instead to store the current context in each thread */
+  xbt_os_thread_key_create(&smx_current_context_key);
+#endif
   if (!simix_global->context_factory) {
     /* select the context factory to use to create the contexts */
     if (smx_factory_initializer_to_use) {
@@ -86,12 +91,6 @@ void SIMIX_context_mod_init(void)
       }
     }
   }
-
-#if defined(CONTEXT_THREADS) && !defined(HAVE_THREAD_LOCAL_STORAGE)
-  /* the __thread storage class is not available on this platform:
-   * use getspecific/setspecific instead to store the current context in each thread */
-  xbt_os_thread_key_create(&smx_current_context_key);
-#endif
 }
 
 /**
@@ -138,7 +137,6 @@ XBT_INLINE int SIMIX_context_get_nthreads(void) {
  * \param nb_threads the number of threads to use
  */
 XBT_INLINE void SIMIX_context_set_nthreads(int nb_threads) {
-
   if (nb_threads<=0) {  
      nb_threads = xbt_os_get_numcores();
      XBT_INFO("Auto-setting contexts/nthreads to %d",nb_threads);
@@ -149,7 +147,6 @@ XBT_INLINE void SIMIX_context_set_nthreads(int nb_threads) {
     THROWF(arg_error, 0, "The thread factory cannot be run in parallel");
 #endif
   }
   smx_parallel_contexts = nb_threads;
 }
 
index fd9058e..f2f0544 100644 (file)
@@ -31,7 +31,7 @@ typedef struct s_smx_ctx_raw {
 
 #ifdef CONTEXT_THREADS
 static xbt_parmap_t raw_parmap;
-static raw_stack_t* raw_workers_stacks;       /* space to save the worker stack in each thread */
+static smx_ctx_raw_t* raw_workers_context;    /* space to save the worker context in each thread */
 static unsigned long raw_threads_working;     /* number of threads that have started their work */
 static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the thread id */
 #endif
@@ -241,9 +241,11 @@ void SIMIX_ctx_raw_factory_init(smx_context_factory_t *factory)
   if (SIMIX_context_is_parallel()) {
 #ifdef CONTEXT_THREADS
     int nthreads = SIMIX_context_get_nthreads();
-    raw_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode());
-    raw_workers_stacks = xbt_new(raw_stack_t, nthreads);
     xbt_os_thread_key_create(&raw_worker_id_key);
+    raw_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode());
+    raw_workers_context = xbt_new(smx_ctx_raw_t, nthreads);
+    raw_maestro_context=NULL;
+
 #endif
     if (SIMIX_context_get_parallel_threshold() > 1) {
       /* choose dynamically */
@@ -280,7 +282,7 @@ static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory)
 #ifdef CONTEXT_THREADS
   if (raw_parmap)
     xbt_parmap_destroy(raw_parmap);
-  xbt_free(raw_workers_stacks);
+  xbt_free(raw_workers_context);
 #endif
   return smx_ctx_base_factory_finalize(factory);
 }
@@ -325,7 +327,8 @@ smx_ctx_raw_create_context(xbt_main_func_t code, int argc, char **argv,
 #endif                          /* HAVE_VALGRIND_VALGRIND_H */
 
      } else {
-       raw_maestro_context = context;
+       if(data != NULL && raw_maestro_context==NULL)
+         raw_maestro_context = context;
 
        if(MC_is_active())
          MC_ignore_heap(&(raw_maestro_context->stack_top), sizeof(raw_maestro_context->stack_top));
@@ -521,12 +524,12 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context)
   else {
     /* all processes were run, go to the barrier */
     XBT_DEBUG("No more processes to run");
-    next_context = (smx_context_t) raw_maestro_context;
     unsigned long worker_id =
         (unsigned long) xbt_os_thread_get_specific(raw_worker_id_key);
+    next_context = (smx_context_t)raw_workers_context[worker_id];
     XBT_DEBUG("Restoring worker stack %lu (working threads = %lu)",
         worker_id, raw_threads_working);
-    next_stack = raw_workers_stacks[worker_id];
+    next_stack = ((smx_ctx_raw_t)next_context)->stack_top;
   }
 
   SIMIX_context_set_current(next_context);
@@ -544,8 +547,11 @@ static void smx_ctx_raw_resume_parallel(smx_process_t first_process)
 #ifdef CONTEXT_THREADS
   unsigned long worker_id = __sync_fetch_and_add(&raw_threads_working, 1);
   xbt_os_thread_set_specific(raw_worker_id_key, (void*) worker_id);
+  smx_ctx_raw_t worker_context = (smx_ctx_raw_t)SIMIX_context_self();
+  raw_workers_context[worker_id] = worker_context;
   XBT_DEBUG("Saving worker stack %lu", worker_id);
-  raw_stack_t* worker_stack = &raw_workers_stacks[worker_id];
+  raw_stack_t* worker_stack = &(worker_context)->stack_top;
+
 
   smx_context_t context = first_process->context;
   SIMIX_context_set_current(context);
index 90e6b84..6a19200 100644 (file)
@@ -37,7 +37,7 @@ typedef struct s_smx_ctx_sysv {
 
 #ifdef CONTEXT_THREADS
 static xbt_parmap_t sysv_parmap;
-static ucontext_t* sysv_workers_stacks;        /* space to save the worker's stack in each thread */
+static smx_ctx_sysv_t* sysv_workers_context;   /* space to save the worker's context in each thread */
 static unsigned long sysv_threads_working;     /* number of threads that have started their work */
 static xbt_os_thread_key_t sysv_worker_id_key; /* thread-specific storage for the thread id */
 #endif
@@ -90,7 +90,8 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory)
 #ifdef CONTEXT_THREADS  /* To use parallel ucontexts a thread pool is needed */
     int nthreads = SIMIX_context_get_nthreads();
     sysv_parmap = xbt_parmap_new(nthreads, SIMIX_context_get_parallel_mode());
-    sysv_workers_stacks = xbt_new(ucontext_t, nthreads);
+    sysv_workers_context = xbt_new(smx_ctx_sysv_t, nthreads);
+    sysv_maestro_context = NULL;
     xbt_os_thread_key_create(&sysv_worker_id_key);
     (*factory)->stop = smx_ctx_sysv_stop_parallel;
     (*factory)->suspend = smx_ctx_sysv_suspend_parallel;
@@ -110,7 +111,7 @@ static int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory)
 #ifdef CONTEXT_THREADS
   if (sysv_parmap)
     xbt_parmap_destroy(sysv_parmap);
-  xbt_free(sysv_workers_stacks);
+  xbt_free(sysv_workers_context);
 #endif
   return smx_ctx_base_factory_finalize(factory);
 }
@@ -165,7 +166,8 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code,
               sizeof(smx_ctx_sysv_t), sizeof(int), CTX_ADDR_LEN);
     }
   } else {
-    sysv_maestro_context = context;
+    if(data != NULL && sysv_maestro_context == NULL)
+      sysv_maestro_context = context;
   }
 
   if(MC_is_active() && code)
@@ -289,10 +291,10 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context)
   else {
     /* all processes were run, go to the barrier */
     XBT_DEBUG("No more processes to run");
-    next_context = (smx_context_t) sysv_maestro_context;
     unsigned long worker_id =
         (unsigned long) xbt_os_thread_get_specific(sysv_worker_id_key);
-    next_stack = &sysv_workers_stacks[worker_id];
+    next_context = (smx_context_t)sysv_workers_context[worker_id];
+    next_stack = &((smx_ctx_sysv_t)next_context)->uc;
   }
 
   SIMIX_context_set_current(next_context);
@@ -305,7 +307,9 @@ static void smx_ctx_sysv_resume_parallel(smx_process_t first_process)
 #ifdef CONTEXT_THREADS
   unsigned long worker_id = __sync_fetch_and_add(&sysv_threads_working, 1);
   xbt_os_thread_set_specific(sysv_worker_id_key, (void*) worker_id);
-  ucontext_t* worker_stack = &sysv_workers_stacks[worker_id];
+  smx_ctx_sysv_t worker_context = (smx_ctx_sysv_t)SIMIX_context_self();
+  sysv_workers_context[worker_id] = worker_context;
+  ucontext_t* worker_stack = &worker_context->uc;
 
   smx_context_t context = first_process->context;
   SIMIX_context_set_current(context);
index 6e0a728..084a113 100644 (file)
@@ -21,6 +21,7 @@
 #include "xbt/dynar.h"
 #include "xbt/xbt_os_thread.h"
 #include "xbt/sysdep.h"
+#include "simix/smx_private.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
@@ -53,6 +54,7 @@ static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
 
+
 /**
  * \brief Parallel map structure
  */
@@ -80,6 +82,16 @@ typedef struct s_xbt_parmap {
   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
 } s_xbt_parmap_t;
 
+/**
+ * \brief Thread data transmission structure
+ */
+typedef struct s_xbt_parmap_thread_data{
+  xbt_parmap_t parmap;
+  int worker_id;
+} s_xbt_parmap_thread_data_t;
+
+typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t;
+
 /**
  * \brief Creates a parallel map object
  * \param num_workers number of worker threads to create
@@ -101,8 +113,12 @@ xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
   xbt_parmap_set_mode(parmap, mode);
 
   /* Create the pool of worker threads */
+  xbt_parmap_thread_data_t data;
   for (i = 1; i < num_workers; i++) {
-    worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
+    data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
+    data->parmap = parmap;
+    data->worker_id = i;
+    worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL);
     xbt_os_thread_detach(worker);
   }
   return parmap;
@@ -243,8 +259,11 @@ static void xbt_parmap_work(xbt_parmap_t parmap)
  */
 static void *xbt_parmap_worker_main(void *arg)
 {
-  xbt_parmap_t parmap = (xbt_parmap_t) arg;
+  xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
+  xbt_parmap_t parmap = data->parmap;
   unsigned round = 0;
+  smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL);
+  SIMIX_context_set_current(context);
 
   XBT_DEBUG("New worker thread created");
 
@@ -253,15 +272,16 @@ static void *xbt_parmap_worker_main(void *arg)
     parmap->worker_wait_f(parmap, ++round);
     if (parmap->status == XBT_PARMAP_WORK) {
 
-      XBT_DEBUG("Worker got a job");
+      XBT_DEBUG("Worker %d got a job", data->worker_id);
 
       xbt_parmap_work(parmap);
       parmap->worker_signal_f(parmap);
 
-      XBT_DEBUG("Worker has finished");
+      XBT_DEBUG("Worker %d has finished", data->worker_id);
 
     /* We are destroying the parmap */
     } else {
+      xbt_free(data);
       parmap->worker_signal_f(parmap);
       return NULL;
     }
@@ -478,152 +498,3 @@ static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
     xbt_os_thread_yield();
   }
 }
-
-#ifdef SIMGRID_TEST
-#include "xbt.h"
-#include "xbt/ex.h"
-#include "xbt/xbt_os_thread.h"
-#include "xbt/xbt_os_time.h"
-#include "internal_config.h"        /* HAVE_FUTEX_H */
-
-XBT_TEST_SUITE("parmap", "Parallel Map");
-XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
-
-#ifdef HAVE_FUTEX_H
-#define TEST_PARMAP_SKIP_TEST(mode) 0
-#else
-#define TEST_PARMAP_SKIP_TEST(mode) ((mode) == XBT_PARMAP_FUTEX)
-#endif
-
-#define TEST_PARMAP_VALIDATE_MODE(mode) \
-  if (TEST_PARMAP_SKIP_TEST(mode)) { xbt_test_skip(); return; } else ((void)0)
-
-static void fun_double(void *arg)
-{
-  unsigned *u = arg;
-  *u = 2 * *u + 1;
-}
-
-/* Check that the computations are correctly done. */
-static void test_parmap_basic(e_xbt_parmap_mode_t mode)
-{
-  unsigned num_workers;
-
-  for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
-    const unsigned len = 1033;
-    const unsigned num = 5;
-    unsigned *a;
-    xbt_dynar_t data;
-    xbt_parmap_t parmap;
-    unsigned i;
-
-    xbt_test_add("Basic parmap usage (%u workers)", num_workers);
-
-    TEST_PARMAP_VALIDATE_MODE(mode);
-    parmap = xbt_parmap_new(num_workers, mode);
-
-    a = xbt_malloc(len * sizeof *a);
-    data = xbt_dynar_new(sizeof a, NULL);
-    for (i = 0; i < len; i++) {
-      a[i] = i;
-      xbt_dynar_push_as(data, void *, &a[i]);
-    }
-
-    for (i = 0; i < num; i++)
-      xbt_parmap_apply(parmap, fun_double, data);
-
-    for (i = 0; i < len; i++) {
-      unsigned expected = (1U << num) * (i + 1) - 1;
-      xbt_test_assert(a[i] == expected,
-                      "a[%u]: expected %u, got %u", i, expected, a[i]);
-    }
-
-    xbt_dynar_free(&data);
-    xbt_free(a);
-    xbt_parmap_destroy(parmap);
-  }
-}
-
-XBT_TEST_UNIT("basic_posix", test_parmap_basic_posix, "Basic usage: posix")
-{
-  test_parmap_basic(XBT_PARMAP_POSIX);
-}
-
-XBT_TEST_UNIT("basic_futex", test_parmap_basic_futex, "Basic usage: futex")
-{
-  test_parmap_basic(XBT_PARMAP_FUTEX);
-}
-
-XBT_TEST_UNIT("basic_busy_wait", test_parmap_basic_busy_wait, "Basic usage: busy_wait")
-{
-  test_parmap_basic(XBT_PARMAP_BUSY_WAIT);
-}
-
-static void fun_get_id(void *arg)
-{
-  *(uintptr_t *)arg = (uintptr_t)xbt_os_thread_self();
-  xbt_os_sleep(0.5);
-}
-
-static int fun_compare(const void *pa, const void *pb)
-{
-  uintptr_t a = *(uintptr_t *)pa;
-  uintptr_t b = *(uintptr_t *)pb;
-  return a < b ? -1 : a > b ? 1 : 0;
-}
-
-/* Check that all threads are working. */
-static void test_parmap_extended(e_xbt_parmap_mode_t mode)
-{
-  unsigned num_workers;
-
-  for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
-    const unsigned len = 2 * num_workers;
-    uintptr_t *a;
-    xbt_parmap_t parmap;
-    xbt_dynar_t data;
-    unsigned i;
-    unsigned count;
-
-    xbt_test_add("Extended parmap usage (%u workers)", num_workers);
-
-    TEST_PARMAP_VALIDATE_MODE(mode);
-    parmap = xbt_parmap_new(num_workers, mode);
-
-    a = xbt_malloc(len * sizeof *a);
-    data = xbt_dynar_new(sizeof a, NULL);
-    for (i = 0; i < len; i++)
-      xbt_dynar_push_as(data, void *, &a[i]);
-
-    xbt_parmap_apply(parmap, fun_get_id, data);
-
-    qsort(a, len, sizeof a[0], fun_compare);
-    count = 1;
-    for (i = 1; i < len; i++)
-      if (a[i] != a[i - 1])
-        count++;
-    xbt_test_assert(count == num_workers,
-                    "only %u/%u threads did some work", count, num_workers);
-
-    xbt_dynar_free(&data);
-    xbt_free(a);
-    xbt_parmap_destroy(parmap);
-  }
-}
-
-XBT_TEST_UNIT("extended_posix", test_parmap_extended_posix, "Extended usage: posix")
-{
-  test_parmap_extended(XBT_PARMAP_POSIX);
-}
-
-XBT_TEST_UNIT("extended_futex", test_parmap_extended_futex, "Extended usage: futex")
-{
-  test_parmap_extended(XBT_PARMAP_FUTEX);
-}
-
-XBT_TEST_UNIT("extended_busy_wait", test_parmap_extended_busy_wait, "Extended usage: busy_wait")
-{
-  test_parmap_extended(XBT_PARMAP_BUSY_WAIT);
-}
-
-#endif /* SIMGRID_TEST */