Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Direct context switching: clean the semantics of parmap
authorChristophe Thiéry <christopho128@gmail.com>
Wed, 2 Nov 2011 11:10:40 +0000 (12:10 +0100)
committerChristophe Thiéry <christopho128@gmail.com>
Wed, 2 Nov 2011 11:11:30 +0000 (12:11 +0100)
include/xbt/parmap.h
src/simix/smx_context_sysv.c
src/xbt/parmap.c

index f661994..9412595 100644 (file)
@@ -18,9 +18,13 @@ SG_BEGIN_DECL()
 /** @addtogroup XBT_parmap
   * @brief Parallel map.
   *
-  * A function is applied to all the elements of a dynar in parallel
-  * using threads. The threads are persistent until the destruction
-  * of the parmap object.
+  * A function is applied to the n first elements of a dynar in parallel,
+  * where n is the number of threads. The threads are persistent until the
+  * destruction of the parmap object.
+  *
+  * If there are more than n elements in the dynar, the worker threads should
+  * fetch themselves remaining work with xbt_parmap_next() and execute it.
+  *
   * @{
   */
   /** \brief Queue data type (opaque type) */
@@ -28,12 +32,13 @@ SG_BEGIN_DECL()
 typedef struct s_xbt_parmap *xbt_parmap_t;
 
 XBT_PUBLIC(xbt_parmap_t) xbt_parmap_new(unsigned int num_workers);
+XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap);
 
 XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap,
                                   void_f_pvoid_t fun,
                                   xbt_dynar_t data);
-
-XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap);
+XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap);
+XBT_PUBLIC(unsigned long) xbt_parmap_get_worker_id(xbt_parmap_t parmap);
 
 /** @} */
 
index 984be81..626c9ec 100644 (file)
@@ -10,7 +10,6 @@
 
 #include "smx_context_sysv_private.h"
 #include "xbt/parmap.h"
-#include "xbt/parmap_private.h"
 #include "simix/private.h"
 #include "gras_config.h"
 
@@ -28,10 +27,12 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
 
 #ifdef CONTEXT_THREADS
 static xbt_parmap_t parmap;
+static ucontext_t* smx_ctx_sysv_local_maestro_uc;    /* space to save maestro's stack in each thead */
 #endif
 static unsigned long smx_ctx_sysv_process_index = 0; /* index of the next process to run in the
-                                         * list of runnable processes */
-static smx_ctx_sysv_t maestro_context;
+                                                      * list of runnable processes */
+static smx_ctx_sysv_t smx_ctx_sysv_maestro_context;
+
 
 static smx_context_t
 smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv,
@@ -41,13 +42,14 @@ static void smx_ctx_sysv_wrapper(int count, ...);
 
 static void smx_ctx_sysv_stop_serial(smx_context_t context);
 static void smx_ctx_sysv_suspend_serial(smx_context_t context);
+static void smx_ctx_sysv_resume_serial(smx_process_t first_process);
 static void smx_ctx_sysv_runall_serial(void);
 
 static void smx_ctx_sysv_stop_parallel(smx_context_t context);
 static void smx_ctx_sysv_suspend_parallel(smx_context_t context);
+static void smx_ctx_sysv_resume_parallel(smx_process_t first_process);
 static void smx_ctx_sysv_runall_parallel(void);
 
-static void smx_ctx_sysv_resume(smx_process_t first_process);
 
 /* This is a bit paranoid about SIZEOF_VOIDP not being a multiple of SIZEOF_INT,
  * but it doesn't harm. */
@@ -77,7 +79,9 @@ void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory)
 
   if (SIMIX_context_is_parallel()) {
 #ifdef CONTEXT_THREADS /* To use parallel ucontexts a thread pool is needed */
-    parmap = xbt_parmap_new(2);
+    int nthreads = SIMIX_context_get_nthreads();
+    parmap = xbt_parmap_new(nthreads);
+    smx_ctx_sysv_local_maestro_uc = xbt_new(ucontext_t, nthreads);
     (*factory)->stop = smx_ctx_sysv_stop_parallel;
     (*factory)->suspend = smx_ctx_sysv_suspend_parallel;
     (*factory)->runall = smx_ctx_sysv_runall_parallel;
@@ -97,6 +101,7 @@ int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory)
 #ifdef CONTEXT_THREADS
   if (parmap)
     xbt_parmap_destroy(parmap);
+  xbt_free(smx_ctx_sysv_local_maestro_uc);
 #endif
   return smx_ctx_base_factory_finalize(factory);
 }
@@ -143,7 +148,7 @@ smx_ctx_sysv_create_context_sized(size_t size, xbt_main_func_t code,
     makecontext(&context->uc, (void (*)())smx_ctx_sysv_wrapper,
                 CTX_ADDR_LEN, CTX_ADDR_SPLIT(ctx_addr));
   } else {
-    maestro_context = context;
+    smx_ctx_sysv_maestro_context = context;
   }
 
   return (smx_context_t) context;
@@ -208,28 +213,26 @@ static void smx_ctx_sysv_suspend_serial(smx_context_t context)
   unsigned long int i = smx_ctx_sysv_process_index++;
 
   if (i < xbt_dynar_length(simix_global->process_to_run)) {
-    XBT_DEBUG("Run next process");
-
     /* execute the next process */
+    XBT_DEBUG("Run next process");
     next_context = xbt_dynar_get_as(
         simix_global->process_to_run,i, smx_process_t)->context;
   }
   else {
     /* all processes were run, return to maestro */
     XBT_DEBUG("No more process to run");
-
-    next_context = (smx_context_t) maestro_context;
+    next_context = (smx_context_t) smx_ctx_sysv_maestro_context;
   }
   SIMIX_context_set_current(next_context);
   swapcontext(&((smx_ctx_sysv_t) context)->uc,
       &((smx_ctx_sysv_t) next_context)->uc);
 }
 
-static void smx_ctx_sysv_resume(smx_process_t first_process)
+static void smx_ctx_sysv_resume_serial(smx_process_t first_process)
 {
   smx_context_t context = first_process->context;
   SIMIX_context_set_current(context);
-  swapcontext(&maestro_context->uc,
+  swapcontext(&smx_ctx_sysv_maestro_context->uc,
       &((smx_ctx_sysv_t) context)->uc);
 }
 
@@ -241,7 +244,7 @@ static void smx_ctx_sysv_runall_serial(void)
     smx_ctx_sysv_process_index = 1;
 
     /* execute the first process */
-    smx_ctx_sysv_resume(first_process);
+    smx_ctx_sysv_resume_serial(first_process);
   }
 }
 
@@ -255,30 +258,45 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context)
 {
 #ifdef CONTEXT_THREADS
   /* determine the next context */
+  smx_process_t next_work = xbt_parmap_next(parmap);
   smx_context_t next_context;
-  unsigned long int i = __sync_fetch_and_add(&parmap->index, 1);
+  ucontext_t* next_uc;
 
-  if (i < xbt_dynar_length(simix_global->process_to_run)) {
-    /* execute the next process */
+  if (next_work != NULL) {
+    /* there is a next process to resume */
     XBT_DEBUG("Run next process");
-    next_context = xbt_dynar_get_as(simix_global->process_to_run, i, smx_process_t)->context;
+    next_context = next_work->context;
+    next_uc = &((smx_ctx_sysv_t) next_context)->uc;
   }
   else {
     /* all processes were run, go to the barrier */
     XBT_DEBUG("No more processes to run");
-    next_context = (smx_context_t) maestro_context;
+    next_context = (smx_context_t) smx_ctx_sysv_maestro_context;
+    unsigned long worker_id = (unsigned long) xbt_os_thread_get_extra_data();
+    next_uc = &smx_ctx_sysv_local_maestro_uc[worker_id];
   }
 
   SIMIX_context_set_current(next_context);
-  swapcontext(&((smx_ctx_sysv_t) context)->uc,
-      &((smx_ctx_sysv_t) next_context)->uc);
+  swapcontext(&((smx_ctx_sysv_t) context)->uc, next_uc);
+#endif
+}
+
+static void smx_ctx_sysv_resume_parallel(smx_process_t first_process)
+{
+#ifdef CONTEXT_THREADS
+  unsigned long worker_id = (unsigned long) xbt_os_thread_get_extra_data();
+  ucontext_t* local_maestro_uc = &smx_ctx_sysv_local_maestro_uc[worker_id];
+
+  smx_context_t context = first_process->context;
+  SIMIX_context_set_current(context);
+  swapcontext(local_maestro_uc, &((smx_ctx_sysv_t) context)->uc);
 #endif
 }
 
 static void smx_ctx_sysv_runall_parallel(void)
 {
 #ifdef CONTEXT_THREADS
-  xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_sysv_resume,
+  xbt_parmap_apply(parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel,
       simix_global->process_to_run);
 #endif
 }
index dccc15d..19dcb58 100644 (file)
@@ -55,7 +55,6 @@ xbt_parmap_t xbt_parmap_new(unsigned int num_workers)
 
 void xbt_parmap_destroy(xbt_parmap_t parmap)
 { 
-  XBT_DEBUG("Destroy parmap %p", parmap);
   parmap->status = PARMAP_DESTROY;
 #ifdef HAVE_FUTEX_H
   xbt_event_signal(parmap->sync_event);
@@ -76,14 +75,27 @@ void xbt_parmap_destroy(xbt_parmap_t parmap)
   XBT_DEBUG("Job done");
 }
 
+void* xbt_parmap_next(xbt_parmap_t parmap) {
+
+  unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
+  if (index < xbt_dynar_length(parmap->data)) {
+    return xbt_dynar_get_as(parmap->data, index, void*);
+  }
+  return NULL;
+}
+
+unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) {
+  return (unsigned long) xbt_os_thread_get_extra_data();
+}
+
 static void *_xbt_parmap_worker_main(void *arg)
 {
   unsigned int worker_id;
-  xbt_parmap_t parmap = (xbt_parmap_t)arg;
+  xbt_parmap_t parmap = (xbt_parmap_t) arg;
 
   /* Fetch a worker id */
   worker_id = __sync_fetch_and_add(&parmap->workers_max_id, 1);
-  xbt_os_thread_set_extra_data((void *)(unsigned long)worker_id);
+  xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id);
 
   XBT_DEBUG("New worker thread created (%u)", worker_id);
   
@@ -96,9 +108,9 @@ static void *_xbt_parmap_worker_main(void *arg)
 
       XBT_DEBUG("Worker %u got a job", worker_id);
 
-      unsigned int i = __sync_fetch_and_add(&parmap->index, 1);
-      if (i < xbt_dynar_length(parmap->data)) {
-        parmap->fun(xbt_dynar_get_as(parmap->data, i, void*));
+      void* work = xbt_parmap_next(parmap);
+      if (work != NULL) {
+        parmap->fun(work);
       }
 
       XBT_DEBUG("Worker %u has finished", worker_id);