Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
removing MSG_process_start from the documentation
[simgrid.git] / src / msg / gos.c
index 594cb22..4d17772 100644 (file)
@@ -16,7 +16,7 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
  *  by an agent for handling some task.
  */
 
-/** \ingroup msg_gos_functions
+/* \ingroup msg_gos_functions
  * \brief This function is now deprecated and useless. Please stop using it.
  */
 MSG_error_t MSG_process_start(m_process_t process)
@@ -29,7 +29,7 @@ MSG_error_t MSG_process_start(m_process_t process)
 /** \ingroup msg_gos_functions
  * \brief Listen on a channel and wait for receiving a task.
  *
- * It takes two parameter.
+ * It takes two parameters.
  * \param task a memory location for storing a #m_task_t. It will
    hold a task when this function will return. Thus \a task should not
    be equal to \c NULL and \a *task should be equal to \c NULL. If one of
@@ -42,6 +42,31 @@ MSG_error_t MSG_process_start(m_process_t process)
  */
 MSG_error_t MSG_task_get(m_task_t * task,
                         m_channel_t channel)
+{
+  return MSG_task_get_with_time_out(task, channel, -1);
+}
+
+/** \ingroup msg_gos_functions
+ * \brief Listen on a channel and wait for receiving a task with a timeout.
+ *
+ * It takes three parameters.
+ * \param task a memory location for storing a #m_task_t. It will
+   hold a task when this function will return. Thus \a task should not
+   be equal to \c NULL and \a *task should be equal to \c NULL. If one of
+   those two condition does not hold, there will be a warning message.
+ * \param channel the channel on which the agent should be
+   listening. This value has to be >=0 and < than the maximal
+   number of channels fixed with MSG_set_channel_number().
+ * \param timeout the maximum time to wait for a task before giving
+    up. In such a case, \a task will not be modified and will still be
+    equal to \c NULL when returning.
+ * \return #MSG_FATAL if \a task is equal to \c NULL, #MSG_WARNING
+   if \a *task is not equal to \c NULL, and #MSG_OK otherwise.
+ */
+
+MSG_error_t MSG_task_get_with_time_out(m_task_t * task,
+                                      m_channel_t channel,
+                                      double max_duration)
 {
   m_process_t process = MSG_process_self();
   m_task_t t = NULL;
@@ -49,6 +74,7 @@ MSG_error_t MSG_task_get(m_task_t * task,
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
   int warning = 0;
+  int first_time = 1;
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   
   CHECK_HOST();
@@ -65,16 +91,26 @@ MSG_error_t MSG_task_get(m_task_t * task,
   DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
 
   while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
+    if(max_duration>0) {
+      if(!first_time) {
+       MSG_RETURN(MSG_OK);
+      }
+    }
     xbt_assert2(!(h_simdata->sleeping[channel]),
                "A process (%s(%d)) is already blocked on this channel",
                h_simdata->sleeping[channel]->name,
                h_simdata->sleeping[channel]->simdata->PID);
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    __MSG_process_block();
+    if(max_duration>0) {
+      __MSG_process_block(max_duration);
+    } else {
+      __MSG_process_block(-1);
+    }
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
     h_simdata->sleeping[channel] = NULL;
+    first_time = 0;
     /* OK, we should both be ready now. Are you there ? */
   }
 
@@ -106,14 +142,23 @@ MSG_error_t MSG_task_get(m_task_t * task,
     xbt_context_yield();
   }
 
-  PAJE_PROCESS_POP_STATE(process);  
+  PAJE_PROCESS_POP_STATE(process);
   PAJE_COMM_STOP(process,t,channel);
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
+         == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -166,6 +211,60 @@ int MSG_task_probe_from(m_channel_t channel)
   return MSG_process_get_PID(t->simdata->sender);
 }
 
+MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
+                                   int *PID)
+{
+  m_host_t h = NULL;
+  simdata_host_t h_simdata = NULL;
+  xbt_fifo_item_t item;
+  m_task_t t;
+  int first_time = 1;
+  m_process_t process = MSG_process_self();
+
+  if(PID) {
+    *PID = -1;
+  }
+
+  if(max_duration==0.0) {
+    return MSG_task_probe_from(channel);
+  } else {
+    CHECK_HOST();
+    h = MSG_host_self();
+    h_simdata = h->simdata;
+    
+    DEBUG2("Probing on channel %d (%s)", channel,h->name);
+    while(!(item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]))) {
+      if(max_duration>0) {
+       if(!first_time) {
+         MSG_RETURN(MSG_OK);
+       }
+      }
+      xbt_assert2(!(h_simdata->sleeping[channel]),
+                 "A process (%s(%d)) is already blocked on this channel",
+                 h_simdata->sleeping[channel]->name,
+                 h_simdata->sleeping[channel]->simdata->PID);
+      h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
+      if(max_duration>0) {
+       __MSG_process_block(max_duration);
+      } else {
+       __MSG_process_block(-1);
+      }
+      if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
+        == SURF_CPU_OFF) {
+       MSG_RETURN(MSG_HOST_FAILURE);
+      }
+      h_simdata->sleeping[channel] = NULL;
+      first_time = 0;
+    }
+    if (!item || !(t = xbt_fifo_get_item_content(item))) {
+      MSG_RETURN(MSG_OK);
+    }
+    if(PID) {
+      *PID = MSG_process_get_PID(t->simdata->sender);
+    }
+    MSG_RETURN(MSG_OK);
+  }
+}
 /** \ingroup msg_gos_functions
  * \brief Put a task on a channel of an host and waits for the end of the
  * transmission.
@@ -222,7 +321,8 @@ MSG_error_t MSG_task_put(m_task_t task,
   process->simdata->put_host = dest;
   process->simdata->put_channel = channel;
   while(!(task_simdata->comm)) 
-    __MSG_process_block();
+    __MSG_process_block(-1);
+  surf_workstation_resource->common_public->action_use(task_simdata->comm);
   process->simdata->put_host = NULL;
   process->simdata->put_channel = -1;
 
@@ -239,11 +339,20 @@ MSG_error_t MSG_task_put(m_task_t task,
 
   PAJE_PROCESS_POP_STATE(process);  
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else { 
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -313,12 +422,21 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   simdata->using--;
     
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->
-         get_state(MSG_process_get_host(process)->simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->
+           get_state(MSG_process_get_host(process)->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -355,16 +473,23 @@ MSG_error_t MSG_process_sleep(double nb_sec)
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
-
+    }
     if(__MSG_process_isBlocked(process)) {
       __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
+    }
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
     MSG_task_destroy(dummy);
     MSG_RETURN(MSG_OK);
   } else MSG_RETURN(MSG_HOST_FAILURE);
@@ -374,12 +499,13 @@ MSG_error_t MSG_process_sleep(double nb_sec)
  * \brief Return the number of MSG tasks currently running on a
  * the host of the current running process.
  */
-int MSG_get_msgload(void) 
+static int MSG_get_msgload(void) 
 {
   m_process_t process;
    
   CHECK_HOST();
   
+  xbt_assert0(0, "This function is still to be specified correctly (what do you mean by 'load', exactly?). In the meantime, please don't use it");
   process = MSG_process_self();
   return xbt_fifo_size(process->simdata->host->simdata->process_list);
 }