Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Automatic update
[simgrid.git] / src / msg / gos.c
index 3831fa4..61be3c9 100644 (file)
@@ -61,7 +61,10 @@ MSG_error_t MSG_task_get(m_task_t * task,
   /* Get the task */
   h = MSG_host_self();
   h_simdata = h->simdata;
-  while ((t = xbt_fifo_pop(h_simdata->mbox[channel])) == NULL) {
+
+  DEBUG2("Waiting for a task on channel %d (%s)", channel,h->name);
+
+  while ((t = xbt_fifo_shift(h_simdata->mbox[channel])) == NULL) {
     xbt_assert2(!(h_simdata->sleeping[channel]),
                "A process (%s(%d)) is already blocked on this channel",
                h_simdata->sleeping[channel]->name,
@@ -103,14 +106,23 @@ MSG_error_t MSG_task_get(m_task_t * task,
     xbt_context_yield();
   }
 
-  PAJE_PROCESS_POP_STATE(process);  
+  PAJE_PROCESS_POP_STATE(process);
   PAJE_COMM_STOP(process,t,channel);
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(h_simdata->host) 
+         == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(t_simdata->comm)) 
+      t_simdata->comm = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -127,6 +139,7 @@ int MSG_task_Iprobe(m_channel_t channel)
   m_host_t h = NULL;
   simdata_host_t h_simdata = NULL;
 
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
@@ -152,12 +165,14 @@ int MSG_task_probe_from(m_channel_t channel)
   CHECK_HOST();
   h = MSG_host_self();
   h_simdata = h->simdata;
+
+  DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
-  item = xbt_fifo_getFirstItem(((simdata_host_t)h->simdata)->mbox[channel]);
-  if (!item || !(t = xbt_fifo_get_item_content(item)) || (simdata_task_t)t->simdata)
+  item = xbt_fifo_getFirstItem(h->simdata->mbox[channel]);
+  if (!item || !(t = xbt_fifo_get_item_content(item)))
     return -1;
    
-  return MSG_process_get_PID(((simdata_task_t)t->simdata)->sender);
+  return MSG_process_get_PID(t->simdata->sender);
 }
 
 /** \ingroup msg_gos_functions
@@ -202,6 +217,9 @@ MSG_error_t MSG_task_put(m_task_t task,
   local_host = ((simdata_process_t) process->simdata)->host;
   remote_host = dest;
 
+  DEBUG4("Trying to send a task (%lg Mb) from %s to %s on channel %d", 
+        task->simdata->message_size,local_host->name, remote_host->name, channel);
+
   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
                mbox[channel], task);
 
@@ -214,6 +232,7 @@ MSG_error_t MSG_task_put(m_task_t task,
   process->simdata->put_channel = channel;
   while(!(task_simdata->comm)) 
     __MSG_process_block();
+  surf_workstation_resource->common_public->action_use(task_simdata->comm);
   process->simdata->put_host = NULL;
   process->simdata->put_channel = -1;
 
@@ -230,11 +249,20 @@ MSG_error_t MSG_task_put(m_task_t task,
 
   PAJE_PROCESS_POP_STATE(process);  
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->get_state(local_host->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else { 
+    if(surf_workstation_resource->common_public->action_free(task_simdata->comm)) 
+      task_simdata->comm = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -266,6 +294,9 @@ MSG_error_t MSG_task_execute(m_task_t task)
 {
   m_process_t process = MSG_process_self();
   MSG_error_t res;
+
+  DEBUG1("Computing on %s", process->simdata->host->name);
+
   __MSG_task_execute(process, task);
 
   PAJE_PROCESS_PUSH_STATE(process,"E");  
@@ -301,12 +332,21 @@ MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
   simdata->using--;
     
 
-  if(state == SURF_ACTION_DONE) MSG_RETURN(MSG_OK);
-  else if(surf_workstation_resource->extension_public->
-         get_state(MSG_process_get_host(process)->simdata->host) 
-         == SURF_CPU_OFF)
+  if(state == SURF_ACTION_DONE) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    MSG_RETURN(MSG_OK);
+  } else if(surf_workstation_resource->extension_public->
+           get_state(MSG_process_get_host(process)->simdata->host) 
+           == SURF_CPU_OFF) {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
     MSG_RETURN(MSG_HOST_FAILURE);
-  else MSG_RETURN(MSG_TRANSFER_FAILURE);
+  } else {
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
+    MSG_RETURN(MSG_TRANSFER_FAILURE);
+  }
 }
 
 /** \ingroup msg_gos_functions
@@ -343,16 +383,23 @@ MSG_error_t MSG_process_sleep(double nb_sec)
   if(state == SURF_ACTION_DONE) {
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
-
+    }
     if(__MSG_process_isBlocked(process)) {
       __MSG_process_unblock(MSG_process_self());
     }
     if(surf_workstation_resource->extension_public->
        get_state(MSG_process_get_host(process)->simdata->host) 
-       == SURF_CPU_OFF)
+       == SURF_CPU_OFF) {
+      if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+       simdata->compute = NULL;
       MSG_RETURN(MSG_HOST_FAILURE);
+    }
+    if(surf_workstation_resource->common_public->action_free(simdata->compute)) 
+      simdata->compute = NULL;
     MSG_task_destroy(dummy);
     MSG_RETURN(MSG_OK);
   } else MSG_RETURN(MSG_HOST_FAILURE);
@@ -364,10 +411,12 @@ MSG_error_t MSG_process_sleep(double nb_sec)
  */
 int MSG_get_msgload(void) 
 {
+  m_process_t process;
+   
   CHECK_HOST();
-  xbt_assert0(0,"Not implemented yet!");
   
-  return 1;
+  process = MSG_process_self();
+  return xbt_fifo_size(process->simdata->host->simdata->process_list);
 }
 
 /** \ingroup msg_gos_functions