X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7d4f544e3472cef981da4cc84db078b72c8c6920..80f442e05f4079b95d52af88f58374dcbc3a8f72:/src/msg/gos.c diff --git a/src/msg/gos.c b/src/msg/gos.c index a774a0fbf1..da0b42528d 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -92,6 +92,26 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, /* Transfer */ t_simdata->using++; + while(MSG_process_is_suspended(t_simdata->sender)) { + DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", + t_simdata->sender->name); + m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task; + if(__MSG_process_isBlocked(t_simdata->sender)) { + DEBUG0("He's blocked. Let's wait for him to go in the suspended state"); + __MSG_process_unblock(t_simdata->sender); + task_to_wait_for->simdata->using++; + __MSG_task_wait_event(process, task_to_wait_for); + MSG_task_destroy(task_to_wait_for); + } else { + DEBUG0("He's suspended. Let's wait for him to go in the resumed state"); + task_to_wait_for->simdata->using++; + __MSG_task_wait_event(process, task_to_wait_for); + MSG_task_destroy(task_to_wait_for); + DEBUG0("He's resumed. He should block again. So let's free him."); + __MSG_process_unblock(t_simdata->sender); + break; + } + } DEBUG0("Calling SURF for communication creation"); t_simdata->comm = surf_workstation_resource->extension_public-> communicate(MSG_process_get_host(t_simdata->sender)->simdata->host, @@ -99,8 +119,10 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task, surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t); - if(__MSG_process_isBlocked(t_simdata->sender)) + if(__MSG_process_isBlocked(t_simdata->sender)) { + DEBUG1("Unblocking %s",t_simdata->sender->name); __MSG_process_unblock(t_simdata->sender); + } PAJE_PROCESS_PUSH_STATE(process,"C"); @@ -255,7 +277,7 @@ int MSG_task_probe_from(m_channel_t channel) /** \ingroup msg_gos_functions * \brief Wait for at most \a max_duration second for a task reception on \a channel. *\a PID is updated with the PID of the first process - that triggered this event is any. + that triggered this event if any. * * It takes three parameters: * \param channel the channel on which the agent should be @@ -284,7 +306,8 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration, } if(max_duration==0.0) { - return MSG_task_probe_from(channel); + *PID = MSG_task_probe_from(channel); + MSG_RETURN(MSG_OK); } else { CHECK_HOST(); h = MSG_host_self(); @@ -439,6 +462,7 @@ MSG_error_t MSG_task_put(m_task_t task, state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm); } DEBUG0("Action terminated"); + task->simdata->rate=-1.0; /* Sets the rate back to default */ PAJE_PROCESS_POP_STATE(process); @@ -474,7 +498,6 @@ MSG_error_t MSG_task_put_bounded(m_task_t task, MSG_error_t res = MSG_OK; task->simdata->rate=max_rate; res = MSG_task_put(task, dest, channel); - task->simdata->rate=-1.0; return(res); } @@ -525,6 +548,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM; simdata_task_t simdata = task->simdata; + XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name); simdata->using++; do { __MSG_task_wait_event(process, task); @@ -537,16 +561,19 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) if(surf_workstation_resource->common_public->action_free(simdata->compute)) simdata->compute = NULL; simdata->computation_amount = 0.0; + XBT_OUT; MSG_RETURN(MSG_OK); } else if(surf_workstation_resource->extension_public-> get_state(MSG_process_get_host(process)->simdata->host) == SURF_CPU_OFF) { if(surf_workstation_resource->common_public->action_free(simdata->compute)) simdata->compute = NULL; + XBT_OUT; MSG_RETURN(MSG_HOST_FAILURE); } else { if(surf_workstation_resource->common_public->action_free(simdata->compute)) simdata->compute = NULL; + XBT_OUT; MSG_RETURN(MSG_TASK_CANCELLED); } } @@ -558,11 +585,11 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task) * \param name a name for the object. It is for user-level information and can be NULL. * \param host_nb the number of hosts implied in the parallel task. - * \param host_list an array of #host_nb m_host_t. - * \param computation_amount an array of #host_nb + * \param host_list an array of \p host_nb m_host_t. + * \param computation_amount an array of \p host_nb doubles. computation_amount[i] is the total number of operations that have to be performed on host_list[i]. - * \param communication_amount an array of #host_nb*#host_nb doubles. + * \param communication_amount an array of \p host_nb* \p host_nb doubles. * \param data a pointer to any data may want to attach to the new object. It is for user-level information and can be NULL. It can be retrieved with the function \ref MSG_task_get_data. @@ -700,7 +727,7 @@ MSG_error_t MSG_process_sleep(double nb_sec) } /** \ingroup msg_gos_functions - * \brief Return the number of MSG tasks currently running on a + * \brief Return the number of MSG tasks currently running on * the host of the current running process. */ static int MSG_get_msgload(void) @@ -716,7 +743,7 @@ static int MSG_get_msgload(void) /** \ingroup msg_gos_functions * - * \brief Return the the last value returned by a MSG function (except + * \brief Return the last value returned by a MSG function (except * MSG_get_errno...). */ MSG_error_t MSG_get_errno(void)