Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Adding more informations when blocking. Should help debugging.
[simgrid.git] / src / msg / gos.c
index 76d4dac..cd12369 100644 (file)
@@ -8,7 +8,7 @@
 #include "private.h"
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(gos, msg,
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg,
                                "Logging specific to MSG (gos)");
 
 /** \defgroup msg_gos_functions MSG Operating System Functions
@@ -53,7 +53,10 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
        xbt_fifo_foreach(h->simdata->mbox[channel],item,t,m_task_t) {
          if(t->simdata->source==host) break;
        }
-       if(item) break;
+       if(item) {
+         xbt_fifo_remove_item(h->simdata->mbox[channel],item);
+         break;
+       } 
       }
     }
                                                       
@@ -62,21 +65,22 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
        MSG_RETURN(MSG_OK);
       }
     }
-    xbt_assert2(!(h_simdata->sleeping[channel]),
-               "A process (%s(%d)) is already blocked on this channel",
+    xbt_assert3(!(h_simdata->sleeping[channel]),
+               "A process (%s(%d)) is already blocked on channel %d",
                h_simdata->sleeping[channel]->name,
-               h_simdata->sleeping[channel]->simdata->PID);
+               h_simdata->sleeping[channel]->simdata->PID,
+               channel);
     h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
     if(max_duration>0) {
-      __MSG_process_block(max_duration);
+      __MSG_process_block(max_duration,"");
     } else {
-      __MSG_process_block(-1);
+      __MSG_process_block(-1,"");
     }
+    h_simdata->sleeping[channel] = NULL;
+    first_time = 0;
     if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
        == SURF_CPU_OFF)
       MSG_RETURN(MSG_HOST_FAILURE);
-    h_simdata->sleeping[channel] = NULL;
-    first_time = 0;
     /* OK, we should both be ready now. Are you there ? */
   }
 
@@ -89,6 +93,26 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   /* Transfer */
   t_simdata->using++;
 
+  while(MSG_process_is_suspended(t_simdata->sender)) {
+    DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
+          t_simdata->sender->name);
+    m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
+    if(__MSG_process_isBlocked(t_simdata->sender)) {
+      DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
+      __MSG_process_unblock(t_simdata->sender);
+      task_to_wait_for->simdata->using++;
+      __MSG_task_wait_event(process, task_to_wait_for);
+      MSG_task_destroy(task_to_wait_for);
+    } else {
+      DEBUG0("He's suspended. Let's wait for him to go in the resumed state");
+      task_to_wait_for->simdata->using++;
+      __MSG_task_wait_event(process, task_to_wait_for);
+      MSG_task_destroy(task_to_wait_for);
+      DEBUG0("He's resumed. He should block again. So let's free him.");
+      __MSG_process_unblock(t_simdata->sender);
+      break;
+    }
+  }
   DEBUG0("Calling SURF for communication creation");
   t_simdata->comm = surf_workstation_resource->extension_public->
     communicate(MSG_process_get_host(t_simdata->sender)->simdata->host,
@@ -96,10 +120,12 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   
   surf_workstation_resource->common_public->action_set_data(t_simdata->comm,t);
 
-  if(__MSG_process_isBlocked(t_simdata->sender))
+  if(__MSG_process_isBlocked(t_simdata->sender)) {
+    DEBUG1("Unblocking %s",t_simdata->sender->name);
     __MSG_process_unblock(t_simdata->sender);
+  }
 
-  PAJE_PROCESS_PUSH_STATE(process,"C");  
+  PAJE_PROCESS_PUSH_STATE(process,"C",t);  
 
   do {
     DEBUG0("Waiting for action termination");
@@ -252,7 +278,7 @@ int MSG_task_probe_from(m_channel_t channel)
 /** \ingroup msg_gos_functions
  * \brief Wait for at most \a max_duration second for a task reception
    on \a channel. *\a PID is updated with the PID of the first process
-   that triggered this event is any.
+   that triggered this event if any.
  *
  * It takes three parameters:
  * \param channel the channel on which the agent should be
@@ -281,7 +307,8 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
   }
 
   if(max_duration==0.0) {
-    return MSG_task_probe_from(channel);
+    *PID = MSG_task_probe_from(channel);
+    MSG_RETURN(MSG_OK);
   } else {
     CHECK_HOST();
     h = MSG_host_self();
@@ -300,9 +327,9 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
                  h_simdata->sleeping[channel]->simdata->PID);
       h_simdata->sleeping[channel] = process; /* I'm waiting. Wake me up when you're ready */
       if(max_duration>0) {
-       __MSG_process_block(max_duration);
+       __MSG_process_block(max_duration,"");
       } else {
-       __MSG_process_block(-1);
+       __MSG_process_block(-1,"");
       }
       if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
         == SURF_CPU_OFF) {
@@ -396,7 +423,8 @@ MSG_error_t MSG_task_put(m_task_t task,
   task_simdata = task->simdata;
   task_simdata->sender = process;
   task_simdata->source = MSG_process_get_host(process);
-  xbt_assert0(task_simdata->using==1,"Gargl!");
+  xbt_assert0(task_simdata->using==1,
+             "This taks is still being used somewhere else. You cannot send it now. Go fix your code!");
   task_simdata->comm = NULL;
   
   local_host = ((simdata_process_t) process->simdata)->host;
@@ -419,7 +447,14 @@ MSG_error_t MSG_task_put(m_task_t task,
   process->simdata->put_channel = channel;
   while(!(task_simdata->comm)) {
     DEBUG0("Communication not initiated yet. Let's block!");
-    __MSG_process_block(-1);
+    __MSG_process_block(-1,task->name);
+    if(surf_workstation_resource->extension_public->
+       get_state(local_host->simdata->host) == SURF_CPU_OFF) {
+      xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
+                     task);
+      MSG_task_destroy(task);
+      MSG_RETURN(MSG_HOST_FAILURE);
+    }
   }
   DEBUG0("Registering to this communication");
   surf_workstation_resource->common_public->action_use(task_simdata->comm);
@@ -427,7 +462,7 @@ MSG_error_t MSG_task_put(m_task_t task,
   process->simdata->put_channel = -1;
 
 
-  PAJE_PROCESS_PUSH_STATE(process,"C");  
+  PAJE_PROCESS_PUSH_STATE(process,"C",task);  
 
   state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
   while (state==SURF_ACTION_RUNNING) {
@@ -436,6 +471,7 @@ MSG_error_t MSG_task_put(m_task_t task,
     state=surf_workstation_resource->common_public->action_get_state(task_simdata->comm);
   }
   DEBUG0("Action terminated");
+  task->simdata->rate=-1.0; /* Sets the rate back to default */
 
   PAJE_PROCESS_POP_STATE(process);  
 
@@ -471,7 +507,6 @@ MSG_error_t MSG_task_put_bounded(m_task_t task,
   MSG_error_t res = MSG_OK;
   task->simdata->rate=max_rate;
   res = MSG_task_put(task, dest, channel);
-  task->simdata->rate=-1.0;
   return(res);
 }
 
@@ -494,7 +529,7 @@ MSG_error_t MSG_task_execute(m_task_t task)
 
   __MSG_task_execute(process, task);
 
-  PAJE_PROCESS_PUSH_STATE(process,"E");  
+  PAJE_PROCESS_PUSH_STATE(process,"E",task);  
   res = __MSG_wait_for_computation(process,task);
   PAJE_PROCESS_POP_STATE(process);
   return res;
@@ -507,7 +542,9 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   CHECK_HOST();
 
   simdata = task->simdata;
-
+  xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
+             "This taks is executed somewhere else. Go fix your code!");
+  task->simdata->using++;
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
@@ -515,6 +552,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
     set_priority(simdata->compute, simdata->priority);
 
   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
+  task->simdata->using--;
 }
 
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
@@ -522,6 +560,7 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   e_surf_action_state_t state = SURF_ACTION_NOT_IN_THE_SYSTEM;
   simdata_task_t simdata = task->simdata;
 
+  XBT_IN4("(%p(%s) %p(%s))",process,process->name,task,task->name);
   simdata->using++;
   do {
     __MSG_task_wait_event(process, task);
@@ -534,16 +573,19 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
     simdata->computation_amount = 0.0;
+    XBT_OUT;
     MSG_RETURN(MSG_OK);
   } else if(surf_workstation_resource->extension_public->
            get_state(MSG_process_get_host(process)->simdata->host) 
            == SURF_CPU_OFF) {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
+    XBT_OUT;
     MSG_RETURN(MSG_HOST_FAILURE);
   } else {
     if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
       simdata->compute = NULL;
+    XBT_OUT;
     MSG_RETURN(MSG_TASK_CANCELLED);
   }
 }
@@ -555,11 +597,11 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
  * \param name a name for the object. It is for user-level information
    and can be NULL.
  * \param host_nb the number of hosts implied in the parallel task.
- * \param host_list an array of #host_nb m_host_t.
- * \param computation_amount an array of #host_nb
+ * \param host_list an array of \p host_nb m_host_t.
+ * \param computation_amount an array of \p host_nb
    doubles. computation_amount[i] is the total number of operations
    that have to be performed on host_list[i].
- * \param communication_amount an array of #host_nb*#host_nb doubles.
+ * \param communication_amount an array of \p host_nb* \p host_nb doubles.
  * \param data a pointer to any data may want to attach to the new
    object.  It is for user-level information and can be NULL. It can
    be retrieved with the function \ref MSG_task_get_data.
@@ -697,7 +739,7 @@ MSG_error_t MSG_process_sleep(double nb_sec)
 }
 
 /** \ingroup msg_gos_functions
- * \brief Return the number of MSG tasks currently running on a
+ * \brief Return the number of MSG tasks currently running on
  * the host of the current running process.
  */
 static int MSG_get_msgload(void) 
@@ -713,7 +755,7 @@ static int MSG_get_msgload(void)
 
 /** \ingroup msg_gos_functions
  *
- * \brief Return the the last value returned by a MSG function (except
+ * \brief Return the last value returned by a MSG function (except
  * MSG_get_errno...).
  */
 MSG_error_t MSG_get_errno(void)