XBT_INFO("Hello!");
XBT_INFO("Suspend process");
MSG_process_suspend(MSG_process_self());
- MSG_task_execute(MSG_task_create("toto",10000000000000000,0,NULL));
+ MSG_task_execute(MSG_task_create("toto", 1e9, 0, NULL));
XBT_INFO("Bye!");
return 0;
} /* end_of_lazy_guy */
typedef void (*smx_pfn_context_start_t) (smx_context_t);
typedef void (*smx_pfn_context_stop_t) (smx_context_t);
typedef void (*smx_pfn_context_suspend_t) (smx_context_t context);
-typedef void (*smx_pfn_context_runall_t) (xbt_dynar_t processes);
+typedef void (*smx_pfn_context_runall_t) (void); /* no argument: the visible implementations read simix_global->process_to_run */
typedef smx_context_t (*smx_pfn_context_self_t) (void);
typedef void* (*smx_pfn_context_get_data_t) (smx_context_t context);
/** @addtogroup XBT_parmap
* @brief Parallel map.
*
- * A function is applied to all the elements of a dynar in parallel
- * using threads. The threads are persistent until the destruction
- * of the parmap object.
+ * A function is applied to the n first elements of a dynar in parallel,
+ * where n is the number of threads. The threads are persistent until the
+ * destruction of the parmap object.
+ *
+ * If there are more than n elements in the dynar, each worker thread should
+ * fetch the remaining work itself with xbt_parmap_next() and execute it.
+ *
* @{
*/
/** \brief Queue data type (opaque type) */
typedef struct s_xbt_parmap *xbt_parmap_t;
XBT_PUBLIC(xbt_parmap_t) xbt_parmap_new(unsigned int num_workers);
+XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap);
XBT_PUBLIC(void) xbt_parmap_apply(xbt_parmap_t parmap,
void_f_pvoid_t fun,
xbt_dynar_t data);
-
-XBT_PUBLIC(void) xbt_parmap_destroy(xbt_parmap_t parmap);
+XBT_PUBLIC(void*) xbt_parmap_next(xbt_parmap_t parmap); /* next element of the data not yet handed out, or NULL when none remains */
+XBT_PUBLIC(unsigned long) xbt_parmap_get_worker_id(xbt_parmap_t parmap); /* id of the calling worker thread, read from the thread's extra data */
/** @} */
XBT_PUBLIC(void *) xbt_swag_extract(xbt_swag_t swag);
XBT_PUBLIC(int) xbt_swag_size(xbt_swag_t swag);
-#define xbt_swag_getPrev(obj,offset) (((xbt_swag_hookup_t)(((char *) (obj)) + (offset)))->prev)
-#define xbt_swag_getNext(obj,offset) (((xbt_swag_hookup_t)(((char *) (obj)) + (offset)))->next)
-
-static XBT_INLINE int xbt_swag_belongs(void *obj, xbt_swag_t swag)
-{
- return ((xbt_swag_getNext(obj, swag->offset))
- || (xbt_swag_getPrev(obj, swag->offset))
- || (swag->head == obj));
-}
+#define xbt_swag_getPrev(obj, offset) (((xbt_swag_hookup_t)(((char *) (obj)) + (offset)))->prev)
+#define xbt_swag_getNext(obj, offset) (((xbt_swag_hookup_t)(((char *) (obj)) + (offset)))->next)
+#define xbt_swag_belongs(obj, swag) (xbt_swag_getNext((obj), (swag)->offset) || (swag)->tail == (obj)) /* true if obj has a successor or is the tail of swag; NOTE: unlike the former inline function, this macro evaluates obj and swag twice, so avoid arguments with side effects */
static XBT_INLINE void *xbt_swag_getFirst(xbt_swag_t swag)
{
double untiltimer;
unsigned int cpt;
- int volatile ran_ok;
+ volatile int ran_ok;
s_gras_msg_t msg;
ran_ok = 0;
TRY {
xbt_dynar_foreach(list->cbs, cpt, cb) {
+ volatile unsigned int cpt2 = cpt;
if (!ran_ok) {
XBT_DEBUG
("Use the callback #%d (@%p) for incomming msg '%s' (payload_size=%d)",
ran_ok = 1;
}
}
+ cpt = cpt2;
}
}
CATCH(e) {
unsigned int cursor;
- xbt_ex_t e;
xbt_set_elm_t elem;
if (!pd->libdata) {
}
xbt_dynar_foreach(_gras_procdata_fabrics, cursor, fab) {
- volatile int found = 0;
if (cursor + 1 <= xbt_set_length(pd->libdata)) {
XBT_DEBUG("Skip fabric %d: there is already %ld libdata",
xbt_assert(fab.name, "Name of fabric #%d is NULL!", cursor);
XBT_DEBUG("Create the procdata for %s", fab.name);
/* Check for our own errors */
- TRY {
- xbt_set_get_by_name(pd->libdata, fab.name);
- found = 1;
- }
- CATCH(e) {
- xbt_ex_free(e);
- found = 0;
- }
- if (found)
+
+ if (xbt_set_get_by_name_or_null(pd->libdata, fab.name) != NULL)
THROWF(unknown_error, 0,
"MayDay: two modules use '%s' as libdata name", fab.name);
}
/**
- \brief executes all the processes (in parallel if possible)
- \param processes the dynar of processes to execute
+ \brief Executes all the processes to run (in parallel if possible).
+
+ Dispatches to the runall function of the current context factory
+ (simix_global->context_factory), which takes no argument.
*/
-static XBT_INLINE void SIMIX_context_runall(xbt_dynar_t processes)
+static XBT_INLINE void SIMIX_context_runall(void)
{
- (*(simix_global->context_factory->runall)) (processes);
+ (*(simix_global->context_factory->runall)) ();
}
/**
-/* context_raw - context switching with ucontextes from System V */
+/* context_raw - fast context switching inspired by System V ucontexts */
/* Copyright (c) 2009, 2010. The SimGrid Team.
* All rights reserved. */
#include "simix/private.h"
#include "xbt/parmap.h"
-
#ifdef HAVE_VALGRIND_VALGRIND_H
# include <valgrind/valgrind.h>
#endif /* HAVE_VALGRIND_VALGRIND_H */
typedef void (*rawctx_entry_point_t)(void *);
typedef struct s_smx_ctx_raw {
- s_smx_ctx_base_t super; /* Fields of super implementation */
- char *malloced_stack; /* malloced area containing the stack */
- raw_stack_t stack_top; /* pointer to stack top (within previous area) */
- raw_stack_t old_stack_top; /* to whom I should return the control */
+ s_smx_ctx_base_t super; /* Fields of super implementation */
+ char *malloced_stack; /* malloced area containing the stack */
+ raw_stack_t stack_top; /* pointer to stack top (within previous area) */
#ifdef HAVE_VALGRIND_VALGRIND_H
- unsigned int valgrind_stack_id; /* the valgrind stack id */
+ unsigned int valgrind_stack_id; /* the valgrind stack id */
#endif
#ifdef TIME_BENCH
- unsigned int thread; /* Just for measuring purposes */
+ unsigned int thread; /* Just for measuring purposes */
#endif
} s_smx_ctx_raw_t, *smx_ctx_raw_t;
-smx_ctx_raw_t maestro_raw_context;
+#ifdef CONTEXT_THREADS
+static xbt_parmap_t raw_parmap;
+static raw_stack_t* raw_local_maestro_stacks; /* space to save maestro's stack in each thread */
+#endif
+
+static unsigned long raw_process_index = 0; /* index of the next process to run in the
+ * list of runnable processes */
+smx_ctx_raw_t raw_maestro_context;
extern raw_stack_t raw_makecontext(char* malloced_stack, int stack_size,
rawctx_entry_point_t entry_point, void* arg);
".text\n"
".globl raw_swapcontext\n"
".type raw_swapcontext,@function\n"
- "raw_swapcontext:\n" /* Calling convention sets the arguments in rdi and rsi, respectively */
+ "raw_swapcontext:\n" /* Calling convention sets the arguments in rdi and rsi, respectively */
#endif
" pushq %rdi\n"
" pushq %rsi\n"
#else
/* If you implement raw contextes for other processors, don't forget to
- update the definition of HAVE_RAWCTX in buildtools/Cmake/AddTests.cmake */
+ update the definition of HAVE_RAWCTX in buildtools/Cmake/CompleteInFiles.cmake */
raw_stack_t raw_makecontext(char* malloced_stack, int stack_size,
rawctx_entry_point_t entry_point, void* arg) {
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
-#ifdef CONTEXT_THREADS
-static xbt_parmap_t parmap;
-#endif
-
#ifdef TIME_BENCH
#include "xbt/xbt_os_time.h"
#define NUM_THREADS 4
#endif
static void smx_ctx_raw_wrapper(smx_ctx_raw_t context);
+/* Forward declarations of the factory callbacks and of the serial/parallel
+ * scheduling helpers (smx_ctx_raw_wrapper is already declared just above). */
+static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory);
+static smx_context_t smx_ctx_raw_create_context(xbt_main_func_t code, int argc,
+ char **argv, void_pfn_smxprocess_t cleanup_func, void *data);
+static void smx_ctx_raw_free(smx_context_t context);
+static void smx_ctx_raw_stop(smx_context_t context);
+static void smx_ctx_raw_suspend_serial(smx_context_t context);
+static void smx_ctx_raw_resume_serial(smx_process_t first_process);
+static void smx_ctx_raw_runall_serial(void);
+static void smx_ctx_raw_suspend_parallel(smx_context_t context);
+static void smx_ctx_raw_resume_parallel(smx_process_t first_process);
+static void smx_ctx_raw_runall_parallel(void);
+static void smx_ctx_raw_runall(void);
+
+/**
+ * \brief Initializes the raw context factory.
+ *
+ * Installs the raw implementations of the factory callbacks, then selects the
+ * runall/suspend pair according to the parallelism settings: always serial,
+ * always parallel, or chosen at each scheduling round when the parallel
+ * threshold is greater than 1.
+ *
+ * \param factory the factory to initialize
+ */
+void SIMIX_ctx_raw_factory_init(smx_context_factory_t *factory)
+{
+ XBT_VERB("Using raw contexts. Because the glibc is just not good enough for us.");
+ smx_ctx_base_factory_init(factory);
+
+ (*factory)->finalize = smx_ctx_raw_factory_finalize;
+ (*factory)->create_context = smx_ctx_raw_create_context;
+ (*factory)->free = smx_ctx_raw_free;
+ (*factory)->stop = smx_ctx_raw_stop;
+ (*factory)->name = "smx_raw_context_factory";
+
+ if (SIMIX_context_is_parallel()) {
+#ifdef CONTEXT_THREADS
+ int nthreads = SIMIX_context_get_nthreads();
+ raw_parmap = xbt_parmap_new(nthreads);
+ raw_local_maestro_stacks = xbt_new(raw_stack_t, nthreads);
+#else
+ /* without threads the parallel runall/suspend bodies are compiled out,
+ * so no process would ever be run: fail explicitly instead */
+ xbt_die("No thread support for parallel context execution");
+#endif
+ if (SIMIX_context_get_parallel_threshold() > 1) {
+ /* choose dynamically */
+ (*factory)->runall = smx_ctx_raw_runall;
+ (*factory)->suspend = NULL; /* set by smx_ctx_raw_runall() at each scheduling round */
+ }
+ else {
+ /* always parallel */
+ (*factory)->runall = smx_ctx_raw_runall_parallel;
+ (*factory)->suspend = smx_ctx_raw_suspend_parallel;
+ }
+ }
+ else {
+ /* always serial */
+ (*factory)->runall = smx_ctx_raw_runall_serial;
+ (*factory)->suspend = smx_ctx_raw_suspend_serial;
+ }
+#ifdef TIME_BENCH
+ timer = xbt_os_timer_new();
+#endif
+}
static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory)
{
#endif
#ifdef CONTEXT_THREADS
- if(parmap)
- xbt_parmap_destroy(parmap);
+ if (raw_parmap)
+ xbt_parmap_destroy(raw_parmap);
+ xbt_free(raw_local_maestro_stacks);
#endif
return smx_ctx_base_factory_finalize(factory);
}
-
static smx_context_t
smx_ctx_raw_create_context(xbt_main_func_t code, int argc, char **argv,
void_pfn_smxprocess_t cleanup_func,
cleanup_func,
data);
- /* If the user provided a function for the process then use it
- otherwise is the context for maestro */
+ /* if the user provided a function for the process then use it,
+ otherwise it is the context for maestro */
if (code) {
context->malloced_stack = xbt_malloc0(smx_context_stack_size);
context->stack_top =
context->malloced_stack + smx_context_stack_size);
#endif /* HAVE_VALGRIND_VALGRIND_H */
- }else{
- maestro_raw_context = context;
+ } else {
+ raw_maestro_context = context;
}
return (smx_context_t) context;
smx_ctx_base_free(context);
}
-static void smx_ctx_raw_suspend(smx_context_t context)
+static void smx_ctx_raw_wrapper(smx_ctx_raw_t context) /* runs the user code of a context, then stops the context */
{
- SIMIX_context_set_current((smx_context_t) maestro_raw_context);
- raw_swapcontext(
- &((smx_ctx_raw_t) context)->stack_top,
- ((smx_ctx_raw_t) context)->old_stack_top);
+ (context->super.code) (context->super.argc, context->super.argv); /* call the process function with its stored argc/argv */
+ /* the user code has returned: terminate this context */
+ smx_ctx_raw_stop((smx_context_t) context);
}
static void smx_ctx_raw_stop(smx_context_t context)
{
smx_ctx_base_stop(context);
- smx_ctx_raw_suspend(context);
+ simix_global->context_factory->suspend(context); /* go through the factory: its suspend callback is either the serial or the parallel variant, depending on the current mode */
}
-static void smx_ctx_raw_wrapper(smx_ctx_raw_t context)
-{
- (context->super.code) (context->super.argc, context->super.argv);
-
- smx_ctx_raw_stop((smx_context_t) context);
+static void smx_ctx_raw_suspend_serial(smx_context_t context) /* serial mode: leaves `context` and switches directly to the next runnable process, or to maestro when none is left */
+{
+ /* determine the next context */
+ smx_context_t next_context;
+ unsigned long int i = raw_process_index++; /* take the index of the next process and advance it for the following suspend */
+
+ if (i < xbt_dynar_length(simix_global->process_to_run)) {
+ /* execute the next process */
+ XBT_DEBUG("Run next process");
+ next_context = xbt_dynar_get_as(
+ simix_global->process_to_run,i, smx_process_t)->context;
+ }
+ else {
+ /* all processes were run, return to maestro */
+ XBT_DEBUG("No more process to run");
+ next_context = (smx_context_t) raw_maestro_context;
+ }
+ SIMIX_context_set_current(next_context);
+ raw_swapcontext(&((smx_ctx_raw_t) context)->stack_top, /* presumably saves our stack pointer in our own context and resumes on the second argument's stack -- confirm against the raw_swapcontext assembly */
+ ((smx_ctx_raw_t) next_context)->stack_top);
}
-static void smx_ctx_raw_resume(smx_process_t process)
+static void smx_ctx_raw_resume_serial(smx_process_t first_process) /* called by smx_ctx_raw_runall_serial() to start a scheduling round with first_process */
{
- smx_ctx_raw_t context = (smx_ctx_raw_t)process->context;
+ smx_ctx_raw_t context = (smx_ctx_raw_t) first_process->context;
SIMIX_context_set_current((smx_context_t) context);
- raw_swapcontext(
- &((smx_ctx_raw_t) context)->old_stack_top,
+ raw_swapcontext(&raw_maestro_context->stack_top, /* the stack of the caller is stored in the maestro context; smx_ctx_raw_suspend_serial() switches back to it after the last process */
((smx_ctx_raw_t) context)->stack_top);
}
XBT_VERB("New scheduling round");
}
#else
-static void smx_ctx_raw_runall_serial(xbt_dynar_t processes)
+static void smx_ctx_raw_runall_serial(void) /* runs the processes of simix_global->process_to_run one after the other */
{
- smx_process_t process;
- unsigned int cursor;
+ if (xbt_dynar_length(simix_global->process_to_run) > 0) { /* nothing to do when no process is runnable */
+ smx_process_t first_process =
+ xbt_dynar_get_as(simix_global->process_to_run, 0, smx_process_t);
+ raw_process_index = 1; /* index 0 is started below; each suspend then picks the process at this index */
- xbt_dynar_foreach(processes, cursor, process) {
- XBT_DEBUG("Schedule item %u of %lu",cursor,xbt_dynar_length(processes));
- smx_ctx_raw_resume(process);
+ /* execute the first process */
+ smx_ctx_raw_resume_serial(first_process);
}
}
#endif
-static void smx_ctx_raw_runall_parallel(xbt_dynar_t processes)
+static void smx_ctx_raw_stop_parallel(smx_context_t context) /* NOTE(review): not referenced in the visible code (the factory installs smx_ctx_raw_stop) -- confirm whether this is dead code */
{
-#ifdef CONTEXT_THREADS
- xbt_parmap_apply(parmap, (void_f_pvoid_t)smx_ctx_raw_resume, processes);
-#endif
+ smx_ctx_base_stop(context);
+ smx_ctx_raw_suspend_parallel(context); /* same as smx_ctx_raw_stop() but bound to the parallel suspend */
}
-static void smx_ctx_raw_runall(xbt_dynar_t processes)
+static void smx_ctx_raw_suspend_parallel(smx_context_t context) /* parallel mode: leaves `context` and switches to the next process fetched from the parmap, or back to the stack saved for this worker thread */
{
- if (xbt_dynar_length(processes) >= SIMIX_context_get_parallel_threshold()) {
- XBT_DEBUG("Runall // %lu", xbt_dynar_length(processes));
- smx_ctx_raw_runall_parallel(processes);
- } else {
- XBT_DEBUG("Runall serial %lu", xbt_dynar_length(processes));
- smx_ctx_raw_runall_serial(processes);
+#ifdef CONTEXT_THREADS /* the whole body is compiled out without thread support */
+ /* determine the next context */
+ smx_process_t next_work = xbt_parmap_next(raw_parmap); /* NULL once every element of the parmap data has been handed out */
+ smx_context_t next_context;
+ raw_stack_t next_stack;
+
+ if (next_work != NULL) {
+ /* there is a next process to resume */
+ XBT_DEBUG("Run next process");
+ next_context = next_work->context;
+ next_stack = ((smx_ctx_raw_t) next_context)->stack_top;
+ }
+ else {
+ /* all processes were run, go to the barrier */
+ XBT_DEBUG("No more processes to run");
+ next_context = (smx_context_t) raw_maestro_context;
+ unsigned long worker_id = xbt_parmap_get_worker_id(raw_parmap);
+ next_stack = raw_local_maestro_stacks[worker_id]; /* stack saved by smx_ctx_raw_resume_parallel() for this worker */
}
+
+ SIMIX_context_set_current(next_context);
+ raw_swapcontext(&((smx_ctx_raw_t) context)->stack_top, next_stack);
+#endif
}
-void SIMIX_ctx_raw_factory_init(smx_context_factory_t *factory)
+static void smx_ctx_raw_resume_parallel(smx_process_t first_process) /* function applied by the parmap: starts first_process on the calling worker thread */
{
- XBT_VERB("Using raw contexts. Because the glibc is just not good enough for us.");
- smx_ctx_base_factory_init(factory);
+#ifdef CONTEXT_THREADS
+ unsigned long worker_id = xbt_parmap_get_worker_id(raw_parmap);
+ raw_stack_t* local_maestro_stack = &raw_local_maestro_stacks[worker_id]; /* per-worker slot where the stack of the calling thread is saved */
- (*factory)->finalize = smx_ctx_raw_factory_finalize;
- (*factory)->create_context = smx_ctx_raw_create_context;
- /* Do not overload that method (*factory)->finalize */
- (*factory)->free = smx_ctx_raw_free;
- (*factory)->stop = smx_ctx_raw_stop;
- (*factory)->suspend = smx_ctx_raw_suspend;
- (*factory)->name = "smx_raw_context_factory";
+ smx_context_t context = first_process->context;
+ SIMIX_context_set_current(context);
+ raw_swapcontext(local_maestro_stack, ((smx_ctx_raw_t) context)->stack_top); /* smx_ctx_raw_suspend_parallel() switches back to the saved stack when no work remains */
+#endif
+}
- if (SIMIX_context_is_parallel()) {
+static void smx_ctx_raw_runall_parallel(void) /* runs the processes of simix_global->process_to_run through the parmap worker threads */
+{
#ifdef CONTEXT_THREADS
- parmap = xbt_parmap_new(SIMIX_context_get_nthreads());
+ xbt_parmap_apply(raw_parmap, (void_f_pvoid_t) smx_ctx_raw_resume_parallel, /* each worker starts one process; the following ones are fetched with xbt_parmap_next() in smx_ctx_raw_suspend_parallel() */
+ simix_global->process_to_run);
#endif
- if (SIMIX_context_get_parallel_threshold() > 1) {
- /* choose dynamically */
- (*factory)->runall = smx_ctx_raw_runall;
- }
- else {
- /* always parallel */
- (*factory)->runall = smx_ctx_raw_runall_parallel;
- }
- }
- else {
- /* always serial */
- (*factory)->runall = smx_ctx_raw_runall_serial;
+}
+
+static void smx_ctx_raw_runall(void) /* dynamic mode: picks serial or parallel execution for this scheduling round */
+{
+ unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
+ if (nb_processes >= SIMIX_context_get_parallel_threshold()) { /* enough runnable processes: run them in parallel */
+ XBT_DEBUG("Runall // %lu", nb_processes);
+ simix_global->context_factory->suspend = smx_ctx_raw_suspend_parallel; /* the suspend callback must match the mode of the round, since smx_ctx_raw_stop() calls it through the factory */
+ smx_ctx_raw_runall_parallel();
+ } else {
+ XBT_DEBUG("Runall serial %lu", nb_processes);
+ simix_global->context_factory->suspend = smx_ctx_raw_suspend_serial;
+ smx_ctx_raw_runall_serial();
}
-#ifdef TIME_BENCH
- timer = xbt_os_timer_new();
-#endif
}
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
#ifdef CONTEXT_THREADS
-static xbt_parmap_t parmap;
+static xbt_parmap_t smx_ctx_sysv_parmap;
+static ucontext_t* smx_ctx_sysv_local_maestro_uc; /* space to save maestro's stack in each thread */
#endif
+static unsigned long smx_ctx_sysv_process_index = 0; /* index of the next process to run in the
+ * list of runnable processes */
+static smx_ctx_sysv_t smx_ctx_sysv_maestro_context;
+
static smx_context_t
smx_ctx_sysv_create_context(xbt_main_func_t code, int argc, char **argv,
static void smx_ctx_sysv_wrapper(int count, ...);
+static void smx_ctx_sysv_stop_serial(smx_context_t context);
+static void smx_ctx_sysv_suspend_serial(smx_context_t context);
+static void smx_ctx_sysv_resume_serial(smx_process_t first_process);
+static void smx_ctx_sysv_runall_serial(void);
+
+static void smx_ctx_sysv_stop_parallel(smx_context_t context);
+static void smx_ctx_sysv_suspend_parallel(smx_context_t context);
+static void smx_ctx_sysv_resume_parallel(smx_process_t first_process);
+static void smx_ctx_sysv_runall_parallel(void);
+
+
/* This is a bit paranoid about SIZEOF_VOIDP not being a multiple of SIZEOF_INT,
* but it doesn't harm. */
#define CTX_ADDR_LEN (SIZEOF_VOIDP / SIZEOF_INT + !!(SIZEOF_VOIDP % SIZEOF_INT))
(*factory)->create_context = smx_ctx_sysv_create_context;
/* Do not overload that method (*factory)->finalize */
(*factory)->free = smx_ctx_sysv_free;
- (*factory)->stop = smx_ctx_sysv_stop;
- (*factory)->suspend = smx_ctx_sysv_suspend;
(*factory)->name = "smx_sysv_context_factory";
if (SIMIX_context_is_parallel()) {
#ifdef CONTEXT_THREADS /* To use parallel ucontexts a thread pool is needed */
- parmap = xbt_parmap_new(2);
+ int nthreads = SIMIX_context_get_nthreads();
+ smx_ctx_sysv_parmap = xbt_parmap_new(nthreads);
+ smx_ctx_sysv_local_maestro_uc = xbt_new(ucontext_t, nthreads);
+ (*factory)->stop = smx_ctx_sysv_stop_parallel;
+ (*factory)->suspend = smx_ctx_sysv_suspend_parallel;
(*factory)->runall = smx_ctx_sysv_runall_parallel;
(*factory)->self = smx_ctx_sysv_self_parallel;
#else
THROWF(arg_error, 0, "No thread support for parallel context execution");
#endif
- }else{
- (*factory)->runall = smx_ctx_sysv_runall;
+ } else {
+ (*factory)->stop = smx_ctx_sysv_stop_serial;
+ (*factory)->suspend = smx_ctx_sysv_suspend_serial;
+ (*factory)->runall = smx_ctx_sysv_runall_serial;
}
}
int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory)
{
#ifdef CONTEXT_THREADS
- if(parmap)
- xbt_parmap_destroy(parmap);
+ if (smx_ctx_sysv_parmap) /* only created when the factory was initialized in parallel mode */
+ xbt_parmap_destroy(smx_ctx_sysv_parmap);
+ xbt_free(smx_ctx_sysv_local_maestro_uc); /* per-worker ucontext array allocated together with the parmap */
#endif
return smx_ctx_base_factory_finalize(factory);
}
cleanup_func,
data);
- /* If the user provided a function for the process then use it
- otherwise is the context for maestro */
+ /* if the user provided a function for the process then use it,
+ otherwise it is the context for maestro */
if (code) {
- int res;
- res = getcontext(&(context->uc));
- xbt_assert(res == 0,
- "Error in context saving: %d (%s)", errno,
- strerror(errno));
+ getcontext(&(context->uc));
context->uc.uc_link = NULL;
ctx_addr.addr = context;
makecontext(&context->uc, (void (*)())smx_ctx_sysv_wrapper,
CTX_ADDR_LEN, CTX_ADDR_SPLIT(ctx_addr));
- }else{
- maestro_context = context;
+ } else {
+ smx_ctx_sysv_maestro_context = context;
}
return (smx_context_t) context;
-
}
static smx_context_t
smx_ctx_base_free(context);
}
-void smx_ctx_sysv_stop(smx_context_t context)
-{
- smx_ctx_base_stop(context);
- smx_ctx_sysv_suspend(context);
-}
-
void smx_ctx_sysv_wrapper(int first, ...)
{
union u_ctx_addr ctx_addr;
va_list ap;
int i;
va_start(ap, first);
- for(i = 1; i < CTX_ADDR_LEN; i++)
+ for (i = 1; i < CTX_ADDR_LEN; i++)
ctx_addr.intv[i] = va_arg(ap, int);
va_end(ap);
}
context = ctx_addr.addr;
(context->super.code) (context->super.argc, context->super.argv);
- smx_ctx_sysv_stop((smx_context_t) context);
+ simix_global->context_factory->stop((smx_context_t) context);
}
-void smx_ctx_sysv_suspend(smx_context_t context)
+static void smx_ctx_sysv_stop_serial(smx_context_t context) /* stop callback installed by the factory in serial mode */
{
- SIMIX_context_set_current((smx_context_t) maestro_context);
- int rv;
- rv = swapcontext(&((smx_ctx_sysv_t) context)->uc, &((smx_ctx_sysv_t)context)->old_uc);
+ smx_ctx_base_stop(context);
+ smx_ctx_sysv_suspend_serial(context); /* then hand control to the next process or to maestro */
+}
- xbt_assert((rv == 0), "Context swapping failure");
+static void smx_ctx_sysv_suspend_serial(smx_context_t context) /* serial mode: leaves `context` and switches directly to the next runnable process, or to maestro when none is left */
+{
+ /* determine the next context */
+ smx_context_t next_context;
+ unsigned long int i = smx_ctx_sysv_process_index++; /* take the index of the next process and advance it for the following suspend */
+
+ if (i < xbt_dynar_length(simix_global->process_to_run)) {
+ /* execute the next process */
+ XBT_DEBUG("Run next process");
+ next_context = xbt_dynar_get_as(
+ simix_global->process_to_run,i, smx_process_t)->context;
+ }
+ else {
+ /* all processes were run, return to maestro */
+ XBT_DEBUG("No more process to run");
+ next_context = (smx_context_t) smx_ctx_sysv_maestro_context;
+ }
+ SIMIX_context_set_current(next_context);
+ swapcontext(&((smx_ctx_sysv_t) context)->uc, /* NOTE(review): the return value is no longer checked, whereas the removed code asserted rv == 0 */
+ &((smx_ctx_sysv_t) next_context)->uc);
}
-void smx_ctx_sysv_resume(smx_context_t context)
+static void smx_ctx_sysv_resume_serial(smx_process_t first_process) /* called by smx_ctx_sysv_runall_serial() to start a scheduling round with first_process */
{
+ smx_context_t context = first_process->context;
SIMIX_context_set_current(context);
- int rv;
- rv = swapcontext(&((smx_ctx_sysv_t)context)->old_uc, &((smx_ctx_sysv_t) context)->uc);
+ swapcontext(&smx_ctx_sysv_maestro_context->uc, /* the caller's state is saved in the maestro context; smx_ctx_sysv_suspend_serial() switches back to it after the last process */
+ &((smx_ctx_sysv_t) context)->uc);
+}
+
+static void smx_ctx_sysv_runall_serial(void) /* runs the processes of simix_global->process_to_run one after the other */
+{
+ if (xbt_dynar_length(simix_global->process_to_run) > 0) { /* nothing to do when no process is runnable */
+ smx_process_t first_process =
+ xbt_dynar_get_as(simix_global->process_to_run, 0, smx_process_t);
+ smx_ctx_sysv_process_index = 1; /* index 0 is started below; each suspend then picks the process at this index */
- xbt_assert((rv == 0), "Context swapping failure");
+ /* execute the first process */
+ smx_ctx_sysv_resume_serial(first_process);
+ }
}
-void smx_ctx_sysv_runall(xbt_dynar_t processes)
+static void smx_ctx_sysv_stop_parallel(smx_context_t context) /* stop callback installed by the factory in parallel mode */
{
- smx_process_t process;
- unsigned int cursor;
+ smx_ctx_base_stop(context);
+ smx_ctx_sysv_suspend_parallel(context); /* then hand control to the next process or back to the worker thread */
+}
- xbt_dynar_foreach(processes, cursor, process) {
- XBT_DEBUG("Schedule item %u of %lu",cursor,xbt_dynar_length(processes));
- smx_ctx_sysv_resume(process->context);
+static void smx_ctx_sysv_suspend_parallel(smx_context_t context) /* parallel mode: leaves `context` and switches to the next process fetched from the parmap, or back to the ucontext saved for this worker thread */
+{
+#ifdef CONTEXT_THREADS /* the whole body is compiled out without thread support */
+ /* determine the next context */
+ smx_process_t next_work = xbt_parmap_next(smx_ctx_sysv_parmap); /* NULL once every element of the parmap data has been handed out */
+ smx_context_t next_context;
+ ucontext_t* next_uc;
+
+ if (next_work != NULL) {
+ /* there is a next process to resume */
+ XBT_DEBUG("Run next process");
+ next_context = next_work->context;
+ next_uc = &((smx_ctx_sysv_t) next_context)->uc;
+ }
+ else {
+ /* all processes were run, go to the barrier */
+ XBT_DEBUG("No more processes to run");
+ next_context = (smx_context_t) smx_ctx_sysv_maestro_context;
+ unsigned long worker_id = xbt_parmap_get_worker_id(smx_ctx_sysv_parmap);
+ next_uc = &smx_ctx_sysv_local_maestro_uc[worker_id]; /* ucontext saved by smx_ctx_sysv_resume_parallel() for this worker */
}
+
+ SIMIX_context_set_current(next_context);
+ swapcontext(&((smx_ctx_sysv_t) context)->uc, next_uc); /* NOTE(review): return value not checked */
+#endif
}
-void smx_ctx_sysv_resume_parallel(smx_process_t process)
+static void smx_ctx_sysv_resume_parallel(smx_process_t first_process) /* function applied by the parmap: starts first_process on the calling worker thread */
{
- smx_context_t context = process->context;
- SIMIX_context_set_current((smx_context_t) context);
- int rv;
- rv = swapcontext(&((smx_ctx_sysv_t)context)->old_uc, &((smx_ctx_sysv_t) context)->uc);
- SIMIX_context_set_current((smx_context_t) maestro_context);
+#ifdef CONTEXT_THREADS
+ unsigned long worker_id = xbt_parmap_get_worker_id(smx_ctx_sysv_parmap);
+ ucontext_t* local_maestro_uc = &smx_ctx_sysv_local_maestro_uc[worker_id]; /* per-worker slot where the state of the calling thread is saved */
- xbt_assert((rv == 0), "Context swapping failure");
+ smx_context_t context = first_process->context;
+ SIMIX_context_set_current(context);
+ swapcontext(local_maestro_uc, &((smx_ctx_sysv_t) context)->uc); /* smx_ctx_sysv_suspend_parallel() switches back to the saved ucontext when no work remains */
+#endif
}
-void smx_ctx_sysv_runall_parallel(xbt_dynar_t processes)
+static void smx_ctx_sysv_runall_parallel(void) /* runs the processes of simix_global->process_to_run through the parmap worker threads */
{
#ifdef CONTEXT_THREADS
- xbt_parmap_apply(parmap, (void_f_pvoid_t)smx_ctx_sysv_resume_parallel, processes);
+ xbt_parmap_apply(smx_ctx_sysv_parmap, (void_f_pvoid_t) smx_ctx_sysv_resume_parallel, /* each worker starts one process; the following ones are fetched in smx_ctx_sysv_suspend_parallel() */
+ simix_global->process_to_run);
#endif
}
typedef struct s_smx_ctx_sysv {
s_smx_ctx_base_t super; /* Fields of super implementation */
- ucontext_t uc; /* the thread that execute the code */
- ucontext_t old_uc; /* the context that was swapped with */
+ ucontext_t uc; /* the ucontext that executes the code */
#ifdef HAVE_VALGRIND_VALGRIND_H
unsigned int valgrind_stack_id; /* the valgrind stack id */
#endif
char stack[0]; /* the thread stack (must remain the last element of the structure) */
} s_smx_ctx_sysv_t, *smx_ctx_sysv_t;
-smx_ctx_sysv_t maestro_context;
-
void SIMIX_ctx_sysv_factory_init(smx_context_factory_t *factory);
int smx_ctx_sysv_factory_finalize(smx_context_factory_t *factory);
void_pfn_smxprocess_t cleanup_func,
void *data);
void smx_ctx_sysv_free(smx_context_t context);
-void smx_ctx_sysv_stop(smx_context_t context);
-void smx_ctx_sysv_suspend(smx_context_t context);
-void smx_ctx_sysv_resume(smx_context_t new_context);
-void smx_ctx_sysv_runall(xbt_dynar_t processes);
-void smx_ctx_sysv_resume_parallel(smx_process_t new_context);
-void smx_ctx_sysv_runall_parallel(xbt_dynar_t processes);
int smx_ctx_sysv_get_thread_id(void);
smx_context_t smx_ctx_sysv_self_parallel(void);
static void smx_ctx_thread_stop(smx_context_t context);
static void smx_ctx_thread_suspend(smx_context_t context);
static void smx_ctx_thread_resume(smx_context_t new_context);
-static void smx_ctx_thread_runall_serial(xbt_dynar_t processes);
-static void smx_ctx_thread_runall_parallel(xbt_dynar_t processes);
+static void smx_ctx_thread_runall_serial(void);
+static void smx_ctx_thread_runall_parallel(void);
static smx_context_t smx_ctx_thread_self(void);
static int smx_ctx_thread_factory_finalize(smx_context_factory_t *factory);
xbt_os_sem_acquire(smx_ctx_thread_sem);
}
-static void smx_ctx_thread_runall_serial(xbt_dynar_t processes)
+static void smx_ctx_thread_runall_serial(void) /* runs the processes of simix_global->process_to_run one at a time */
{
smx_process_t process;
unsigned int cursor;
- xbt_dynar_foreach(processes, cursor, process) {
+ xbt_dynar_foreach(simix_global->process_to_run, cursor, process) { /* release the `begin` semaphore of each process, then wait on its `end` semaphore before moving to the next one */
xbt_os_sem_release(((smx_ctx_thread_t) process->context)->begin);
xbt_os_sem_acquire(((smx_ctx_thread_t) process->context)->end);
}
}
-static void smx_ctx_thread_runall_parallel(xbt_dynar_t processes)
+static void smx_ctx_thread_runall_parallel(void) /* releases all the processes of simix_global->process_to_run, then waits for all of them */
{
unsigned int index;
smx_process_t process;
- xbt_dynar_foreach(processes, index, process)
+ xbt_dynar_foreach(simix_global->process_to_run, index, process) /* first pass: release every `begin` semaphore */
xbt_os_sem_release(((smx_ctx_thread_t) process->context)->begin);
- xbt_dynar_foreach(processes, index, process) {
+ xbt_dynar_foreach(simix_global->process_to_run, index, process) { /* second pass: wait on every `end` semaphore */
xbt_os_sem_acquire(((smx_ctx_thread_t) process->context)->end);
}
}
break;
default:
- THROW_IMPOSSIBLE;
+ xbt_die("Internal error in SIMIX_execution_finish: unexpected action state %d",
+ action->state);
}
req->issuer->waiting_action = NULL;
req->host_execution_wait.result = action->state;
*/
void SIMIX_process_runall(void)
{
- SIMIX_context_runall(simix_global->process_to_run);
+ SIMIX_context_runall();
+
xbt_dynar_t tmp = simix_global->process_that_ran;
simix_global->process_that_ran = simix_global->process_to_run;
simix_global->process_to_run = tmp;
process->suspended = 1;
/* If we are suspending another process, and it is waiting on an action,
- suspend it's action. */
+ suspend its action. */
if (process != issuer) {
if (process->waiting_action) {
break;
default:
- THROW_IMPOSSIBLE;
+ xbt_die("Internal error in SIMIX_process_suspend: unexpected action type %d",
+ process->waiting_action->type);
}
}
}
break;
default:
- THROW_IMPOSSIBLE;
+ xbt_die("Internal error in SIMIX_process_resume: unexpected action type %d",
+ process->waiting_action->type);
}
}
else {
SIMIX_context_suspend(self->context);
/* Ok, maestro returned control to us */
- XBT_DEBUG("Maestro returned control to me: '%s'", self->name);
+ XBT_DEBUG("Control returned to me: '%s'", self->name);
if (self->context->iwannadie){
XBT_DEBUG("I wanna die!");
xbt_mallocator_t variable_mallocator;
} s_lmm_system_t;
-#define extract_variable(sys) xbt_swag_remove(xbt_swag_getFirst(&(sys->variable_set)),&(sys->variable_set))
-#define extract_constraint(sys) xbt_swag_remove(xbt_swag_getFirst(&(sys->constraint_set)),&(sys->constraint_set))
+#define extract_variable(sys) xbt_swag_extract(&(sys->variable_set))
+#define extract_constraint(sys) xbt_swag_extract(&(sys->constraint_set))
#define insert_constraint(sys,cnst) xbt_swag_insert(cnst,&(sys->constraint_set))
#define remove_variable(sys,var) do {xbt_swag_remove(var,&(sys->variable_set));\
xbt_swag_remove(var,&(sys->saturated_variable_set));} while(0)
/* minimal number of user contexts to be run in parallel */
default_value_int = 1;
xbt_cfg_register(&_surf_cfg_set, "contexts/parallel_threshold",
- "Minimal number of user contexts to be run in parallel",
+ "Minimal number of user contexts to be run in parallel (raw contexts only)",
xbt_cfgelm_int, &default_value_int, 1, 1,
_surf_cfg_cb_contexts_parallel_threshold, NULL);
void xbt_parmap_destroy(xbt_parmap_t parmap)
{
- XBT_DEBUG("Destroy parmap %p", parmap);
parmap->status = PARMAP_DESTROY;
#ifdef HAVE_FUTEX_H
xbt_event_signal(parmap->sync_event);
XBT_DEBUG("Job done");
}
+void* xbt_parmap_next(xbt_parmap_t parmap) {
+ /* Returns the next element of parmap->data that has not been handed out yet, or NULL when none remains. */
+ unsigned int index = __sync_fetch_and_add(&parmap->index, 1); /* atomic fetch-and-increment: each caller gets a distinct index */
+ if (index < xbt_dynar_length(parmap->data)) {
+ return xbt_dynar_get_as(parmap->data, index, void*);
+ }
+ return NULL; /* index is past the end of the data: no work left */
+}
+
+unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) { /* returns the id of the calling worker thread; the parmap argument is not used */
+ return (unsigned long) xbt_os_thread_get_extra_data(); /* the id is stored as the thread's extra data by _xbt_parmap_worker_main() */
+}
+
static void *_xbt_parmap_worker_main(void *arg)
{
unsigned int worker_id;
- xbt_parmap_t parmap = (xbt_parmap_t)arg;
+ xbt_parmap_t parmap = (xbt_parmap_t) arg;
/* Fetch a worker id */
worker_id = __sync_fetch_and_add(&parmap->workers_max_id, 1);
- xbt_os_thread_set_extra_data((void *)(unsigned long)worker_id);
+ xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id);
XBT_DEBUG("New worker thread created (%u)", worker_id);
/* Worker's main loop */
- while(1){
+ while (1) {
#ifdef HAVE_FUTEX_H
xbt_event_wait(parmap->sync_event);
#endif
- if(parmap->status == PARMAP_WORK){
- unsigned int i;
- unsigned int n = 0;
+ if (parmap->status == PARMAP_WORK) {
XBT_DEBUG("Worker %u got a job", worker_id);
- while ((i = __sync_fetch_and_add(&parmap->index, 1))
- < xbt_dynar_length(parmap->data)) {
- parmap->fun(xbt_dynar_get_as(parmap->data, i, void*));
- n++;
+ void* work = xbt_parmap_next(parmap);
+ if (work != NULL) {
+ parmap->fun(work);
}
- XBT_DEBUG("Worker %u processed %u tasks", worker_id, n);
+ XBT_DEBUG("Worker %u has finished", worker_id);
/* We are destroying the parmap */
- }else{
+ } else {
#ifdef HAVE_FUTEX_H
xbt_event_end(parmap->sync_event);
#endif
*/
XBT_INLINE void xbt_swag_insert_at_head(void *obj, xbt_swag_t swag)
{
-
- if (xbt_swag_belongs(obj, swag))
- return;
-
- (swag->count)++;
- if (swag->head == NULL) {
+ if (!swag->head) { /* empty swag: obj becomes both head and tail */
xbt_assert(!(swag->tail), "Inconsistent swag.");
swag->head = obj;
swag->tail = obj;
- return;
+ swag->count++;
+ }
+ else if (obj != swag->head && !xbt_swag_getPrev(obj, swag->offset)) { /* obj is not the head and has no predecessor, which is taken to mean it is not in the swag yet; otherwise nothing is done */
+ xbt_swag_getNext(obj, swag->offset) = swag->head;
+ xbt_swag_getPrev(swag->head, swag->offset) = obj;
+ swag->head = obj;
+ swag->count++;
}
-
- xbt_swag_getNext(obj, swag->offset) = swag->head;
- xbt_swag_getPrev(swag->head, swag->offset) = obj;
- swag->head = obj;
}
/**
*/
XBT_INLINE void xbt_swag_insert_at_tail(void *obj, xbt_swag_t swag)
{
-
- if (xbt_swag_belongs(obj, swag))
+ if (xbt_swag_belongs(obj, swag)) { /* already in the swag: nothing to do */
return;
+ }
- (swag->count)++;
- if (swag->head == NULL) {
- xbt_assert(!(swag->tail), "Inconsistent swag.");
+ if (!swag->tail) { /* empty swag: obj becomes both head and tail */
+ xbt_assert(!(swag->head), "Inconsistent swag.");
swag->head = obj;
swag->tail = obj;
- return;
+ swag->count++;
+ }
+ else if (obj != swag->tail && !xbt_swag_getNext(obj, swag->offset)) { /* NOTE(review): if xbt_swag_belongs is the macro testing "has a next or is the tail", this condition is always true here because that test already returned above -- confirm */
+ xbt_swag_getPrev(obj, swag->offset) = swag->tail;
+ xbt_swag_getNext(swag->tail, swag->offset) = obj;
+ swag->tail = obj;
+ swag->count++;
}
-
- xbt_swag_getPrev(obj, swag->offset) = swag->tail;
- xbt_swag_getNext(swag->tail, swag->offset) = obj;
- swag->tail = obj;
}
/**
*/
XBT_INLINE void *xbt_swag_remove(void *obj, xbt_swag_t swag)
{
- size_t offset = swag->offset;
-
- if ((!obj) || (!swag))
- return NULL;
- if (!xbt_swag_belongs(obj, swag)) /* Trying to remove an object that
- was not in this swag */
+ if (!obj) /* NOTE: swag itself is no longer checked against NULL */
return NULL;
- if (swag->head == swag->tail) { /* special case */
- if (swag->head != obj) /* Trying to remove an object that was not in this swag */
- return NULL;
- swag->head = NULL;
- swag->tail = NULL;
- xbt_swag_getNext(obj, offset) = xbt_swag_getPrev(obj, offset) = NULL;
- } else if (obj == swag->head) { /* It's the head */
- swag->head = xbt_swag_getNext(obj, offset);
- xbt_swag_getPrev(swag->head, offset) = NULL;
- xbt_swag_getNext(obj, offset) = NULL;
- } else if (obj == swag->tail) { /* It's the tail */
- swag->tail = xbt_swag_getPrev(obj, offset);
- xbt_swag_getNext(swag->tail, offset) = NULL;
+ size_t offset = swag->offset;
+ void* prev = xbt_swag_getPrev(obj, offset);
+ void* next = xbt_swag_getNext(obj, offset);
+
+ if (prev) { /* obj has a predecessor: it is in the middle or at the tail */
+ xbt_swag_getNext(prev, offset) = next;
xbt_swag_getPrev(obj, offset) = NULL;
- } else { /* It's in the middle */
- xbt_swag_getNext(xbt_swag_getPrev(obj, offset), offset) =
- xbt_swag_getNext(obj, offset);
- xbt_swag_getPrev(xbt_swag_getNext(obj, offset), offset) =
- xbt_swag_getPrev(obj, offset);
- xbt_swag_getPrev(obj, offset) = xbt_swag_getNext(obj, offset) = NULL;
+ if (next) { /* middle element: link the two neighbours together */
+ xbt_swag_getPrev(next, offset) = prev;
+ xbt_swag_getNext(obj, offset) = NULL;
+ }
+ else { /* obj was the tail; its next pointer is already NULL */
+ swag->tail = prev;
+ }
+ swag->count--;
}
- (swag->count)--;
+ else if (next) { /* no predecessor but a successor: obj is the head */
+ xbt_swag_getPrev(next, offset) = NULL;
+ xbt_swag_getNext(obj, offset) = NULL;
+ swag->head = next;
+ swag->count--;
+ }
+ else if (obj == swag->head) { /* neither predecessor nor successor: obj is in the swag only if it is its single element */
+ swag->head = swag->tail = NULL;
+ swag->count--;
+ }
+ /* NOTE: obj is returned even when it was not found in the swag, whereas the removed code returned NULL in that case */
return obj;
}
*/
void *xbt_swag_extract(xbt_swag_t swag)
{
- size_t offset = swag->offset;
- void *obj = NULL;
-
- if ((!swag) || (!(swag->head)))
+ if (!swag->head)
return NULL;
- obj = swag->head;
+ size_t offset = swag->offset;
+ void* obj = swag->head;
- if (swag->head == swag->tail) { /* special case */
+ if (obj == swag->tail) { /* special case */
swag->head = swag->tail = NULL;
- xbt_swag_getPrev(obj, offset) = xbt_swag_getNext(obj, offset) = NULL;
} else {
swag->head = xbt_swag_getNext(obj, offset);
xbt_swag_getPrev(swag->head, offset) = NULL;