Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add a new function: MSG_task_get_with_timeout. That should be very convenient to...
authoralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Jun 2005 19:53:36 +0000 (19:53 +0000)
committeralegrand <alegrand@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Wed, 29 Jun 2005 19:53:36 +0000 (19:53 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1477 48e7efb5-ca39-0410-a469-dd3cf9ba447f

include/msg/msg.h
src/msg/gos.c
src/msg/m_process.c
src/msg/private.h

index 6cf8df0..b0399ca 100644 (file)
@@ -87,6 +87,8 @@ const char *MSG_task_get_name(m_task_t task);
 MSG_error_t MSG_task_destroy(m_task_t task);
 
 MSG_error_t MSG_task_get(m_task_t * task, m_channel_t channel);
+MSG_error_t MSG_task_get_with_time_out(m_task_t * task, m_channel_t channel,
+                                      double max_duration);
 MSG_error_t MSG_task_put(m_task_t task, m_host_t dest, 
                         m_channel_t channel);
 MSG_error_t MSG_task_put_bounded(m_task_t task,
index 61be3c9..0487c95 100644 (file)
@@ -29,7 +29,7 @@ MSG_error_t MSG_process_start(m_process_t process)
 /** \ingroup msg_gos_functions
  * \brief Listen on a channel and wait for receiving a task.
  *
- * It takes two parameter.
+ * It takes two parameters.
  * \param task a memory location for storing a #m_task_t. It will
    hold a task when this function will return. Thus \a task should not
    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
@@ -42,6 +42,31 @@ MSG_error_t MSG_process_start(m_process_t process)
  */
 MSG_error_t MSG_task_get(m_task_t * task,
                         m_channel_t channel)
+{
+  return MSG_task_get_with_time_out(task, channel, -1);
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Listen on a channel and wait for receiving a task with a timeout.
+ *
+ * It takes three parameters.
+ * \param task a memory location for storing a #m_task_t. It will
+   hold a task when this function will return. Thus \a task should not
+   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
+   those two condition does not hold, there will be a warning message.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param timeout the maximum time to wait for a task before giving
+    up. In such a case, \a task will not be modified and will still be
+    equal to \c NULL when returning.
+ * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
+   if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
+ */
+
+MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
+                                      m_channel_t channel,
+                                      double max_duration)
 {
   m_process_t process = MSG_process_self();
   m_task_t t = NULL;
@@ -49,6 +74,7 @@ MSG_error_t MSG_task_get(m_task_t * task,
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
   int warning = 0;
+  int first_time = 1;
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   
   CHECK_HOST();
@@ -65,16 +91,26 @@ MSG_error_t MSG_task_get(m_task_t * task,
   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
 
   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
+    if(max_duration>0) {
+      if(!first_time) {
+       MSG_RETURN(MSG_OK);
+      }
+    }
     xbt_assert2(!(h_simdata->sleeping[channel]),
                "A process (%s(%d)) is already blocked on this channel",
                h_simdata->sleeping[channel]->name,
                h_simdata->sleeping[channel]->simdata->PID);
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    __MSG_process_block();
+    if(max_duration>0) {
+      __MSG_process_block(max_duration);
+    } else {
+      __MSG_process_block(-1);
+    }
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
     h_simdata->sleeping[channel] = NULL;
+    first_time = 0;
     /* OK, we should both be ready now. Are you there ? */
   }
 
@@ -231,7 +267,7 @@ MSG_error_t MSG_task_put(m_task_t task,
   process->simdata->put_host = dest;
   process->simdata->put_channel = channel;
   while(!(task_simdata->comm)) 
-    __MSG_process_block();
+    __MSG_process_block(-1);
   surf_workstation_resource->common_public->action_use(task_simdata->comm);
   process->simdata->put_host = NULL;
   process->simdata->put_channel = -1;
index f43f114..ace1951 100644 (file)
@@ -441,7 +441,7 @@ int MSG_process_isSuspended(m_process_t process)
   return (process->simdata->suspended);
 }
 
-MSG_error_t __MSG_process_block(void)
+int __MSG_process_block(double max_duration)
 {
   m_process_t process = MSG_process_self();
 
@@ -453,6 +453,9 @@ MSG_error_t __MSG_process_block(void)
   process->simdata->blocked=1;
   __MSG_task_execute(process,dummy);
   surf_workstation_resource->common_public->suspend(dummy->simdata->compute);
+  if(max_duration>=0)
+    surf_workstation_resource->common_public->set_max_duration(dummy->simdata->compute, 
+                                                              max_duration);
   __MSG_wait_for_computation(process,dummy);
   process->simdata->blocked=0;
 
@@ -461,7 +464,7 @@ MSG_error_t __MSG_process_block(void)
   
   MSG_task_destroy(dummy);
 
-  return MSG_OK;
+  return 1;
 }
 
 MSG_error_t __MSG_process_unblock(m_process_t process)
index 6664878..c7732a6 100644 (file)
@@ -102,7 +102,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task);
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task);
 MSG_error_t __MSG_task_wait_event(m_process_t process, m_task_t task);
 
-MSG_error_t __MSG_process_block(void);
+int __MSG_process_block(double max_duration);
 MSG_error_t __MSG_process_unblock(m_process_t process);
 int __MSG_process_isBlocked(m_process_t process);