X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/d2bb33858171f062e6bd6abd3d6982d37cd51de9..1ae612ef301a26795068505cf761169ed29a5451:/src/simix/smx_process.c diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index e182d8dfb5..bd3413769b 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -8,15 +8,12 @@ #include "xbt/sysdep.h" #include "xbt/log.h" #include "xbt/dict.h" -#include "msg/mailbox.h" #include "mc/mc.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_process, simix, "Logging specific to SIMIX (process)"); unsigned long simix_process_maxpid = 0; -/* FIXME: Ugly hack!*/ -extern double NOW; /** * \brief Returns the current agent. @@ -33,11 +30,68 @@ XBT_INLINE smx_process_t SIMIX_process_self(void) } /** - * \brief Move a process to the list of processes to destroy. + * \brief Returns whether a process has pending asynchronous communications. + * \return true if there are asynchronous communications in this process + */ +int SIMIX_process_has_pending_comms(smx_process_t process) { + + return xbt_fifo_size(process->comms) > 0; +} + +/** + * \brief Moves a process to the list of processes to destroy. */ void SIMIX_process_cleanup(smx_process_t process) { - DEBUG1("Cleanup process %s", process->name); + XBT_DEBUG("Cleanup process %s (%p), waiting action %p", + process->name, process, process->waiting_action); + + /* cancel non-blocking communications */ + smx_action_t action; + while ((action = xbt_fifo_pop(process->comms))) { + + /* make sure no one will finish the comm after this process is destroyed, + * because src_proc or dst_proc would be an invalid pointer */ + SIMIX_comm_cancel(action); + + if (action->comm.src_proc == process) { + XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d, src = %p, dst = %p", + action, action->comm.detached, action->state, action->comm.src_proc, action->comm.dst_proc); + action->comm.src_proc = NULL; + + if (action->comm.detached) { + if (action->comm.refcount == 0) { + XBT_DEBUG("Increase the refcount before destroying it since it's detached"); + /* I'm not supposed to destroy a detached comm from the sender side, + * unless there is no receiver matching the rdv */ + action->comm.refcount++; + SIMIX_comm_destroy(action); + } + else { + XBT_DEBUG("Don't destroy it since its refcount is %d", action->comm.refcount); + } + } else { + SIMIX_comm_destroy(action); + } + } + else if (action->comm.dst_proc == process){ + XBT_DEBUG("Found an unfinished recv comm %p, state %d, src = %p, dst = %p", + action, action->state, action->comm.src_proc, action->comm.dst_proc); + action->comm.dst_proc = NULL; + + if (action->comm.detached && action->comm.refcount == 1 + && action->comm.src_proc != NULL) { + /* the comm will be freed right now, remove it from the sender */ + xbt_fifo_remove(action->comm.src_proc->comms, action); + } + SIMIX_comm_destroy(action); + } + else { + xbt_die("Communication action %p is in my list but I'm not the sender " + "or the receiver", action); + } + } + /*xbt_swag_remove(process, simix_global->process_to_run);*/ xbt_swag_remove(process, simix_global->process_list); xbt_swag_remove(process, process->smx_host->process_list); @@ -58,13 +112,12 @@ void SIMIX_process_empty_trash(void) SIMIX_context_free(process->context); /* Free the exception allocated at creation time */ - if (process->running_ctx) - free(process->running_ctx); - if (process->properties) - xbt_dict_free(&process->properties); + free(process->running_ctx); + xbt_dict_free(&process->properties); + + xbt_fifo_free(process->comms); free(process->name); - process->name = NULL; free(process); } } @@ -96,27 +149,15 @@ void SIMIX_create_maestro_process() smx_process_t SIMIX_process_create_from_wrapper(smx_process_arg_t args) { smx_process_t process; - - if (simix_global->create_process_function) { - process = simix_global->create_process_function(args->name, - args->code, - args->data, - args->hostname, - args->argc, - args->argv, - args->properties); - } - else { - process = SIMIX_process_create(args->name, - args->code, - args->data, - args->hostname, - args->argc, - args->argv, - args->properties); - } - // FIXME: to simplify this, simix_global->create_process_function could just - // be SIMIX_process_create() by default (and the same thing in smx_deployment.c) + simix_global->create_process_function( + &process, + args->name, + args->code, + args->data, + args->hostname, + args->argc, + args->argv, + args->properties); return process; } @@ -130,55 +171,75 @@ smx_process_t SIMIX_process_create_from_wrapper(smx_process_arg_t args) { * * \return the process created */ -smx_process_t SIMIX_process_create(const char *name, - xbt_main_func_t code, - void *data, - const char *hostname, - int argc, char **argv, - xbt_dict_t properties) { - - smx_process_t process = NULL; +void SIMIX_process_create(smx_process_t *process, + const char *name, + xbt_main_func_t code, + void *data, + const char *hostname, + int argc, char **argv, + xbt_dict_t properties) { + + *process = NULL; smx_host_t host = SIMIX_host_get_by_name(hostname); - DEBUG2("Start process %s on host %s", name, hostname); + XBT_DEBUG("Start process %s on host %s", name, hostname); if (!SIMIX_host_get_state(host)) { - WARN2("Cannot launch process '%s' on failed host '%s'", name, + XBT_WARN("Cannot launch process '%s' on failed host '%s'", name, hostname); } else { - process = xbt_new0(s_smx_process_t, 1); + *process = xbt_new0(s_smx_process_t, 1); - xbt_assert0(((code != NULL) && (host != NULL)), "Invalid parameters"); + xbt_assert(((code != NULL) && (host != NULL)), "Invalid parameters"); /* Process data */ - process->pid = simix_process_maxpid++; - process->name = xbt_strdup(name); - process->smx_host = host; - process->data = data; + (*process)->pid = simix_process_maxpid++; + (*process)->name = xbt_strdup(name); + (*process)->smx_host = host; + (*process)->data = data; + (*process)->comms = xbt_fifo_new(); + (*process)->request.issuer = *process; - VERB1("Create context %s", process->name); - process->context = SIMIX_context_new(code, argc, argv, - simix_global->cleanup_process_function, process); + XBT_VERB("Create context %s", (*process)->name); + (*process)->context = SIMIX_context_new(code, argc, argv, + simix_global->cleanup_process_function, *process); - process->running_ctx = xbt_new(xbt_running_ctx_t, 1); - XBT_RUNNING_CTX_INITIALIZE(process->running_ctx); + (*process)->running_ctx = xbt_new(xbt_running_ctx_t, 1); + XBT_RUNNING_CTX_INITIALIZE((*process)->running_ctx); /* Add properties */ - process->properties = properties; + (*process)->properties = properties; /* Add the process to it's host process list */ - xbt_swag_insert(process, host->process_list); + xbt_swag_insert(*process, host->process_list); - DEBUG1("Start context '%s'", process->name); + XBT_DEBUG("Start context '%s'", (*process)->name); /* Now insert it in the global process list and in the process to run list */ - xbt_swag_insert(process, simix_global->process_list); - DEBUG2("Inserting %s(%s) in the to_run list", process->name, host->name); - xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process); + xbt_swag_insert(*process, simix_global->process_list); + XBT_DEBUG("Inserting %s(%s) in the to_run list", (*process)->name, host->name); + xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, *process); } +} - return process; +/** + * \brief Executes the processes from simix_global->process_to_run. + * + * The processes of simix_global->process_to_run are run (in parallel if + * possible). On exit, simix_global->process_to_run is empty, and + * simix_global->process_that_ran contains the list of processes that just ran. + * The two lists are swapped so, be careful when using them before and after a + * call to this function. + */ +void SIMIX_process_runall(void) +{ + SIMIX_context_runall(); + + xbt_dynar_t tmp = simix_global->process_that_ran; + simix_global->process_that_ran = simix_global->process_to_run; + simix_global->process_to_run = tmp; + xbt_dynar_reset(simix_global->process_to_run); } /** @@ -189,15 +250,16 @@ smx_process_t SIMIX_process_create(const char *name, * * \param process poor victim */ -void SIMIX_process_kill(smx_process_t process, smx_process_t killer) { +void SIMIX_process_kill(smx_process_t process) { - DEBUG2("Killing process %s on %s", process->name, process->smx_host->name); + XBT_DEBUG("Killing process %s on %s", process->name, process->smx_host->name); process->context->iwannadie = 1; process->blocked = 0; process->suspended = 0; /* FIXME: set doexception to 0 also? */ + /* destroy the blocking action if any */ if (process->waiting_action) { switch (process->waiting_action->type) { @@ -226,26 +288,22 @@ void SIMIX_process_kill(smx_process_t process, smx_process_t killer) { } } - /* If I'm killing myself then stop, otherwise schedule the process to kill. */ - if (process == killer) { - SIMIX_context_stop(process->context); - } - else { - xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process); - } + xbt_dynar_push_as(simix_global->process_to_run, smx_process_t, process); } /** * \brief Kills all running processes. - * - * Only maestro can kill everyone. + * \param issuer this one will not be killed */ -void SIMIX_process_killall(void) +void SIMIX_process_killall(smx_process_t issuer) { smx_process_t p = NULL; - while ((p = xbt_swag_extract(simix_global->process_list))) - SIMIX_process_kill(p, SIMIX_process_self()); + while ((p = xbt_swag_extract(simix_global->process_list))) { + if (p != issuer) { + SIMIX_process_kill(p); + } + } SIMIX_context_runall(simix_global->process_to_run); @@ -253,16 +311,17 @@ void SIMIX_process_killall(void) } void SIMIX_process_change_host(smx_process_t process, - const char *source, const char *dest) + smx_host_t dest) { - smx_host_t h1 = NULL; - smx_host_t h2 = NULL; - xbt_assert0((process != NULL), "Invalid parameters"); - h1 = SIMIX_host_get_by_name(source); - h2 = SIMIX_host_get_by_name(dest); - process->smx_host = h2; - xbt_swag_remove(process, h1->process_list); - xbt_swag_insert(process, h2->process_list); + xbt_assert((process != NULL), "Invalid parameters"); + xbt_swag_remove(process, process->smx_host->process_list); + process->smx_host = dest; + xbt_swag_insert(process, dest->process_list); +} + +void SIMIX_pre_process_change_host(smx_process_t process, smx_host_t dest) +{ + process->new_host = dest; } void SIMIX_pre_process_suspend(smx_req_t req) @@ -278,12 +337,17 @@ void SIMIX_pre_process_suspend(smx_req_t req) void SIMIX_process_suspend(smx_process_t process, smx_process_t issuer) { - xbt_assert0(process, "Invalid parameters"); + xbt_assert((process != NULL), "Invalid parameters"); + + if (process->suspended) { + XBT_DEBUG("Process '%s' is already suspended", process->name); + return; + } process->suspended = 1; /* If we are suspending another process, and it is waiting on an action, - suspend it's action. */ + suspend its action. */ if (process != issuer) { if (process->waiting_action) { @@ -304,7 +368,8 @@ void SIMIX_process_suspend(smx_process_t process, smx_process_t issuer) break; default: - THROW_IMPOSSIBLE; + xbt_die("Internal error in SIMIX_process_suspend: unexpected action type %d", + process->waiting_action->type); } } } @@ -312,7 +377,12 @@ void SIMIX_process_suspend(smx_process_t process, smx_process_t issuer) void SIMIX_process_resume(smx_process_t process, smx_process_t issuer) { - xbt_assert0((process != NULL), "Invalid parameters"); + xbt_assert((process != NULL), "Invalid parameters"); + + if (!process->suspended) { + XBT_DEBUG("Process '%s' is not suspended", process->name); + return; + } process->suspended = 0; @@ -338,7 +408,8 @@ void SIMIX_process_resume(smx_process_t process, smx_process_t issuer) break; default: - THROW_IMPOSSIBLE; + xbt_die("Internal error in SIMIX_process_resume: unexpected action type %d", + process->waiting_action->type); } } else { @@ -350,23 +421,27 @@ void SIMIX_process_resume(smx_process_t process, smx_process_t issuer) int SIMIX_process_get_maxpid(void) { return simix_process_maxpid; } + int SIMIX_process_count(void) { return xbt_swag_size(simix_global->process_list); } -void* SIMIX_process_self_get_data(void) +void* SIMIX_process_self_get_data(smx_process_t self) { - smx_process_t me = SIMIX_process_self(); - if (!me) { + xbt_assert(self == SIMIX_process_self(), "This is not the current process"); + + if (!self) { return NULL; } - return SIMIX_process_get_data(me); + return SIMIX_process_get_data(self); } -void SIMIX_process_self_set_data(void *data) +void SIMIX_process_self_set_data(smx_process_t self, void *data) { - SIMIX_process_set_data(SIMIX_process_self(), data); + xbt_assert(self == SIMIX_process_self(), "This is not the current process"); + + SIMIX_process_set_data(self, data); } void* SIMIX_process_get_data(smx_process_t process) @@ -400,17 +475,21 @@ const char* SIMIX_process_get_name(smx_process_t process) return process->name; } -int SIMIX_process_is_suspended(smx_process_t process) +smx_process_t SIMIX_process_get_by_name(const char* name) { - return process->suspended; + smx_process_t proc; + + xbt_swag_foreach(proc, simix_global->process_list) + { + if(!strcmp(name, proc->name)) + return proc; + } + return NULL; } -int SIMIX_process_is_enabled(smx_process_t process) +int SIMIX_process_is_suspended(smx_process_t process) { - if (process->request.call != REQ_NO_REQ && SIMIX_request_is_enabled(&process->request)) - return TRUE; - - return FALSE; + return process->suspended; } xbt_dict_t SIMIX_process_get_properties(smx_process_t process) @@ -421,9 +500,10 @@ xbt_dict_t SIMIX_process_get_properties(smx_process_t process) void SIMIX_pre_process_sleep(smx_req_t req) { if (MC_IS_ENABLED) { - NOW += req->process_sleep.duration; + MC_process_clock_add(req->issuer, req->process_sleep.duration); req->process_sleep.result = SIMIX_DONE; SIMIX_request_answer(req); + return; } smx_action_t action = SIMIX_process_sleep(req->issuer, req->process_sleep.duration); xbt_fifo_push(action->request_list, req); @@ -438,14 +518,13 @@ smx_action_t SIMIX_process_sleep(smx_process_t process, double duration) /* check if the host is active */ if (surf_workstation_model->extension. workstation.get_state(host->host) != SURF_RESOURCE_ON) { - THROW1(host_error, 0, "Host %s failed, you cannot call this function", + THROWF(host_error, 0, "Host %s failed, you cannot call this function", host->name); } - action = xbt_new0(s_smx_action_t, 1); + action = xbt_mallocator_get(simix_global->action_mallocator); action->type = SIMIX_ACTION_SLEEP; - action->request_list = xbt_fifo_new(); - action->name = xbt_strdup("sleep"); + action->name = NULL; #ifdef HAVE_TRACING action->category = NULL; #endif @@ -455,37 +534,44 @@ smx_action_t SIMIX_process_sleep(smx_process_t process, double duration) surf_workstation_model->extension.workstation.sleep(host->host, duration); surf_workstation_model->action_data_set(action->sleep.surf_sleep, action); - DEBUG1("Create sleep action %p", action); + XBT_DEBUG("Create sleep action %p", action); return action; } void SIMIX_post_process_sleep(smx_action_t action) { - e_smx_state_t state = SIMIX_action_map_state(surf_workstation_model->action_state_get(action->sleep.surf_sleep)); smx_req_t req; + e_smx_state_t state; while ((req = xbt_fifo_shift(action->request_list))) { + + switch(surf_workstation_model->action_state_get(action->sleep.surf_sleep)){ + case SURF_ACTION_FAILED: + state = SIMIX_SRC_HOST_FAILURE; + break; + + case SURF_ACTION_DONE: + state = SIMIX_DONE; + break; + + default: + THROW_IMPOSSIBLE; + break; + } req->process_sleep.result = state; req->issuer->waiting_action = NULL; SIMIX_request_answer(req); } - SIMIX_process_sleep_destroy(action); } void SIMIX_process_sleep_destroy(smx_action_t action) { - DEBUG1("Destroy action %p", action); - if (action->name) - xbt_free(action->name); + XBT_DEBUG("Destroy action %p", action); if (action->sleep.surf_sleep) action->sleep.surf_sleep->model_type->action_unref(action->sleep.surf_sleep); -#ifdef HAVE_TRACING - TRACE_smx_action_destroy(action); -#endif - xbt_fifo_free(action->request_list); - xbt_free(action); + xbt_mallocator_release(simix_global->action_mallocator, action); } void SIMIX_process_sleep_suspend(smx_action_t action) @@ -499,31 +585,38 @@ void SIMIX_process_sleep_resume(smx_action_t action) } /** - * Calling this function makes the process to yield. - * Only the processes can call this function, giving back the control to maestro + * \brief Calling this function makes the process to yield. + * + * Only the current process can call this function, giving back the control to + * maestro. + * + * \param self the current process */ -void SIMIX_process_yield(void) +void SIMIX_process_yield(smx_process_t self) { - smx_process_t self = SIMIX_process_self(); - - DEBUG1("Yield process '%s'", self->name); + XBT_DEBUG("Yield process '%s'", self->name); /* Go into sleep and return control to maestro */ SIMIX_context_suspend(self->context); /* Ok, maestro returned control to us */ - DEBUG1("Maestro returned control to me: '%s'", self->name); + XBT_DEBUG("Control returned to me: '%s'", self->name); if (self->context->iwannadie){ - DEBUG0("I wanna die!"); + XBT_DEBUG("I wanna die!"); SIMIX_context_stop(self->context); } if (self->doexception) { - DEBUG0("Wait, maestro left me an exception"); + XBT_DEBUG("Wait, maestro left me an exception"); self->doexception = 0; RETHROW; } + + if (self->new_host) { + SIMIX_process_change_host(self, self->new_host); + self->new_host = NULL; + } } /* callback: context fetching */ @@ -542,6 +635,15 @@ void SIMIX_process_exception_terminate(xbt_ex_t * e) smx_context_t SIMIX_process_get_context(smx_process_t p) { return p->context; } + void SIMIX_process_set_context(smx_process_t p,smx_context_t c) { p->context = c; } + +/** + * \brief Returns the list of processes to run. + */ +XBT_INLINE xbt_dynar_t SIMIX_process_get_runnable(void) +{ + return simix_global->process_to_run; +}