Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implement stride for parmap_apply.
authorArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Wed, 15 Feb 2012 20:38:26 +0000 (21:38 +0100)
committerArnaud Giersch <arnaud.giersch@iut-bm.univ-fcomte.fr>
Wed, 8 Jan 2014 10:32:29 +0000 (11:32 +0100)
include/xbt/parmap.h
src/simix/smx_context_raw.c
src/simix/smx_context_sysv.c
src/surf/surf.c
src/xbt/parmap.c
teshsuite/xbt/parmap_test.c
testsuite/xbt/parmap_bench.c

index 624d278..f575149 100644 (file)
@@ -49,8 +49,10 @@ XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap);
 
 XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap,
                                   void_f_pvoid_t fun,
-                                  xbt_dynar_t data);
-XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap);
+                                  xbt_dynar_t data,
+                                  unsigned stride,
+                                  int once);
+XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap, unsigned *index);
 
 #ifdef HAVE_MC
 XBT_PUBLIC(xbt_parmap_t) xbt_parmap_mc_new(unsigned int num_workers,
index 829f47e..44a1278 100644 (file)
@@ -21,6 +21,7 @@ typedef struct s_smx_ctx_raw {
   s_smx_ctx_base_t super;         /* Fields of super implementation */
   char *malloced_stack;           /* malloced area containing the stack */
   raw_stack_t stack_top;          /* pointer to stack top (within previous area) */
+  unsigned int parmap_index;
 #ifdef HAVE_VALGRIND_VALGRIND_H
   unsigned int valgrind_stack_id; /* the valgrind stack id */
 #endif
@@ -512,15 +513,19 @@ static void smx_ctx_raw_suspend_parallel(smx_context_t context)
 {
 #ifdef CONTEXT_THREADS
   /* determine the next context */
-  smx_process_t next_work = xbt_parmap_next(raw_parmap);
+  smx_process_t next_work;
   smx_context_t next_context;
   raw_stack_t next_stack;
 
+  next_work = xbt_parmap_next(raw_parmap,
+                              &((smx_ctx_raw_t) context)->parmap_index);
   if (next_work != NULL) {
     /* there is a next process to resume */
     XBT_DEBUG("Run next process");
     next_context = next_work->context;
     next_stack = ((smx_ctx_raw_t) next_context)->stack_top;
+    ((smx_ctx_raw_t) next_context)->parmap_index =
+      ((smx_ctx_raw_t) context)->parmap_index;
   }
   else {
     /* all processes were run, go to the barrier */
@@ -554,11 +559,15 @@ static void smx_ctx_raw_resume_parallel(smx_process_t first_process)
   raw_workers_context[worker_id] = worker_context;
   XBT_DEBUG("Saving worker stack %lu", worker_id);
   raw_stack_t* worker_stack = &(worker_context)->stack_top;
-
-
-  smx_context_t context = first_process->context;
-  SIMIX_context_set_current(context);
-  raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top);
+  unsigned int index = 0;
+
+  first_process = xbt_parmap_next(raw_parmap, &index);
+  if (first_process != NULL) {
+    smx_context_t context = first_process->context;
+    ((smx_ctx_raw_t) context)->parmap_index = index;
+    SIMIX_context_set_current(context);
+    raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top);
+  }
 #endif
 }
 
@@ -570,7 +579,8 @@ static void smx_ctx_raw_runall_parallel(void)
 #ifdef CONTEXT_THREADS
   raw_threads_working = 0;
   xbt_parmap_apply(raw_parmap, (void_f_pvoid_t) smx_ctx_raw_resume_parallel,
-      simix_global->process_to_run);
+                   simix_global->process_to_run,
+                   1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1);
 #else
   xbt_die("You asked for a parallel execution, but you don't have any threads.");
 #endif
index 3af7b3c..25ef8ec 100644 (file)
@@ -29,6 +29,7 @@ XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
 typedef struct s_smx_ctx_sysv {
   s_smx_ctx_base_t super;       /* Fields of super implementation */
   ucontext_t uc;                /* the ucontext that executes the code */
+  unsigned int parmap_index;
 #ifdef HAVE_VALGRIND_VALGRIND_H
   unsigned int valgrind_stack_id;       /* the valgrind stack id */
 #endif
@@ -278,15 +279,19 @@ static void smx_ctx_sysv_suspend_parallel(smx_context_t context)
 {
 #ifdef CONTEXT_THREADS
   /* determine the next context */
-  smx_process_t next_work = xbt_parmap_next(sysv_parmap);
+  smx_process_t next_work;
   smx_context_t next_context;
   ucontext_t* next_stack;
 
+  next_work = xbt_parmap_next(sysv_parmap,
+                              &((smx_ctx_sysv_t) context)->parmap_index);
   if (next_work != NULL) {
     /* there is a next process to resume */
     XBT_DEBUG("Run next process");
     next_context = next_work->context;
     next_stack = &((smx_ctx_sysv_t) next_context)->uc;
+    ((smx_ctx_sysv_t) next_context)->parmap_index =
+      ((smx_ctx_sysv_t) context)->parmap_index;
   }
   else {
     /* all processes were run, go to the barrier */
@@ -310,10 +315,15 @@ static void smx_ctx_sysv_resume_parallel(smx_process_t first_process)
   smx_ctx_sysv_t worker_context = (smx_ctx_sysv_t)SIMIX_context_self();
   sysv_workers_context[worker_id] = worker_context;
   ucontext_t* worker_stack = &worker_context->uc;
-
-  smx_context_t context = first_process->context;
-  SIMIX_context_set_current(context);
-  swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc);
+  unsigned int index = 0;
+
+  first_process = xbt_parmap_next(sysv_parmap, &index);
+  if (first_process != NULL) {
+    smx_context_t context = first_process->context;
+    ((smx_ctx_sysv_t) context)->parmap_index = index;
+    SIMIX_context_set_current(context);
+    swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc);
+  }
 #endif
 }
 
@@ -322,6 +332,7 @@ static void smx_ctx_sysv_runall_parallel(void)
 #ifdef CONTEXT_THREADS
   sysv_threads_working = 0;
   xbt_parmap_apply(sysv_parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel,
-      simix_global->process_to_run);
+                   simix_global->process_to_run,
+                   1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1);
 #endif
 }
index 72a7542..d32f9e5 100644 (file)
@@ -593,7 +593,7 @@ double surf_solve(double max_date)
   if (surf_get_nthreads() > 1 && surf_do_par()) {
     /* parallel version */
 #ifdef CONTEXT_THREADS
-    xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list);
+    xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list, 1, 0);
 #else
     xbt_die("Asked to run in parallel, but no thread at hand...");
 #endif
@@ -684,7 +684,7 @@ double surf_solve(double max_date)
   if (surf_get_nthreads() > 1 && surf_do_par()) {
     /* parallel version */
 #ifdef CONTEXT_THREADS
-    xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list);
+    xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list, 1, 0);
 #endif
   }
   else {
index 3e56435..fd685b0 100644 (file)
@@ -71,6 +71,8 @@ typedef struct s_xbt_parmap {
   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
   unsigned int index;              /**< index of the next element of data to pick */
+  unsigned stride;
+  int apply_once;
 
 #ifdef HAVE_MC
   int finish;
@@ -269,12 +271,15 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
  * \param fun the function to call in parallel
  * \param data each element of this dynar will be passed as an argument to fun
  */
-void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
+void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun,
+                      xbt_dynar_t data, unsigned stride, int once)
 {
   /* Assign resources to worker threads */
   parmap->fun = fun;
   parmap->data = data;
   parmap->index = 0;
+  parmap->stride = stride;
+  parmap->apply_once = once;
   parmap->master_signal_f(parmap);
   xbt_parmap_work(parmap);
   parmap->master_wait_f(parmap);
@@ -288,21 +293,37 @@ void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
  *
  * \return the next task to process, or NULL if there is no more work
  */
-void* xbt_parmap_next(xbt_parmap_t parmap)
+void* xbt_parmap_next(xbt_parmap_t parmap, unsigned *index)
 {
-  unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
-  if (index < xbt_dynar_length(parmap->data)) {
-    return xbt_dynar_get_as(parmap->data, index, void*);
-  }
-  return NULL;
+  unsigned i = *index;
+  if (i % parmap->stride == 0)
+    i = __sync_fetch_and_add(&parmap->index, parmap->stride);
+  *index = i + 1;
+  return  i < xbt_dynar_length(parmap->data) ?
+    xbt_dynar_get_as(parmap->data, i, void*) : NULL;
 }
 
 static void xbt_parmap_work(xbt_parmap_t parmap)
 {
-  unsigned index;
-  while ((index = __sync_fetch_and_add(&parmap->index, 1))
-         < xbt_dynar_length(parmap->data))
-    parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
+  if (parmap->apply_once) {
+    parmap->fun(NULL);
+  } else {
+    int more = 1;
+    unsigned index;
+    unsigned next;
+    do {
+      index = __sync_fetch_and_add(&parmap->index, parmap->stride);
+      next = index + parmap->stride;
+      if (next >= xbt_dynar_length(parmap->data)) {
+        next = xbt_dynar_length(parmap->data);
+        more = 0;
+      }
+      while (index < next) {
+        parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
+        index++;
+      }
+    } while (more);
+  }
 }
 
 /**
index 52c4050..bc687da 100644 (file)
@@ -42,7 +42,7 @@ static int test_parmap_basic(e_xbt_parmap_mode_t mode)
     }
 
     for (i = 0; i < num; i++)
-      xbt_parmap_apply(parmap, fun_double, data);
+      xbt_parmap_apply(parmap, fun_double, data, 10, 0);
 
     for (i = 0; i < len; i++) {
       unsigned expected = (1U << num) * (i + 1) - 1;
@@ -94,7 +94,7 @@ static int test_parmap_extended(e_xbt_parmap_mode_t mode)
     for (i = 0; i < len; i++)
       xbt_dynar_push_as(data, void *, &a[i]);
 
-    xbt_parmap_apply(parmap, fun_get_id, data);
+    xbt_parmap_apply(parmap, fun_get_id, data, 1, 0);
 
     qsort(a, len, sizeof a[0], fun_compare);
     count = 1;
index e95d77d..1ce420a 100644 (file)
@@ -82,7 +82,7 @@ static void array_new(unsigned **a, xbt_dynar_t *data)
   }
 }
 
-static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode)
+static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride)
 {
   unsigned *a;
   xbt_dynar_t data;
@@ -102,7 +102,7 @@ static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode)
   start_time = xbt_os_time();
   do {
     parmap = xbt_parmap_new(nthreads, mode);
-    xbt_parmap_apply(parmap, fun_to_apply, data);
+    xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0);
     xbt_parmap_destroy(parmap);
     elapsed_time = xbt_os_time() - start_time;
     i++;
@@ -115,7 +115,7 @@ static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode)
   xbt_free(a);
 }
 
-static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode)
+static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride)
 {
   unsigned *a;
   xbt_dynar_t data;
@@ -135,7 +135,7 @@ static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode)
   i = 0;
   start_time = xbt_os_time();
   do {
-    xbt_parmap_apply(parmap, fun_to_apply, data);
+    xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0);
     elapsed_time = xbt_os_time() - start_time;
     i++;
   } while (elapsed_time < TIMEOUT);
@@ -148,8 +148,8 @@ static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode)
   xbt_free(a);
 }
 
-static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t),
-                            int nthreads, unsigned modes)
+static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t, unsigned),
+                            int nthreads, unsigned modes, unsigned stride)
 {
   e_xbt_parmap_mode_t all_modes[] = {
     XBT_PARMAP_POSIX, XBT_PARMAP_FUTEX,
@@ -158,7 +158,7 @@ static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t),
   unsigned i;
   for (i = 0 ; i < sizeof all_modes / sizeof all_modes[0] ; i++) {
     if (1U << i & modes)
-      bench_fun(nthreads, all_modes[i]);
+      bench_fun(nthreads, all_modes[i], stride);
   }
 }
 
@@ -166,14 +166,16 @@ int main(int argc, char *argv[])
 {
   int nthreads;
   unsigned modes = MODES_DEFAULT;
+  unsigned stride = 1;
 
   SIMIX_global_init(&argc, argv);
 
-  if (argc != 2 && argc != 3) {
+  if (argc < 2 || argc > 4) {
     fprintf(stderr,
-            "Usage: %s nthreads [modes]\n"
+            "Usage: %s nthreads [modes [stride]]\n"
             "    nthreads - number of working threads\n"
-            "    modes    - bitmask of modes to test\n",
+            "    modes    - bitmask of modes to test\n"
+            "    stride   - parmap stride\n",
             argv[0]);
     return EXIT_FAILURE;
   }
@@ -182,30 +184,32 @@ int main(int argc, char *argv[])
     fprintf(stderr, "ERROR: invalid thread count: %d\n", nthreads);
     return EXIT_FAILURE;
   }
-  if (argc == 3)
+  if (argc >= 3)
     modes = strtol(argv[2], NULL, 0);
+  if (argc >= 4)
+    stride = atoi(argv[3]);
 
-  printf("Parmap benchmark with %d workers (modes = %#x)...\n\n",
-         nthreads, modes);
+  printf("Parmap benchmark with %d workers (modes = %#x; stride = %d)...\n\n",
+         nthreads, modes, stride);
 
   fun_to_apply = fun_small_comp;
 
   printf("Benchmark for parmap create+apply+destroy (small comp):\n");
-  bench_all_modes(bench_parmap_full, nthreads, modes);
+  bench_all_modes(bench_parmap_full, nthreads, modes, stride);
   printf("\n");
 
   printf("Benchmark for parmap apply only (small comp):\n");
-  bench_all_modes(bench_parmap_apply, nthreads, modes);
+  bench_all_modes(bench_parmap_apply, nthreads, modes, stride);
   printf("\n");
 
   fun_to_apply = fun_big_comp;
 
   printf("Benchmark for parmap create+apply+destroy (big comp):\n");
-  bench_all_modes(bench_parmap_full, nthreads, modes);
+  bench_all_modes(bench_parmap_full, nthreads, modes, stride);
   printf("\n");
 
   printf("Benchmark for parmap apply only (big comp):\n");
-  bench_all_modes(bench_parmap_apply, nthreads, modes);
+  bench_all_modes(bench_parmap_apply, nthreads, modes, stride);
   printf("\n");
 
   return EXIT_SUCCESS;