int blocked:1;
int suspended:1;
smx_host_t new_host; /* if not null, the host on which the process must migrate to */
- smx_action_t waiting_action;
+ smx_action_t waiting_action; /* the current blocking action if any */
+ xbt_fifo_t comms; /* the current non-blocking communication actions */
xbt_dict_t properties;
s_smx_req_t request;
void *data; /* kept for compatibility, it should be replaced with moddata */
void SIMIX_rdv_free(void *data)
{
smx_rdv_t rdv = (smx_rdv_t) data;
- if (rdv->name)
- xbt_free(rdv->name);
+ xbt_free(rdv->name);
xbt_fifo_free(rdv->comm_fifo);
xbt_free(rdv);
}
action->state = SIMIX_READY;
action->comm.type = SIMIX_COMM_READY;
}
+ xbt_fifo_push(src_proc->comms, action);
- /* If the communication action is detached then decrease the refcount
- * by one, so it will be eliminated by the receivers destroy call */
+ /* if the communication action is detached then decrease the refcount
+ * by one, so it will be eliminated by the receiver's destroy call */
if (detached) {
action->comm.detached = 1;
action->comm.refcount--;
action->state = SIMIX_READY;
action->comm.type = SIMIX_COMM_READY;
}
+ xbt_fifo_push(dst_proc->comms, action);
/* Setup communication request */
action->comm.dst_proc = dst_proc;
{
/* If both the sender and the receiver are already there, start the communication */
if (action->state == SIMIX_READY) {
+
smx_host_t sender = action->comm.src_proc->smx_host;
smx_host_t receiver = action->comm.dst_proc->smx_host;
else
action->state = SIMIX_DONE;
- XBT_DEBUG("SIMIX_post_comm: action state = %d", action->state);
+ XBT_DEBUG("SIMIX_post_comm: comm %p, state %d, src_proc %p, dst_proc %p, detached: %d",
+ action, action->state, action->comm.src_proc, action->comm.dst_proc, action->comm.detached);
+
+ /* remove the action from pending communications of both processes (if they still exist) */
+ if (action->comm.src_proc) {
+ xbt_fifo_remove(action->comm.src_proc->comms, action);
+ }
+ if (action->comm.dst_proc) {
+ xbt_fifo_remove(action->comm.dst_proc->comms, action);
+ }
- /* After this point the surf actions associated with the simix communicate
- action are no longer needed, thus we delete them. */
+ /* destroy the surf actions associated with the Simix communication */
SIMIX_comm_destroy_internal_actions(action);
- /* If there are requests associated with the action, then answer them */
+ /* if there are requests associated with the action, then answer them */
if (xbt_fifo_size(action->request_list))
SIMIX_comm_finish(action);
}
*/
void SIMIX_process_cleanup(smx_process_t process)
{
- XBT_DEBUG("Cleanup process %s", process->name);
+ XBT_DEBUG("Cleanup process %s (%p), waiting action %p",
+ process->name, process, process->waiting_action);
+
+ /* cancel non-blocking communications */
+ smx_action_t action;
+ while ((action = xbt_fifo_pop(process->comms))) {
+
+ /* make sure no one will finish the comm after this process is destroyed */
+ SIMIX_comm_cancel(action);
+
+ if (action->comm.src_proc == process) {
+ XBT_DEBUG("Found an unfinished send comm %p (detached = %d), state %d",
+ action, action->comm.detached, action->state);
+ action->comm.src_proc = NULL;
+
+ if (action->comm.detached) {
+ /* the receiver was supposed to destroy the comm after completion,
+ * but the comm will actually never finish */
+ action->comm.refcount++;
+ }
+ }
+ else if (action->comm.dst_proc == process){
+ XBT_DEBUG("Found an unfinished recv comm %p, state %d", action, action->state);
+ action->comm.dst_proc = NULL;
+ }
+ else {
+ THROW_IMPOSSIBLE;
+ }
+
+ SIMIX_comm_destroy(action);
+ }
+
/*xbt_swag_remove(process, simix_global->process_to_run);*/
xbt_swag_remove(process, simix_global->process_list);
xbt_swag_remove(process, process->smx_host->process_list);
if (process->properties)
xbt_dict_free(&process->properties);
+ xbt_fifo_free(process->comms);
+
free(process->name);
process->name = NULL;
free(process);
(*process)->name = xbt_strdup(name);
(*process)->smx_host = host;
(*process)->data = data;
+ (*process)->comms = xbt_fifo_new();
XBT_VERB("Create context %s", (*process)->name);
(*process)->context = SIMIX_context_new(code, argc, argv,
process->blocked = 0;
process->suspended = 0;
/* FIXME: set doexception to 0 also? */
- /* FIXME: asynchronous communication actions are not destroyed:
- * another process may find a matching rdv with me after I'm freed */
+ /* destroy the blocking action if any */
if (process->waiting_action) {
switch (process->waiting_action->type) {
{
xbt_assert(comm, "Invalid parameter");
- /* FIXME remove this request type: comms are auto-destroyed now,
- * but what happens with unfinished comms? */
+ /* FIXME remove this request type: comms are auto-destroyed now */
/*
smx_req_t req = SIMIX_req_mine();