#include "private.h"
#include "xbt/sysdep.h"
#include "xbt/log.h"
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
"Logging specific to MSG (gos)");
/** \defgroup msg_gos_functions MSG Operating System Functions
MSG_RETURN(MSG_OK);
}
}
- xbt_assert2(!(h_simdata->sleeping[channel]),
- "A process (%s(%d)) is already blocked on this channel",
+ xbt_assert3(!(h_simdata->sleeping[channel]),
+ "A process (%s(%d)) is already blocked on channel %d",
h_simdata->sleeping[channel]->name,
- h_simdata->sleeping[channel]->simdata->PID);
+ h_simdata->sleeping[channel]->simdata->PID,
+ channel);
h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
if(max_duration>0) {
__MSG_process_block(max_duration);
} else {
__MSG_process_block(-1);
}
+ h_simdata->sleeping[channel] = NULL;
+ first_time = 0;
if(surf_workstation_resource->extension_public->get_state(h_simdata->host)
== SURF_CPU_OFF)
MSG_RETURN(MSG_HOST_FAILURE);
- h_simdata->sleeping[channel] = NULL;
- first_time = 0;
/* OK, we should both be ready now. Are you there ? */
}
/* Transfer */
t_simdata->using++;
+ while(MSG_process_is_suspended(t_simdata->sender)) {
+ DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him",
+ t_simdata->sender->name);
+ m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
+ if(__MSG_process_isBlocked(t_simdata->sender)) {
+ DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
+ __MSG_process_unblock(t_simdata->sender);
+ task_to_wait_for->simdata->using++;
+ __MSG_task_wait_event(process, task_to_wait_for);
+ MSG_task_destroy(task_to_wait_for);
+ } else {
+ DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
+ task_to_wait_for->simdata->using++;
+ __MSG_task_wait_event(process, task_to_wait_for);
+ MSG_task_destroy(task_to_wait_for);
+ DEBUG0("He's resumed. He should block again. So let's free him.");
+ __MSG_process_unblock(t_simdata->sender);
+ break;
+ }
+ }
DEBUG0("Calling SURF for communication creation");
t_simdata->comm = surf_workstation_resource->extension_public->
communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
- if(__MSG_process_isBlocked(t_simdata->sender))
+ if(__MSG_process_isBlocked(t_simdata->sender)) {
+ DEBUG1("Unblocking %s",t_simdata->sender->name);
__MSG_process_unblock(t_simdata->sender);
+ }
- PAJE_PROCESS_PUSH_STATE(process,"C");
+ PAJE_PROCESS_PUSH_STATE(process,"C",t);
do {
DEBUG0("Waiting for action termination");
/** \ingroup msg_gos_functions
* \brief Wait for at most \a max_duration second for a task reception
on \a channel. *\a PID is updated with the PID of the first process
- that triggered this event is any.
+ that triggered this event if any.
*
* It takes three parameters:
* \param channel the channel on which the agent should be
}
if(max_duration==0.0) {
- return MSG_task_probe_from(channel);
+ *PID = MSG_task_probe_from(channel);
+ MSG_RETURN(MSG_OK);
} else {
CHECK_HOST();
h = MSG_host_self();
task_simdata = task->simdata;
task_simdata->sender = process;
task_simdata->source = MSG_process_get_host(process);
- xbt_assert0(task_simdata->using==1,"Gargl!");
+ xbt_assert0(task_simdata->using==1,
+ "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
task_simdata->comm = NULL;
local_host = ((simdata_process_t) process->simdata)->host;
while(!(task_simdata->comm)) {
DEBUG0("Communication not initiated yet. Let's block!");
__MSG_process_block(-1);
+ if(surf_workstation_resource->extension_public->
+ get_state(local_host->simdata->host) == SURF_CPU_OFF) {
+ xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
+ task);
+ MSG_task_destroy(task);
+ MSG_RETURN(MSG_HOST_FAILURE);
+ }
}
DEBUG0("Registering to this communication");
surf_workstation_resource->common_public->action_use(task_simdata->comm);
process->simdata->put_channel = -1;
- PAJE_PROCESS_PUSH_STATE(process,"C");
+ PAJE_PROCESS_PUSH_STATE(process,"C",task);
state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
while (state==SURF_ACTION_RUNNING) {
state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
}
DEBUG0("Action terminated");
+ task->simdata->rate=-1.0; /* Sets the rate back to default */
PAJE_PROCESS_POP_STATE(process);
MSG_error_t res = MSG_OK;
task->simdata->rate=max_rate;
res = MSG_task_put(task, dest, channel);
- task->simdata->rate=-1.0;
return(res);
}
__MSG_task_execute(process, task);
- PAJE_PROCESS_PUSH_STATE(process,"E");
+ PAJE_PROCESS_PUSH_STATE(process,"E",task);
res = __MSG_wait_for_computation(process,task);
PAJE_PROCESS_POP_STATE(process);
return res;
CHECK_HOST();
simdata = task->simdata;
-
+ xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
+ "This taks is executed somewhere else. Go fix your code!");
+ task->simdata->using++;
simdata->compute = surf_workstation_resource->extension_public->
execute(MSG_process_get_host(process)->simdata->host,
simdata->computation_amount);
set_priority(simdata->compute, simdata->priority);
surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
+ task->simdata->using--;
}
MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
simdata_task_t simdata = task->simdata;
+ XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
simdata->using++;
do {
__MSG_task_wait_event(process, task);
if(surf_workstation_resource->common_public->action_free(simdata->compute))
simdata->compute = NULL;
simdata->computation_amount = 0.0;
+ XBT_OUT;
MSG_RETURN(MSG_OK);
} else if(surf_workstation_resource->extension_public->
get_state(MSG_process_get_host(process)->simdata->host)
== SURF_CPU_OFF) {
if(surf_workstation_resource->common_public->action_free(simdata->compute))
simdata->compute = NULL;
+ XBT_OUT;
MSG_RETURN(MSG_HOST_FAILURE);
} else {
if(surf_workstation_resource->common_public->action_free(simdata->compute))
simdata->compute = NULL;
+ XBT_OUT;
MSG_RETURN(MSG_TASK_CANCELLED);
}
}
}
/** \ingroup msg_gos_functions
- * \brief Return the number of MSG tasks currently running on a
+ * \brief Return the number of MSG tasks currently running on
* the host of the current running process.
*/
static int MSG_get_msgload(void)
/** \ingroup msg_gos_functions
*
- * \brief Return the the last value returned by a MSG function (except
+ * \brief Return the last value returned by a MSG function (except
* MSG_get_errno...).
*/
MSG_error_t MSG_get_errno(void)