From e5922b4dce28002422ca9d68a4c44521f67c0f6f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Christophe=20Thi=C3=A9ry?= Date: Fri, 14 Oct 2011 15:38:20 +0200 Subject: [PATCH] Simix: clean unfinished comms when terminating a process --- src/simix/process_private.h | 3 ++- src/simix/smx_network.c | 26 +++++++++++++++++-------- src/simix/smx_process.c | 39 ++++++++++++++++++++++++++++++++++--- src/simix/smx_user.c | 3 +-- 4 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/simix/process_private.h b/src/simix/process_private.h index c832f6d597..cfc9501a16 100644 --- a/src/simix/process_private.h +++ b/src/simix/process_private.h @@ -26,7 +26,8 @@ typedef struct s_smx_process { int blocked:1; int suspended:1; smx_host_t new_host; /* if not null, the host on which the process must migrate to */ - smx_action_t waiting_action; + smx_action_t waiting_action; /* the current blocking action if any */ + xbt_fifo_t comms; /* the current non-blocking communication actions */ xbt_dict_t properties; s_smx_req_t request; void *data; /* kept for compatibility, it should be replaced with moddata */ diff --git a/src/simix/smx_network.c b/src/simix/smx_network.c index 1325aac2ae..3c4440ebef 100644 --- a/src/simix/smx_network.c +++ b/src/simix/smx_network.c @@ -63,8 +63,7 @@ void SIMIX_rdv_destroy(smx_rdv_t rdv) void SIMIX_rdv_free(void *data) { smx_rdv_t rdv = (smx_rdv_t) data; - if (rdv->name) - xbt_free(rdv->name); + xbt_free(rdv->name); xbt_fifo_free(rdv->comm_fifo); xbt_free(rdv); } @@ -310,9 +309,10 @@ smx_action_t SIMIX_comm_isend(smx_process_t src_proc, smx_rdv_t rdv, action->state = SIMIX_READY; action->comm.type = SIMIX_COMM_READY; } + xbt_fifo_push(src_proc->comms, action); - /* If the communication action is detached then decrease the refcount - * by one, so it will be eliminated by the receivers destroy call */ + /* if the communication action is detached then decrease the refcount + * by one, so it will be eliminated by the receiver's destroy call */ if (detached) { action->comm.detached = 1; action->comm.refcount--; @@ -353,6 +353,7 @@ smx_action_t SIMIX_comm_irecv(smx_process_t dst_proc, smx_rdv_t rdv, action->state = SIMIX_READY; action->comm.type = SIMIX_COMM_READY; } + xbt_fifo_push(dst_proc->comms, action); /* Setup communication request */ action->comm.dst_proc = dst_proc; @@ -513,6 +514,7 @@ XBT_INLINE void SIMIX_comm_start(smx_action_t action) { /* If both the sender and the receiver are already there, start the communication */ if (action->state == SIMIX_READY) { + smx_host_t sender = action->comm.src_proc->smx_host; smx_host_t receiver = action->comm.dst_proc->smx_host; @@ -673,13 +675,21 @@ void SIMIX_post_comm(smx_action_t action) else action->state = SIMIX_DONE; - XBT_DEBUG("SIMIX_post_comm: action state = %d", action->state); + XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d", + action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached); + + /* remove the action from pending communications of both processes (if they still exist) */ + if (action->comm.src_proc) { + xbt_fifo_remove(action->comm.src_proc->comms, action); + } + if (action->comm.dst_proc) { + xbt_fifo_remove(action->comm.dst_proc->comms, action); + } - /* After this point the surf actions associated with the simix communicate - action are no longer needed, thus we delete them. */ + /* destroy the surf actions associated with the Simix communication */ SIMIX_comm_destroy_internal_actions(action); - /* If there are requests associated with the action, then answer them */ + /* if there are requests associated with the action, then answer them */ if (xbt_fifo_size(action->request_list)) SIMIX_comm_finish(action); } diff --git a/src/simix/smx_process.c b/src/simix/smx_process.c index 7a169dde29..7138375b3a 100644 --- a/src/simix/smx_process.c +++ b/src/simix/smx_process.c @@ -35,7 +35,38 @@ XBT_INLINE smx_process_t SIMIX_process_self(void) */ void SIMIX_process_cleanup(smx_process_t process) { - XBT_DEBUG("Cleanup process %s", process->name); + XBT_DEBUG("Cleanup process %s (%p), waiting action %p", + process->name, process, process->waiting_action); + + /* cancel non-blocking communications */ + smx_action_t action; + while ((action = xbt_fifo_pop(process->comms))) { + + /* make sure no one will finish the comm after this process is destroyed */ + SIMIX_comm_cancel(action); + + if (action->comm.src_proc == process) { + XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d", + action, action->comm.detached, action->state); + action->comm.src_proc = NULL; + + if (action->comm.detached) { + /* the receiver was supposed to destroy the comm after completion, + * but the comm will actually never finish */ + action->comm.refcount++; + } + } + else if (action->comm.dst_proc == process){ + XBT_DEBUG("Found an unfinished recv comm %p, state %d", action, action->state); + action->comm.dst_proc = NULL; + } + else { + THROW_IMPOSSIBLE; + } + + SIMIX_comm_destroy(action); + } + /*xbt_swag_remove(process, simix_global->process_to_run);*/ xbt_swag_remove(process, simix_global->process_list); xbt_swag_remove(process, process->smx_host->process_list); @@ -61,6 +92,8 @@ void SIMIX_process_empty_trash(void) if (process->properties) xbt_dict_free(&process->properties); + xbt_fifo_free(process->comms); + free(process->name); process->name = NULL; free(process); @@ -143,6 +176,7 @@ void SIMIX_process_create(smx_process_t *process, (*process)->name = xbt_strdup(name); (*process)->smx_host = host; (*process)->data = data; + (*process)->comms = xbt_fifo_new(); XBT_VERB("Create context %s", (*process)->name); (*process)->context = SIMIX_context_new(code, argc, argv, @@ -200,9 +234,8 @@ void SIMIX_process_kill(smx_process_t process) { process->blocked = 0; process->suspended = 0; /* FIXME: set doexception to 0 also? */ - /* FIXME: asynchronous communication actions are not destroyed: - * another process may find a matching rdv with me after I'm freed */ + /* destroy the blocking action if any */ if (process->waiting_action) { switch (process->waiting_action->type) { diff --git a/src/simix/smx_user.c b/src/simix/smx_user.c index 243adf3082..464fc34895 100644 --- a/src/simix/smx_user.c +++ b/src/simix/smx_user.c @@ -807,8 +807,7 @@ void SIMIX_req_comm_destroy(smx_action_t comm) { xbt_assert(comm, "Invalid parameter"); - /* FIXME remove this request type: comms are auto-destroyed now, - * but what happens with unfinished comms? */ + /* FIXME remove this request type: comms are auto-destroyed now */ /* smx_req_t req = SIMIX_req_mine(); -- 2.20.1