Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
MSG_task_get is now fair...
[simgrid.git] / src / msg / gos.c
index 18094ea..9dcfb54 100644 (file)
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
                                "Logging specific to MSG (gos)");
 
+/** \defgroup msg_gos_functions MSG Operating System Functions
+ *  \brief This section describes the functions that can be used
+ *  by an agent to handle tasks.
+ */
+
 /** \ingroup msg_gos_functions
  * \brief This function is now deprecated and useless. Please stop using it.
  */
-
 MSG_error_t MSG_process_start(m_process_t process)
 {
   xbt_assert0(0,"This function is now deprecated and useless. Please stop using it.");
@@ -57,11 +61,16 @@ MSG_error_t MSG_task_get(m_task_t * task,
   /* Get the task */
   h = MSG_host_self();
   h_simdata = h->simdata;
-  while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
-    xbt_assert0(!(h_simdata->sleeping[channel]),
-               "A process is already blocked on this channel");
+
+  DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
+
+  while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
+    xbt_assert2(!(h_simdata->sleeping[channel]),
+               "A process (%s(%d)) is already blocked on this channel",
+               h_simdata->sleeping[channel]->name,
+               h_simdata->sleeping[channel]->simdata->PID);
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
-    MSG_process_suspend(process);
+    __MSG_process_block();
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
@@ -75,16 +84,31 @@ MSG_error_t MSG_task_get(m_task_t * task,
 
   /* Transfer */
   t_simdata->using++;
+
   t_simdata->comm = surf_workstation_resource->extension_public->
     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
-               h->simdata->host, t_simdata->message_size);
+               h->simdata->host, t_simdata->message_size,t_simdata->rate);
+  
   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
+  if(__MSG_process_isBlocked(t_simdata->sender)) 
+    __MSG_process_unblock(t_simdata->sender);
+
+  PAJE_PROCESS_PUSH_STATE(process,"C");  
+
   do {
     __MSG_task_wait_event(process, t);
     state=surf_workstation_resource->common_public->action_get_state(t_simdata->comm);
   } while (state==SURF_ACTION_RUNNING);
 
+  if(t->simdata->using>1) {
+    xbt_fifo_unshift(msg_global->process_to_run,process);
+    xbt_context_yield();
+  }
+
+  PAJE_PROCESS_POP_STATE(process);  
+  PAJE_COMM_STOP(process,t,channel);
+
   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
   else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
          == SURF_CPU_OFF)
@@ -106,12 +130,42 @@ int MSG_task_Iprobe(m_channel_t channel)
   m_host_t h = NULL;
   simdata_host_t h_simdata = NULL;
 
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
   return(xbt_fifo_getFirstItem(h_simdata->mbox[channel])!=NULL);
 }
 
+/** \ingroup msg_gos_functions
+ * \brief Tells which process sent the communication pending on a channel.
+ *
+ * Inspects the first item of the calling host's mailbox for \a channel.
+ *
+ * \param channel the channel on which the agent should be listening.
+ *   This value has to be >= 0 and less than the maximal number of
+ *   channels fixed with MSG_set_channel_number().
+ * \return -1 if there is no pending communication, and the PID of the
+ *   process that sent it otherwise.
+ */
+int MSG_task_probe_from(m_channel_t channel)
+{
+  m_host_t self_host = NULL;
+  simdata_host_t self_simdata = NULL;
+  xbt_fifo_item_t head = NULL;
+  m_task_t pending = NULL;
+
+  CHECK_HOST();
+  self_host = MSG_host_self();
+  self_simdata = self_host->simdata;
+
+  DEBUG2("Probing on channel %d (%s)", channel,self_host->name);
+
+  /* Nothing queued on this channel. */
+  head = xbt_fifo_getFirstItem(self_simdata->mbox[channel]);
+  if (head == NULL)
+    return -1;
+
+  /* An item without content is treated like an empty mailbox. */
+  pending = xbt_fifo_get_item_content(head);
+  if (pending == NULL)
+    return -1;
+
+  return MSG_process_get_PID(pending->simdata->sender);
+}
+
 /** \ingroup msg_gos_functions
  * \brief Puts a task on a channel of a host and waits for the end of the
  * transmission.
@@ -148,31 +202,43 @@ MSG_error_t MSG_task_put(m_task_t task,
 
   task_simdata = task->simdata;
   task_simdata->sender = process;
-
+  xbt_assert0(task_simdata->using==1,"Gargl!");
+  task_simdata->comm = NULL;
   
   local_host = ((simdata_process_t) process->simdata)->host;
   remote_host = dest;
 
+  DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
+        task->simdata->message_size,local_host->name, remote_host->name, channel);
+
   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
                mbox[channel], task);
+
+  PAJE_COMM_START(process,task,channel);
     
   if(remote_host->simdata->sleeping[channel]) 
-    MSG_process_resume(remote_host->simdata->sleeping[channel]);
-  else {
-    process->simdata->put_host = dest;
-    process->simdata->put_channel = channel;
-    MSG_process_suspend(process);
-    process->simdata->put_host = NULL;
-    process->simdata->put_channel = -1;
-  }
+    __MSG_process_unblock(remote_host->simdata->sleeping[channel]);
 
-  do {
+  process->simdata->put_host = dest;
+  process->simdata->put_channel = channel;
+  while(!(task_simdata->comm)) 
+    __MSG_process_block();
+  process->simdata->put_host = NULL;
+  process->simdata->put_channel = -1;
+
+
+  PAJE_PROCESS_PUSH_STATE(process,"C");  
+
+  state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
+  while (state==SURF_ACTION_RUNNING) {
     __MSG_task_wait_event(process, task);
     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
-  } while (state==SURF_ACTION_RUNNING);
+  }
     
   MSG_task_destroy(task);
 
+  PAJE_PROCESS_POP_STATE(process);  
+
   if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
   else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
          == SURF_CPU_OFF)
@@ -180,6 +246,21 @@ MSG_error_t MSG_task_put(m_task_t task,
   else MSG_RETURN(MSG_TRANSFER_FAILURE);
 }
 
+/** \ingroup msg_gos_functions
+ * \brief Does exactly the same as MSG_task_put but with a bounded transmission
+ * rate.
+ *
+ * The bound is stored in the task's simdata (\c rate) and the call is then
+ * delegated to MSG_task_put().
+ *
+ * \param task the task to send; its \c simdata->rate is overwritten with
+ *   \a max_rate
+ * \param dest the destination host, passed through to MSG_task_put()
+ * \param channel the channel, passed through to MSG_task_put()
+ * \param max_rate the value stored as the task's transmission rate bound
+ * \return whatever MSG_task_put() returns
+ *
+ * \sa MSG_task_put
+ */
+MSG_error_t MSG_task_put_bounded(m_task_t task,
+                                m_host_t dest, m_channel_t channel,
+                                double max_rate)
+{
+  task->simdata->rate=max_rate;
+  /* The rate is intentionally not reset after the call. A reset placed
+   * after the return statement can never execute, and MSG_task_put() calls
+   * MSG_task_destroy() on the task before returning, so touching the task
+   * here afterwards may not be safe. */
+  return MSG_task_put(task, dest, channel);
+}
+
 /** \ingroup msg_gos_functions
  * \brief Executes a task and waits for its termination.
  *
@@ -193,9 +274,16 @@ MSG_error_t MSG_task_put(m_task_t task,
 MSG_error_t MSG_task_execute(m_task_t task)
 {
   m_process_t process = MSG_process_self();
+  MSG_error_t res; /* result of the wait, kept so the state pop below runs before returning */
+
+  DEBUG1("Computing on %s", process->simdata->host->name);
 
   __MSG_task_execute(process, task);
-  return __MSG_wait_for_computation(process,task);
+
+  PAJE_PROCESS_PUSH_STATE(process,"E");  /* NOTE(review): presumably a Paje trace state meaning "executing" -- confirm */
+  res = __MSG_wait_for_computation(process,task);
+  PAJE_PROCESS_POP_STATE(process); /* pairs with the push above */
+  return res;
 }
 
 void __MSG_task_execute(m_process_t process, m_task_t task)
@@ -240,7 +328,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
  *
  * \param nb_sec a number of seconds
  */
-MSG_error_t MSG_process_sleep(long double nb_sec)
+MSG_error_t MSG_process_sleep(double nb_sec)
 {
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   m_process_t process = MSG_process_self();
@@ -270,8 +358,8 @@ MSG_error_t MSG_process_sleep(long double nb_sec)
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
 
-    if(MSG_process_isSuspended(process)) {
-      MSG_process_suspend(MSG_process_self());
+    if(__MSG_process_isBlocked(process)) {
+      __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
@@ -303,4 +391,3 @@ MSG_error_t MSG_get_errno(void)
 {
   return PROCESS_GET_ERRNO();
 }
-