Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bugfix. Remove the task from the remote host fifo when there is a
[simgrid.git] / src / msg / gos.c
index d255d1d..2d12f65 100644 (file)
@@ -24,6 +24,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   m_process_t process = MSG_process_self();
   m_task_t t = NULL;
   m_host_t h = NULL;
+  m_task_t task_to_wait_for;
   simdata_task_t t_simdata = NULL;
   simdata_host_t h_simdata = NULL;
   int first_time = 1;
@@ -63,7 +64,6 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
     if(max_duration>0) {
       if(!first_time) {
        PAJE_PROCESS_POP_STATE(process);
-       PAJE_COMM_STOP(process,t,channel);
        MSG_RETURN(MSG_TRANSFER_FAILURE);
       }
     }
@@ -98,7 +98,7 @@ static MSG_error_t __MSG_task_get_with_time_out_from_host(m_task_t * task,
   while(MSG_process_is_suspended(t_simdata->sender)) {
     DEBUG1("Oooups, the sender (%s) has been suspended in the meantime. Let's wait for him", 
           t_simdata->sender->name);
-    m_task_t task_to_wait_for = t_simdata->sender->simdata->waiting_task;
+    task_to_wait_for = t_simdata->sender->simdata->waiting_task;
     if(__MSG_process_isBlocked(t_simdata->sender)) {
       DEBUG0("He's blocked. Let's wait for him to go in the suspended state");
       __MSG_process_unblock(t_simdata->sender);
@@ -238,14 +238,12 @@ MSG_error_t MSG_task_get_from_host(m_task_t * task, int channel,
 int MSG_task_Iprobe(m_channel_t channel)
 {
   m_host_t h = NULL;
-  simdata_host_t h_simdata = NULL;
 
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
-  return(xbt_fifo_get_first_item(h_simdata->mbox[channel])!=NULL);
+  return(xbt_fifo_get_first_item(h->simdata->mbox[channel])!=NULL);
 }
 
 /** \ingroup msg_gos_functions
@@ -260,14 +258,12 @@ int MSG_task_Iprobe(m_channel_t channel)
 int MSG_task_probe_from(m_channel_t channel)
 {
   m_host_t h = NULL;
-  simdata_host_t h_simdata = NULL;
   xbt_fifo_item_t item;
   m_task_t t;
 
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
 
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
@@ -367,7 +363,6 @@ MSG_error_t MSG_channel_select_from(m_channel_t channel, double max_duration,
  */
 int MSG_task_probe_from_host(int channel, m_host_t host)
 {
-  simdata_host_t h_simdata = NULL;
   xbt_fifo_item_t item;
   m_task_t t;
   int count = 0;
@@ -376,7 +371,6 @@ int MSG_task_probe_from_host(int channel, m_host_t host)
   xbt_assert1((channel>=0) && (channel < msg_global->max_channel),"Invalid channel %d",channel);
   CHECK_HOST();
   h = MSG_host_self();
-  h_simdata = h->simdata;
 
   DEBUG2("Probing on channel %d (%s)", channel,h->name);
    
@@ -458,6 +452,10 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
   while(!(task_simdata->comm)) {
     if(max_duration>0) {
       if(!first_time) {
+       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
+                       task);
+       PAJE_PROCESS_POP_STATE(process);
+       PAJE_COMM_STOP(process,task,channel);
        MSG_RETURN(MSG_TRANSFER_FAILURE);
       }
     }
@@ -473,6 +471,8 @@ MSG_error_t MSG_task_put_with_timeout(m_task_t task, m_host_t dest,
        get_state(local_host->simdata->host) == SURF_CPU_OFF) {
       xbt_fifo_remove(((simdata_host_t) remote_host->simdata)->mbox[channel],
                      task);
+      PAJE_PROCESS_POP_STATE(process);
+      PAJE_COMM_STOP(process,task,channel);
       MSG_task_destroy(task);
       MSG_RETURN(MSG_HOST_FAILURE);
     }
@@ -596,7 +596,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
   simdata = task->simdata;
   xbt_assert0((!simdata->compute)&&(task->simdata->using==1),
              "This taks is executed somewhere else. Go fix your code!");
-  task->simdata->using++;
+  simdata->using++;
   simdata->compute = surf_workstation_resource->extension_public->
     execute(MSG_process_get_host(process)->simdata->host,
            simdata->computation_amount);
@@ -604,7 +604,7 @@ void __MSG_task_execute(m_process_t process, m_task_t task)
     set_priority(simdata->compute, simdata->priority);
 
   surf_workstation_resource->common_public->action_set_data(simdata->compute,task);
-  task->simdata->using--;
+  simdata->using--;
 }
 
 MSG_error_t __MSG_wait_for_computation(m_process_t process, m_task_t task)
@@ -747,6 +747,8 @@ MSG_error_t MSG_process_sleep(double nb_sec)
   m_process_t process = MSG_process_self();
   m_task_t dummy = NULL;
   simdata_task_t simdata = NULL;
+  
+  xbt_assert1(nb_sec>=0,"Invalid duration %g",nb_sec);
 
   CHECK_HOST();
   dummy = MSG_task_create("MSG_sleep", nb_sec, 0.0, NULL);