Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bugfix. Remove the task from the remote host fifo when there is a
[simgrid.git] / src / msg / gos.c
index 678e3af..2d12f65 100644 (file)
@@ -24,6 +24,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   m_process_t process = MSG_process_self();
   m_task_t t = NULL;
   m_host_t h = NULL;
+  m_task_t task_to_wait_for;
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
   int first_time = 1;
@@ -62,6 +63,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
                                                       
     if(max_duration>0) {
       if(!first_time) {
+       PAJE_PROCESS_POP_STATE(process);
        MSG_RETURN(MSG_TRANSFER_FAILURE);
       }
     }
@@ -96,7 +98,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   while(MSG_process_is_suspended(t_simdata->sender)) {
     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
           t_simdata->sender->name);
-    m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
+    task_to_wait_for = t_simdata->sender->simdata->waiting_task;
     if(__MSG_process_isBlocked(t_simdata->sender)) {
       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
       __MSG_process_unblock(t_simdata->sender);
@@ -236,14 +238,12 @@ MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
 int MSG_task_Iprobe(m_channel_t channel)
 {
   m_host_t h = NULL;
-  simdata_host_t h_simdata = NULL;
 
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
-  return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
+  return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
 }
 
 /** \ingroup msg_gos_functions
@@ -258,14 +258,12 @@ int MSG_task_Iprobe(m_channel_t channel)
 int MSG_task_probe_from(m_channel_t channel)
 {
   m_host_t h = NULL;
-  simdata_host_t h_simdata = NULL;
   xbt_fifo_item_t item;
   m_task_t t;
 
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
 
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
@@ -365,7 +363,6 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
  */
 int MSG_task_probe_from_host(int channel, m_host_t host)
 {
-  simdata_host_t h_simdata = NULL;
   xbt_fifo_item_t item;
   m_task_t t;
   int count = 0;
@@ -374,7 +371,6 @@ int MSG_task_probe_from_host(int channel, m_host_t host)
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
 
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
@@ -438,8 +434,8 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
   local_host = ((simdata_process_t) process->simdata)->host;
   remote_host = dest;
 
-  DEBUG4("Trying to send a task (%g Mb) from %s to %s on channel %d", 
-        task->simdata->message_size,local_host->name, remote_host->name, channel);
+  DEBUG4("Trying to send a task (%g kB) from %s to %s on channel %d", 
+        task->simdata->message_size/1000,local_host->name, remote_host->name, channel);
 
   xbt_fifo_push(((simdata_host_t) remote_host->simdata)->
                mbox[channel], task);
@@ -456,6 +452,10 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
   while(!(task_simdata->comm)) {
     if(max_duration>0) {
       if(!first_time) {
+       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
+                       task);
+       PAJE_PROCESS_POP_STATE(process);
+       PAJE_COMM_STOP(process,task,channel);
        MSG_RETURN(MSG_TRANSFER_FAILURE);
       }
     }
@@ -471,6 +471,8 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
                      task);
+      PAJE_PROCESS_POP_STATE(process);
+      PAJE_COMM_STOP(process,task,channel);
       MSG_task_destroy(task);
       MSG_RETURN(MSG_HOST_FAILURE);
     }
@@ -594,7 +596,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   simdata = task->simdata;
   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
              "This taks is executed somewhere else. Go fix your code!");
-  task->simdata->using++;
+  simdata->using++;
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
@@ -602,7 +604,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
     set_priority(simdata->compute, simdata->priority);
 
   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
-  task->simdata->using--;
+  simdata->using--;
 }
 
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
@@ -745,6 +747,8 @@ MSG_error_t MSG_process_sleep(double nb_sec)
   m_process_t process = MSG_process_self();
   m_task_t dummy = NULL;
   simdata_task_t simdata = NULL;
+  
+  xbt_assert1(nb_sec>=0,"Invalid duration %g",nb_sec);
 
   CHECK_HOST();
   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);