X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/61ba1f56206697142f14085360e932c3463030fb..b83dd387b08acb4e9cd4f9ea13e634b97e615133:/src/msg/msg_mailbox.c diff --git a/src/msg/msg_mailbox.c b/src/msg/msg_mailbox.c index 5a12459c07..a6a4444fdf 100644 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@ -42,7 +42,7 @@ void MSG_mailbox_free(void *mailbox) { msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox; - if (NULL != (_mailbox->hostname)) + if (_mailbox->hostname) free(_mailbox->hostname); xbt_fifo_free(_mailbox->tasks); @@ -51,11 +51,6 @@ void MSG_mailbox_free(void *mailbox) free(_mailbox); } -void MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task) -{ - xbt_fifo_push(mailbox->tasks, task); -} - smx_cond_t MSG_mailbox_get_cond(msg_mailbox_t mailbox) { return mailbox->cond; @@ -80,7 +75,7 @@ m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox) { xbt_fifo_item_t item; - if (NULL == (item = xbt_fifo_get_first_item(mailbox->tasks))) + if (!(item = xbt_fifo_get_first_item(mailbox->tasks))) return NULL; return (m_task_t) xbt_fifo_get_item_content(item); @@ -171,7 +166,7 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task, m_host_t h = NULL; simdata_task_t t_simdata = NULL; simdata_host_t h_simdata = NULL; - int first_time = 1; + double start_time = SIMIX_get_clock(); smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet @@ -206,51 +201,46 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task, break; } else { /* get the first task of the host */ - if (NULL != (t = MSG_mailbox_get_first_host_task(mailbox, host))) + if ((t = MSG_mailbox_get_first_host_task(mailbox, host))) break; } } - if (timeout > 0) { - if (!first_time) { - SIMIX_mutex_unlock(h->simdata->mutex); - /* set the simix condition of the mailbox to NULL */ - MSG_mailbox_set_cond(mailbox, NULL); - SIMIX_cond_destroy(cond); - MSG_RETURN(MSG_TRANSFER_FAILURE); - } + if ((timeout > 0) && (SIMIX_get_clock()-start_time>=timeout)) { + SIMIX_mutex_unlock(h->simdata->mutex); + MSG_mailbox_set_cond(mailbox, NULL); + SIMIX_cond_destroy(cond); + MSG_RETURN(MSG_TRANSFER_FAILURE); } - cond = SIMIX_cond_init(); - - /* set the condition of the mailbox */ - MSG_mailbox_set_cond(mailbox, cond); + if(!cond) { + cond = SIMIX_cond_init(); + MSG_mailbox_set_cond(mailbox, cond); + } if (timeout > 0) - SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout); + SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout-start_time); else SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex); - if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { + SIMIX_mutex_unlock(h->simdata->mutex); MSG_mailbox_set_cond(mailbox, NULL); SIMIX_cond_destroy(cond); MSG_RETURN(MSG_HOST_FAILURE); } - - first_time = 0; } - SIMIX_mutex_unlock(h->simdata->mutex); DEBUG1("OK, got a task (%s)", t->name); /* clean conditional */ if (cond) { - SIMIX_cond_destroy(cond); - MSG_mailbox_set_cond(mailbox, NULL); + SIMIX_cond_destroy(cond); } + SIMIX_mutex_unlock(h->simdata->mutex); + t_simdata = t->simdata; t_simdata->receiver = process; *task = t; @@ -266,6 +256,16 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task, t->name, t_simdata->message_size, t_simdata->rate); + /* This is a hack. We know that both the receiver and the sender will + need to look at the content of t_simdata->comm. And it needs to be + destroyed. However, we don't known whether the receiver or the sender + will get to it first. So by setting whit refcount to 2 we can enforce + that things happen correctly. An alternative would be to only do ++ and + -- on this refcount and to sprinkle them judiciously throughout the code, + which appears perhaps worse? Or perhaps the refcount field of + task->simdata can be used for this? At any rate, this will do for now */ + t_simdata->comm->refcount = 2; + /* if the process is suspend, create the action but stop its execution, it will be restart when the sender process resume */ if (MSG_process_is_suspended(t_simdata->sender)) { DEBUG1("Process sender (%s) suspended", t_simdata->sender->name); @@ -296,19 +296,31 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task, if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) { - SIMIX_action_destroy(t_simdata->comm); - t_simdata->comm = NULL; - t_simdata->using--; + if (t_simdata->comm->refcount == 1) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } + t_simdata->refcount --; MSG_RETURN(MSG_OK); } else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) { - SIMIX_action_destroy(t_simdata->comm); - t_simdata->comm = NULL; - t_simdata->using--; + if (t_simdata->comm->refcount == 1) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } + t_simdata->refcount --; MSG_RETURN(MSG_HOST_FAILURE); } else { - SIMIX_action_destroy(t_simdata->comm); - t_simdata->comm = NULL; - t_simdata->using--; + if (t_simdata->comm->refcount ==1 ) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } + t_simdata->refcount --; MSG_RETURN(MSG_TRANSFER_FAILURE); } } @@ -319,54 +331,53 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, { m_process_t process = MSG_process_self(); const char *hostname; - simdata_task_t task_simdata = NULL; + simdata_task_t t_simdata = NULL; m_host_t local_host = NULL; m_host_t remote_host = NULL; smx_cond_t cond = NULL; CHECK_HOST(); - task_simdata = task->simdata; - task_simdata->sender = process; - task_simdata->source = MSG_process_get_host(process); + t_simdata = task->simdata; + t_simdata->sender = process; + t_simdata->source = MSG_process_get_host(process); - xbt_assert0(task_simdata->using == 1, + xbt_assert0(t_simdata->refcount == 1, "This task is still being used somewhere else. You cannot send it now. Go fix your code!"); - task_simdata->comm = NULL; + t_simdata->comm = NULL; - task_simdata->using++; + t_simdata->refcount ++; local_host = ((simdata_process_t) process->simdata)->m_host; + msg_global->sent_msg++; /* get the host name containing the mailbox */ hostname = MSG_mailbox_get_hostname(mailbox); remote_host = MSG_get_host_by_name(hostname); - if (NULL == remote_host) + if (!remote_host) THROW1(not_found_error, 0, "Host %s not fount", hostname); DEBUG4 ("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s", - task->simdata->message_size / 1000, local_host->name, + t_simdata->message_size / 1000, local_host->name, remote_host->name, MSG_mailbox_get_alias(mailbox)); SIMIX_mutex_lock(remote_host->simdata->mutex); /* put the task in the mailbox */ - MSG_mailbox_put(mailbox, task); + xbt_fifo_push(mailbox->tasks, task); - if (NULL != (cond = MSG_mailbox_get_cond(mailbox))) { + if ((cond = MSG_mailbox_get_cond(mailbox))) { DEBUG0("Somebody is listening. Let's wake him up!"); SIMIX_cond_signal(cond); } - - SIMIX_mutex_unlock(remote_host->simdata->mutex); - SIMIX_mutex_lock(task->simdata->mutex); + SIMIX_mutex_lock(t_simdata->mutex); process->simdata->waiting_task = task; @@ -380,11 +391,11 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, /*verify if the action that ends is the correct. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */ while (1) { time_elapsed = SIMIX_get_clock() - time; - SIMIX_cond_wait_timeout(task->simdata->cond, task->simdata->mutex, + SIMIX_cond_wait_timeout(t_simdata->cond, t_simdata->mutex, timeout - time_elapsed); - if ((task->simdata->comm != NULL) - && (SIMIX_action_get_state(task->simdata->comm) != + if ((t_simdata->comm != NULL) + && (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING)) break; } @@ -393,19 +404,19 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, if (e.category == timeout_error) { xbt_ex_free(e); /* verify if the timeout happened and the communication didn't started yet */ - if (task->simdata->comm == NULL) { + if (t_simdata->comm == NULL) { process->simdata->waiting_task = NULL; /* remove the task from the mailbox */ MSG_mailbox_remove(mailbox, task); - if (task->simdata->receiver) { - task->simdata->receiver->simdata->waiting_task = NULL; + if (t_simdata->receiver) { + t_simdata->receiver->simdata->waiting_task = NULL; } - task->simdata->sender = NULL; + t_simdata->sender = NULL; - SIMIX_mutex_unlock(task->simdata->mutex); + SIMIX_mutex_unlock(t_simdata->mutex); MSG_RETURN(MSG_TRANSFER_FAILURE); } } else { @@ -414,9 +425,9 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, } } else { while (1) { - SIMIX_cond_wait(task->simdata->cond, task->simdata->mutex); + SIMIX_cond_wait(t_simdata->cond, t_simdata->mutex); - if (SIMIX_action_get_state(task->simdata->comm) != + if (SIMIX_action_get_state(t_simdata->comm) != SURF_ACTION_RUNNING) break; } @@ -426,19 +437,36 @@ MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task, process->simdata->waiting_task = NULL; /* the task has already finished and the pointer must be null */ - if (task->simdata->receiver) { - task->simdata->receiver->simdata->waiting_task = NULL; + if (t_simdata->receiver) { + t_simdata->receiver->simdata->waiting_task = NULL; } - task->simdata->sender = NULL; + t_simdata->sender = NULL; SIMIX_mutex_unlock(task->simdata->mutex); - - if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) { + if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) { + if (t_simdata->comm->refcount == 1) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } MSG_RETURN(MSG_OK); } else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) { + if (t_simdata->comm->refcount == 1) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } MSG_RETURN(MSG_HOST_FAILURE); } else { + if (t_simdata->comm->refcount == 1) { + SIMIX_action_destroy(t_simdata->comm); + t_simdata->comm = NULL; + } else { + t_simdata->comm->refcount --; + } MSG_RETURN(MSG_TRANSFER_FAILURE); } }