From: alegrand Date: Wed, 29 Jun 2005 19:53:36 +0000 (+0000) Subject: Add a new function: MSG_task_get_with_timeout. That should be very convenient to... X-Git-Tag: v3.3~3878 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2d4f44983b405f88e6320243ef5781d0d654618c?hp=7b8507bc1dd3b18a2350a6d4eac309e87a4e13a8 Add a new function: MSG_task_get_with_timeout. That should be very convenient to do a select... git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1477 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/include/msg/msg.h b/include/msg/msg.h index 6cf8df0d93..b0399caee7 100644 --- a/include/msg/msg.h +++ b/include/msg/msg.h @@ -87,6 +87,8 @@ const char *MSG_task_get_name(m_task_t task); MSG_error_t MSG_task_destroy(m_task_t task); MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel); +MSG_error_t MSG_task_get_with_time_out(m_task_t * task, m_channel_t channel, + double max_duration); MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, m_channel_t channel); MSG_error_t MSG_task_put_bounded(m_task_t task, diff --git a/src/msg/gos.c b/src/msg/gos.c index 61be3c9a4e..0487c955de 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -29,7 +29,7 @@ MSG_error_t MSG_process_start(m_process_t process) /** \ingroup msg_gos_functions * \brief Listen on a channel and wait for receiving a task. * - * It takes two parameter. + * It takes two parameters. * \param task a memory location for storing a #m_task_t. It will hold a task when this function will return. Thus \a task should not be equal to \c NULL and \a *task should be equal to \c NULL. If one of @@ -42,6 +42,31 @@ MSG_error_t MSG_process_start(m_process_t process) */ MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel) +{ + return MSG_task_get_with_time_out(task, channel, -1); +} + +/** \ingroup msg_gos_functions + * \brief Listen on a channel and wait for receiving a task with a timeout. + * + * It takes three parameters. 
+ * \param task a memory location for storing a #m_task_t. It will + hold a task when this function returns. Thus \a task should not + be equal to \c NULL and \a *task should be equal to \c NULL. If one of + those two conditions does not hold, there will be a warning message. + * \param channel the channel on which the agent should be + listening. This value has to be >= 0 and less than the maximal + number of channels fixed with MSG_set_channel_number(). + * \param max_duration the maximum time to wait for a task before giving + up. In such a case, \a *task will not be modified and will still be + equal to \c NULL when returning. + * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING + if \a *task is not equal to \c NULL, and #MSG_OK otherwise. + */ + +MSG_error_t MSG_task_get_with_time_out(m_task_t * task, + m_channel_t channel, + double max_duration) { m_process_t process = MSG_process_self(); m_task_t t = NULL; @@ -49,6 +74,7 @@ MSG_error_t MSG_task_get(m_task_t * task, simdata_task_t t_simdata = NULL; simdata_host_t h_simdata = NULL; int warning = 0; + int first_time = 1; e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM; CHECK_HOST(); @@ -65,16 +91,26 @@ MSG_error_t MSG_task_get(m_task_t * task, DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name); while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) { + if(max_duration>0) { + if(!first_time) { + MSG_RETURN(MSG_OK); + } + } xbt_assert2(!(h_simdata->sleeping[channel]), "A process (%s(%d)) is already blocked on this channel", h_simdata->sleeping[channel]->name, h_simdata->sleeping[channel]->simdata->PID); h_simdata->sleeping[channel] = process; /* I'm waiting. 
Wake me up when you're ready */ - __MSG_process_block(); + if(max_duration>0) { + __MSG_process_block(max_duration); + } else { + __MSG_process_block(-1); + } if(surf_workstation_resource->extension_public->get_state(h_simdata->host) == SURF_CPU_OFF) MSG_RETURN(MSG_HOST_FAILURE); h_simdata->sleeping[channel] = NULL; + first_time = 0; /* OK, we should both be ready now. Are you there ? */ } @@ -231,7 +267,7 @@ MSG_error_t MSG_task_put(m_task_t task, process->simdata->put_host = dest; process->simdata->put_channel = channel; while(!(task_simdata->comm)) - __MSG_process_block(); + __MSG_process_block(-1); surf_workstation_resource->common_public->action_use(task_simdata->comm); process->simdata->put_host = NULL; process->simdata->put_channel = -1; diff --git a/src/msg/m_process.c b/src/msg/m_process.c index f43f114201..ace1951188 100644 --- a/src/msg/m_process.c +++ b/src/msg/m_process.c @@ -441,7 +441,7 @@ int MSG_process_isSuspended(m_process_t process) return (process->simdata->suspended); } -MSG_error_t __MSG_process_block(void) +int __MSG_process_block(double max_duration) { m_process_t process = MSG_process_self(); @@ -453,6 +453,9 @@ MSG_error_t __MSG_process_block(void) process->simdata->blocked=1; __MSG_task_execute(process,dummy); surf_workstation_resource->common_public->suspend(dummy->simdata->compute); + if(max_duration>=0) + surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, + max_duration); __MSG_wait_for_computation(process,dummy); process->simdata->blocked=0; @@ -461,7 +464,7 @@ MSG_error_t __MSG_process_block(void) MSG_task_destroy(dummy); - return MSG_OK; + return 1; } MSG_error_t __MSG_process_unblock(m_process_t process) diff --git a/src/msg/private.h b/src/msg/private.h index 666487864e..c7732a621f 100644 --- a/src/msg/private.h +++ b/src/msg/private.h @@ -102,7 +102,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task); MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task); 
MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task); -MSG_error_t __MSG_process_block(void); +int __MSG_process_block(double max_duration); MSG_error_t __MSG_process_unblock(m_process_t process); int __MSG_process_isBlocked(m_process_t process);