XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap,
void_f_pvoid_t fun,
- xbt_dynar_t data);
-XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap);
+ xbt_dynar_t data,
+ unsigned stride,
+ int once);
+XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap, unsigned *index);
#ifdef HAVE_MC
XBT_PUBLIC(xbt_parmap_t) xbt_parmap_mc_new(unsigned int num_workers,
s_smx_ctx_base_t super; /* Fields of super implementation */
char *malloced_stack; /* malloced area containing the stack */
raw_stack_t stack_top; /* pointer to stack top (within previous area) */
+ unsigned int parmap_index;
#ifdef HAVE_VALGRIND_VALGRIND_H
unsigned int valgrind_stack_id; /* the valgrind stack id */
#endif
{
#ifdef CONTEXT_THREADS
/* determine the next context */
- smx_process_t next_work = xbt_parmap_next(raw_parmap);
+ smx_process_t next_work;
smx_context_t next_context;
raw_stack_t next_stack;
+ next_work = xbt_parmap_next(raw_parmap,
+ &((smx_ctx_raw_t) context)->parmap_index);
if (next_work != NULL) {
/* there is a next process to resume */
XBT_DEBUG("Run next process");
next_context = next_work->context;
next_stack = ((smx_ctx_raw_t) next_context)->stack_top;
+ ((smx_ctx_raw_t) next_context)->parmap_index =
+ ((smx_ctx_raw_t) context)->parmap_index;
}
else {
/* all processes were run, go to the barrier */
raw_workers_context[worker_id] = worker_context;
XBT_DEBUG("Saving worker stack %lu", worker_id);
raw_stack_t* worker_stack = &(worker_context)->stack_top;
-
-
- smx_context_t context = first_process->context;
- SIMIX_context_set_current(context);
- raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top);
+ unsigned int index = 0;
+
+ first_process = xbt_parmap_next(raw_parmap, &index);
+ if (first_process != NULL) {
+ smx_context_t context = first_process->context;
+ ((smx_ctx_raw_t) context)->parmap_index = index;
+ SIMIX_context_set_current(context);
+ raw_swapcontext(worker_stack, ((smx_ctx_raw_t) context)->stack_top);
+ }
#endif
}
#ifdef CONTEXT_THREADS
raw_threads_working = 0;
xbt_parmap_apply(raw_parmap, (void_f_pvoid_t) smx_ctx_raw_resume_parallel,
- simix_global->process_to_run);
+ simix_global->process_to_run,
+ 1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1);
#else
xbt_die("You asked for a parallel execution, but you don't have any threads.");
#endif
typedef struct s_smx_ctx_sysv {
s_smx_ctx_base_t super; /* Fields of super implementation */
ucontext_t uc; /* the ucontext that executes the code */
+ unsigned int parmap_index;
#ifdef HAVE_VALGRIND_VALGRIND_H
unsigned int valgrind_stack_id; /* the valgrind stack id */
#endif
{
#ifdef CONTEXT_THREADS
/* determine the next context */
- smx_process_t next_work = xbt_parmap_next(sysv_parmap);
+ smx_process_t next_work;
smx_context_t next_context;
ucontext_t* next_stack;
+ next_work = xbt_parmap_next(sysv_parmap,
+ &((smx_ctx_sysv_t) context)->parmap_index);
if (next_work != NULL) {
/* there is a next process to resume */
XBT_DEBUG("Run next process");
next_context = next_work->context;
next_stack = &((smx_ctx_sysv_t) next_context)->uc;
+ ((smx_ctx_sysv_t) next_context)->parmap_index =
+ ((smx_ctx_sysv_t) context)->parmap_index;
}
else {
/* all processes were run, go to the barrier */
smx_ctx_sysv_t worker_context = (smx_ctx_sysv_t)SIMIX_context_self();
sysv_workers_context[worker_id] = worker_context;
ucontext_t* worker_stack = &worker_context->uc;
-
- smx_context_t context = first_process->context;
- SIMIX_context_set_current(context);
- swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc);
+ unsigned int index = 0;
+
+ first_process = xbt_parmap_next(sysv_parmap, &index);
+ if (first_process != NULL) {
+ smx_context_t context = first_process->context;
+ ((smx_ctx_sysv_t) context)->parmap_index = index;
+ SIMIX_context_set_current(context);
+ swapcontext(worker_stack, &((smx_ctx_sysv_t) context)->uc);
+ }
#endif
}
#ifdef CONTEXT_THREADS
sysv_threads_working = 0;
xbt_parmap_apply(sysv_parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel,
- simix_global->process_to_run);
+ simix_global->process_to_run,
+ 1 + SIMIX_context_get_parallel_threshold() / SIMIX_context_get_nthreads(), 1);
#endif
}
if (surf_get_nthreads() > 1 && surf_do_par()) {
/* parallel version */
#ifdef CONTEXT_THREADS
- xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list);
+ xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_share_resources, model_list, 1, 0);
#else
xbt_die("Asked to run in parallel, but no thread at hand...");
#endif
if (surf_get_nthreads() > 1 && surf_do_par()) {
/* parallel version */
#ifdef CONTEXT_THREADS
- xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list);
+ xbt_parmap_apply(surf_parmap, (void_f_pvoid_t) surf_update_actions_state, model_list, 1, 0);
#endif
}
else {
void_f_pvoid_t fun; /**< function to run in parallel on each element of data */
xbt_dynar_t data; /**< parameters to pass to fun in parallel */
unsigned int index; /**< index of the next element of data to pick */
+ unsigned stride;
+ int apply_once;
#ifdef HAVE_MC
int finish;
* \param fun the function to call in parallel
* \param data each element of this dynar will be passed as an argument to fun
*/
-void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
+void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun,
+ xbt_dynar_t data, unsigned stride, int once)
{
/* Assign resources to worker threads */
parmap->fun = fun;
parmap->data = data;
parmap->index = 0;
+ parmap->stride = stride;
+ parmap->apply_once = once;
parmap->master_signal_f(parmap);
xbt_parmap_work(parmap);
parmap->master_wait_f(parmap);
*
* \return the next task to process, or NULL if there is no more work
*/
-void* xbt_parmap_next(xbt_parmap_t parmap)
+void* xbt_parmap_next(xbt_parmap_t parmap, unsigned *index)
{
-  unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
-  if (index < xbt_dynar_length(parmap->data)) {
-    return xbt_dynar_get_as(parmap->data, index, void*);
-  }
-  return NULL;
+  /* *index is the caller's private cursor (callers start it at 0).
+   * Whenever it sits on a multiple of the stride, a fresh chunk of
+   * `stride` consecutive elements is claimed from the shared counter
+   * (which xbt_parmap_apply resets to 0 and which only ever grows by
+   * `stride`, so claimed starts are stride multiples too); otherwise the
+   * cursor simply advances inside the chunk it already claimed. */
+  unsigned stride = parmap->stride ? parmap->stride : 1; /* avoid % 0 */
+  unsigned i = *index;
+  if (i % stride == 0)
+    i = __sync_fetch_and_add(&parmap->index, stride);
+  *index = i + 1;
+  return i < xbt_dynar_length(parmap->data) ?
+      xbt_dynar_get_as(parmap->data, i, void*) : NULL;
}
static void xbt_parmap_work(xbt_parmap_t parmap)
{
-  unsigned index;
-  while ((index = __sync_fetch_and_add(&parmap->index, 1))
-         < xbt_dynar_length(parmap->data))
-    parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
+  if (parmap->apply_once) {
+    /* fun is invoked once with NULL; it is expected to fetch its own
+     * work items through xbt_parmap_next() (as the SIMIX parallel
+     * context resumers do). */
+    parmap->fun(NULL);
+  } else {
+    /* A zero stride would never advance the shared counter and spin
+     * forever; treat it as 1. */
+    unsigned stride = parmap->stride ? parmap->stride : 1;
+    unsigned length = xbt_dynar_length(parmap->data);
+    int more = 1;
+    do {
+      /* claim the next chunk [index, index + stride) */
+      unsigned index = __sync_fetch_and_add(&parmap->index, stride);
+      unsigned next;
+      if (index >= length || length - index <= stride) {
+        /* last (possibly empty) chunk; written so index + stride
+         * cannot wrap around */
+        next = length;
+        more = 0;
+      } else {
+        next = index + stride;
+      }
+      for (; index < next; index++)
+        parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
+    } while (more);
+  }
}
/**
}
for (i = 0; i < num; i++)
- xbt_parmap_apply(parmap, fun_double, data);
+ xbt_parmap_apply(parmap, fun_double, data, 10, 0);
for (i = 0; i < len; i++) {
unsigned expected = (1U << num) * (i + 1) - 1;
for (i = 0; i < len; i++)
xbt_dynar_push_as(data, void *, &a[i]);
- xbt_parmap_apply(parmap, fun_get_id, data);
+ xbt_parmap_apply(parmap, fun_get_id, data, 1, 0);
qsort(a, len, sizeof a[0], fun_compare);
count = 1;
}
}
-static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode)
+static void bench_parmap_full(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride)
{
unsigned *a;
xbt_dynar_t data;
start_time = xbt_os_time();
do {
parmap = xbt_parmap_new(nthreads, mode);
- xbt_parmap_apply(parmap, fun_to_apply, data);
+ xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0);
xbt_parmap_destroy(parmap);
elapsed_time = xbt_os_time() - start_time;
i++;
xbt_free(a);
}
-static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode)
+static void bench_parmap_apply(int nthreads, e_xbt_parmap_mode_t mode, unsigned stride)
{
unsigned *a;
xbt_dynar_t data;
i = 0;
start_time = xbt_os_time();
do {
- xbt_parmap_apply(parmap, fun_to_apply, data);
+ xbt_parmap_apply(parmap, fun_to_apply, data, stride, 0);
elapsed_time = xbt_os_time() - start_time;
i++;
} while (elapsed_time < TIMEOUT);
xbt_free(a);
}
-static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t),
- int nthreads, unsigned modes)
+static void bench_all_modes(void (*bench_fun)(int, e_xbt_parmap_mode_t, unsigned),
+ int nthreads, unsigned modes, unsigned stride)
{
e_xbt_parmap_mode_t all_modes[] = {
XBT_PARMAP_POSIX, XBT_PARMAP_FUTEX,
unsigned i;
for (i = 0 ; i < sizeof all_modes / sizeof all_modes[0] ; i++) {
if (1U << i & modes)
- bench_fun(nthreads, all_modes[i]);
+ bench_fun(nthreads, all_modes[i], stride);
}
}
{
int nthreads;
unsigned modes = MODES_DEFAULT;
+ unsigned stride = 1;
SIMIX_global_init(&argc, argv);
- if (argc != 2 && argc != 3) {
+ if (argc < 2 || argc > 4) {
fprintf(stderr,
- "Usage: %s nthreads [modes]\n"
+ "Usage: %s nthreads [modes [stride]]\n"
" nthreads - number of working threads\n"
- " modes - bitmask of modes to test\n",
+ " modes - bitmask of modes to test\n"
+ " stride - parmap stride\n",
argv[0]);
return EXIT_FAILURE;
}
fprintf(stderr, "ERROR: invalid thread count: %d\n", nthreads);
return EXIT_FAILURE;
}
- if (argc == 3)
+ if (argc >= 3)
modes = strtol(argv[2], NULL, 0);
+ if (argc >= 4)
+ stride = atoi(argv[3]);
- printf("Parmap benchmark with %d workers (modes = %#x)...\n\n",
- nthreads, modes);
+ printf("Parmap benchmark with %d workers (modes = %#x; stride = %d)...\n\n",
+ nthreads, modes, stride);
fun_to_apply = fun_small_comp;
printf("Benchmark for parmap create+apply+destroy (small comp):\n");
- bench_all_modes(bench_parmap_full, nthreads, modes);
+ bench_all_modes(bench_parmap_full, nthreads, modes, stride);
printf("\n");
printf("Benchmark for parmap apply only (small comp):\n");
- bench_all_modes(bench_parmap_apply, nthreads, modes);
+ bench_all_modes(bench_parmap_apply, nthreads, modes, stride);
printf("\n");
fun_to_apply = fun_big_comp;
printf("Benchmark for parmap create+apply+destroy (big comp):\n");
- bench_all_modes(bench_parmap_full, nthreads, modes);
+ bench_all_modes(bench_parmap_full, nthreads, modes, stride);
printf("\n");
printf("Benchmark for parmap apply only (big comp):\n");
- bench_all_modes(bench_parmap_apply, nthreads, modes);
+ bench_all_modes(bench_parmap_apply, nthreads, modes, stride);
printf("\n");
return EXIT_SUCCESS;