Algorithmique Numérique Distribuée
Public GIT Repository
projects
/
simgrid.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
| inline |
side by side
Fixed a race condition in msg for communication between two processes. The
[simgrid.git]
/
src
/
msg
/
msg_mailbox.c
diff --git
a/src/msg/msg_mailbox.c
b/src/msg/msg_mailbox.c
index
5a12459
..
a6a4444
100644
(file)
--- a/
src/msg/msg_mailbox.c
+++ b/
src/msg/msg_mailbox.c
@@
-42,7
+42,7
@@
void MSG_mailbox_free(void *mailbox)
{
msg_mailbox_t _mailbox = (msg_mailbox_t) mailbox;
- if (
NULL != (_mailbox->hostname)
)
+ if (
_mailbox->hostname
)
free(_mailbox->hostname);
xbt_fifo_free(_mailbox->tasks);
@@
-51,11
+51,6
@@
void MSG_mailbox_free(void *mailbox)
free(_mailbox);
}
-void MSG_mailbox_put(msg_mailbox_t mailbox, m_task_t task)
-{
- xbt_fifo_push(mailbox->tasks, task);
-}
-
smx_cond_t MSG_mailbox_get_cond(msg_mailbox_t mailbox)
{
return mailbox->cond;
@@
-80,7
+75,7
@@
m_task_t MSG_mailbox_get_head(msg_mailbox_t mailbox)
{
xbt_fifo_item_t item;
- if (
NULL ==
(item = xbt_fifo_get_first_item(mailbox->tasks)))
+ if (
!
(item = xbt_fifo_get_first_item(mailbox->tasks)))
return NULL;
return (m_task_t) xbt_fifo_get_item_content(item);
@@
-171,7
+166,7
@@
MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
m_host_t h = NULL;
simdata_task_t t_simdata = NULL;
simdata_host_t h_simdata = NULL;
-
int first_time = 1
;
+
double start_time = SIMIX_get_clock()
;
smx_cond_t cond = NULL; //conditional wait if the task isn't on the channel yet
@@
-206,51
+201,46
@@
MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
break;
} else {
/* get the first task of the host */
- if (
NULL !=
(t = MSG_mailbox_get_first_host_task(mailbox, host)))
+ if ((t = MSG_mailbox_get_first_host_task(mailbox, host)))
break;
}
}
- if (timeout > 0) {
- if (!first_time) {
- SIMIX_mutex_unlock(h->simdata->mutex);
- /* set the simix condition of the mailbox to NULL */
- MSG_mailbox_set_cond(mailbox, NULL);
- SIMIX_cond_destroy(cond);
- MSG_RETURN(MSG_TRANSFER_FAILURE);
- }
+ if ((timeout > 0) && (SIMIX_get_clock()-start_time>=timeout)) {
+ SIMIX_mutex_unlock(h->simdata->mutex);
+ MSG_mailbox_set_cond(mailbox, NULL);
+ SIMIX_cond_destroy(cond);
+ MSG_RETURN(MSG_TRANSFER_FAILURE);
}
- cond = SIMIX_cond_init();
-
- /* set the condition of the mailbox */
- MSG_mailbox_set_cond(mailbox, cond);
+ if(!cond) {
+ cond = SIMIX_cond_init();
+ MSG_mailbox_set_cond(mailbox, cond);
+ }
if (timeout > 0)
- SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout);
+ SIMIX_cond_wait_timeout(cond, h->simdata->mutex, timeout
-start_time
);
else
SIMIX_cond_wait(MSG_mailbox_get_cond(mailbox), h->simdata->mutex);
-
if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
+ SIMIX_mutex_unlock(h->simdata->mutex);
MSG_mailbox_set_cond(mailbox, NULL);
SIMIX_cond_destroy(cond);
MSG_RETURN(MSG_HOST_FAILURE);
}
-
- first_time = 0;
}
- SIMIX_mutex_unlock(h->simdata->mutex);
DEBUG1("OK, got a task (%s)", t->name);
/* clean conditional */
if (cond) {
- SIMIX_cond_destroy(cond);
-
MSG_mailbox_set_cond(mailbox, NULL);
+ SIMIX_cond_destroy(cond);
}
+ SIMIX_mutex_unlock(h->simdata->mutex);
+
t_simdata = t->simdata;
t_simdata->receiver = process;
*task = t;
@@
-266,6
+256,16
@@
MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
t->name, t_simdata->message_size,
t_simdata->rate);
+ /* This is a hack. We know that both the receiver and the sender will
+ need to look at the content of t_simdata->comm. And it needs to be
+ destroyed. However, we don't know whether the receiver or the sender
+ will get to it first. So by setting its refcount to 2 we can enforce
+ that things happen correctly. An alternative would be to only do ++ and
+ -- on this refcount and to sprinkle them judiciously throughout the code,
+ which appears perhaps worse? Or perhaps the refcount field of
+ task->simdata can be used for this? At any rate, this will do for now */
+ t_simdata->comm->refcount = 2;
+
/* if the process is suspended, create the action but stop its execution; it will be restarted when the sender process resumes */
if (MSG_process_is_suspended(t_simdata->sender)) {
DEBUG1("Process sender (%s) suspended", t_simdata->sender->name);
@@
-296,19
+296,31
@@
MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, m_task_t * task,
if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
- SIMIX_action_destroy(t_simdata->comm);
- t_simdata->comm = NULL;
- t_simdata->using--;
+ if (t_simdata->comm->refcount == 1) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
+ t_simdata->refcount --;
MSG_RETURN(MSG_OK);
} else if (SIMIX_host_get_state(h_simdata->smx_host) == 0) {
- SIMIX_action_destroy(t_simdata->comm);
- t_simdata->comm = NULL;
- t_simdata->using--;
+ if (t_simdata->comm->refcount == 1) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
+ t_simdata->refcount --;
MSG_RETURN(MSG_HOST_FAILURE);
} else {
- SIMIX_action_destroy(t_simdata->comm);
- t_simdata->comm = NULL;
- t_simdata->using--;
+ if (t_simdata->comm->refcount ==1 ) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
+ t_simdata->refcount --;
MSG_RETURN(MSG_TRANSFER_FAILURE);
}
}
@@
-319,54
+331,53
@@
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
{
m_process_t process = MSG_process_self();
const char *hostname;
- simdata_task_t t
ask
_simdata = NULL;
+ simdata_task_t t_simdata = NULL;
m_host_t local_host = NULL;
m_host_t remote_host = NULL;
smx_cond_t cond = NULL;
CHECK_HOST();
- t
ask
_simdata = task->simdata;
- t
ask
_simdata->sender = process;
- t
ask
_simdata->source = MSG_process_get_host(process);
+ t_simdata = task->simdata;
+ t_simdata->sender = process;
+ t_simdata->source = MSG_process_get_host(process);
- xbt_assert0(t
ask_simdata->using
== 1,
+ xbt_assert0(t
_simdata->refcount
== 1,
"This task is still being used somewhere else. You cannot send it now. Go fix your code!");
- t
ask
_simdata->comm = NULL;
+ t_simdata->comm = NULL;
- t
ask_simdata->using
++;
+ t
_simdata->refcount
++;
local_host = ((simdata_process_t) process->simdata)->m_host;
+ msg_global->sent_msg++;
/* get the host name containing the mailbox */
hostname = MSG_mailbox_get_hostname(mailbox);
remote_host = MSG_get_host_by_name(hostname);
- if (
NULL ==
remote_host)
+ if (
!
remote_host)
THROW1(not_found_error, 0, "Host %s not fount", hostname);
DEBUG4
("Trying to send a task (%g kB) from %s to %s on the channel aliased by the alias %s",
- t
ask->
simdata->message_size / 1000, local_host->name,
+ t
_
simdata->message_size / 1000, local_host->name,
remote_host->name, MSG_mailbox_get_alias(mailbox));
SIMIX_mutex_lock(remote_host->simdata->mutex);
/* put the task in the mailbox */
-
MSG_mailbox_put(mailbox
, task);
+
xbt_fifo_push(mailbox->tasks
, task);
- if (
NULL !=
(cond = MSG_mailbox_get_cond(mailbox))) {
+ if ((cond = MSG_mailbox_get_cond(mailbox))) {
DEBUG0("Somebody is listening. Let's wake him up!");
SIMIX_cond_signal(cond);
}
-
-
SIMIX_mutex_unlock(remote_host->simdata->mutex);
- SIMIX_mutex_lock(t
ask->
simdata->mutex);
+ SIMIX_mutex_lock(t
_
simdata->mutex);
process->simdata->waiting_task = task;
@@
-380,11
+391,11
@@
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
/* verify that the action that ended is the correct one. Call the wait_timeout with the new time. If the timeout occurs, an exception is raised */
while (1) {
time_elapsed = SIMIX_get_clock() - time;
- SIMIX_cond_wait_timeout(t
ask->simdata->cond, task->
simdata->mutex,
+ SIMIX_cond_wait_timeout(t
_simdata->cond, t_
simdata->mutex,
timeout - time_elapsed);
- if ((t
ask->
simdata->comm != NULL)
- && (SIMIX_action_get_state(t
ask->
simdata->comm) !=
+ if ((t
_
simdata->comm != NULL)
+ && (SIMIX_action_get_state(t
_
simdata->comm) !=
SURF_ACTION_RUNNING))
break;
}
@@
-393,19
+404,19
@@
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
if (e.category == timeout_error) {
xbt_ex_free(e);
/* verify if the timeout happened and the communication hasn't started yet */
- if (t
ask->
simdata->comm == NULL) {
+ if (t
_
simdata->comm == NULL) {
process->simdata->waiting_task = NULL;
/* remove the task from the mailbox */
MSG_mailbox_remove(mailbox, task);
- if (t
ask->
simdata->receiver) {
- t
ask->
simdata->receiver->simdata->waiting_task = NULL;
+ if (t
_
simdata->receiver) {
+ t
_
simdata->receiver->simdata->waiting_task = NULL;
}
- t
ask->
simdata->sender = NULL;
+ t
_
simdata->sender = NULL;
- SIMIX_mutex_unlock(t
ask->
simdata->mutex);
+ SIMIX_mutex_unlock(t
_
simdata->mutex);
MSG_RETURN(MSG_TRANSFER_FAILURE);
}
} else {
@@
-414,9
+425,9
@@
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
}
} else {
while (1) {
- SIMIX_cond_wait(t
ask->simdata->cond, task->
simdata->mutex);
+ SIMIX_cond_wait(t
_simdata->cond, t_
simdata->mutex);
- if (SIMIX_action_get_state(t
ask->
simdata->comm) !=
+ if (SIMIX_action_get_state(t
_
simdata->comm) !=
SURF_ACTION_RUNNING)
break;
}
@@
-426,19
+437,36
@@
MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, m_task_t task,
process->simdata->waiting_task = NULL;
/* the task has already finished and the pointer must be null */
- if (t
ask->
simdata->receiver) {
- t
ask->
simdata->receiver->simdata->waiting_task = NULL;
+ if (t
_
simdata->receiver) {
+ t
_
simdata->receiver->simdata->waiting_task = NULL;
}
- t
ask->
simdata->sender = NULL;
+ t
_
simdata->sender = NULL;
SIMIX_mutex_unlock(task->simdata->mutex);
-
- if (SIMIX_action_get_state(task->simdata->comm) == SURF_ACTION_DONE) {
+ if (SIMIX_action_get_state(t_simdata->comm) == SURF_ACTION_DONE) {
+ if (t_simdata->comm->refcount == 1) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
MSG_RETURN(MSG_OK);
} else if (SIMIX_host_get_state(local_host->simdata->smx_host) == 0) {
+ if (t_simdata->comm->refcount == 1) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
MSG_RETURN(MSG_HOST_FAILURE);
} else {
+ if (t_simdata->comm->refcount == 1) {
+ SIMIX_action_destroy(t_simdata->comm);
+ t_simdata->comm = NULL;
+ } else {
+ t_simdata->comm->refcount --;
+ }
MSG_RETURN(MSG_TRANSFER_FAILURE);
}
}