Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
make msg_vm_migrate robust (i.e. now SRC or DST can be turned off during the migratio...
[simgrid.git] / src / msg / msg_vm.c
index 64c3a49..51516da 100644 (file)
@@ -371,12 +371,13 @@ static int migration_rx_fun(int argc, char *argv[])
 
   char *finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3);
 
+  int ret = 0; 
   for (;;) {
     msg_task_t task = NULL;
-    MSG_task_recv(&task, ms->mbox);
+    ret = MSG_task_recv(&task, ms->mbox);
     {
       double received ;
-      if (task)
+      if (ret == MSG_OK)
        received = MSG_task_get_data_size(task);
       else{
        // An error occured, clean the code and return
@@ -401,36 +402,58 @@ static int migration_rx_fun(int argc, char *argv[])
   // Here Stage 1, 2  and 3 have been performed. 
   // Hence complete the migration
 
+  // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
+  // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms structure at the begining and free it at the end of each function)
+   msg_vm_t vm = ms->vm; 
+   msg_host_t src_pm = ms->src_pm; 
+   msg_host_t dst_pm = ms-> dst_pm; 
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+
 // TODO: we have an issue, if the DST node is turning off during the three next calls, then the VM is in an inconsistent state
 // I should check with Takahiro in order to make this portion of code atomic
   /* deinstall the current affinity setting for the CPU */
-  simcall_vm_set_affinity(ms->vm, ms->src_pm, 0);
+  simcall_vm_set_affinity(vm, src_pm, 0);
 
   /* Update the vm location */
-  simcall_vm_migrate(ms->vm, ms->dst_pm);
+  simcall_vm_migrate(vm, dst_pm);
   
   /* Resume the VM */
-  simcall_vm_resume(ms->vm);
+  simcall_vm_resume(vm);
 
   /* install the affinity setting of the VM on the destination pm */
   {
-    msg_host_priv_t priv = msg_host_resource_priv(ms->vm);
 
-    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) ms->dst_pm, sizeof(msg_host_t));
-    simcall_vm_set_affinity(ms->vm, ms->dst_pm, affinity_mask);
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(ms->dst_pm), MSG_host_get_name(ms->vm));
+    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *)dst_pm, sizeof(msg_host_t));
+    simcall_vm_set_affinity(vm, dst_pm, affinity_mask);
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm));
   }
 
   {
-    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
 
+   // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+   priv->is_migrating = 0;
+   XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->key, ms->src_pm->key, ms->dst_pm->key);
+   #ifdef HAVE_TRACING
+    TRACE_msg_vm_change_host(ms->vm, ms->src_pm, ms->dst_pm);
+   #endif
+  
+  }
+  // Inform the SRC that the migration has been correctly performed
+  {
+    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
     msg_task_t task = MSG_task_create(task_name, 0, 0, NULL);
     msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
     // xbt_assert(ret == MSG_OK);
     if(ret == MSG_HOST_FAILURE){
-    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+    // The DST has crashed, this is a problem has the VM since we are not sure whether SRC is considering that the VM has been correctly migrated on the DST node
+    // TODO What does it mean ? What should we do ? 
      MSG_task_destroy(task);
-    }
+    } else if(ret == MSG_TRANSFER_FAILURE){
+    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+       MSG_task_destroy(task);
+     }
+
     xbt_free(task_name);
   }
 
@@ -858,15 +881,20 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
     ret = MSG_task_send_bounded(task, mbox, mig_speed);
   else
     ret = MSG_task_send(task, mbox);
+
 //  xbt_assert(ret == MSG_OK);
-    xbt_free(task_name);
-    if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
-       //XBT_INFO("host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  xbt_free(task_name);
+  if(ret == MSG_HOST_FAILURE){
+       //XBT_INFO("SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
-       return; 
-      
-     }
+       THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+   }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  }
+//else
+//   XBT_INFO("Ret != FAILURE !!!!"); 
 #endif
 
   double clock_end = MSG_get_clock();
@@ -897,12 +925,18 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
 //      xbt_assert(ret == MSG_OK);
       xbt_free(task_name);
       if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of VM %s (stage 3)", sg_host_name(vm));
-       //XBT_INFO("host failed during migration of %s (stage 3)", sg_host_name(vm));
+       //XBT_INFO("SRC host failed during migration of %s (stage 3)", sg_host_name(vm));
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "SRC host failed during migration of VM %s (stage 3)", sg_host_name(vm));
         // The owner of the task did not change so destroy the task 
+       return; 
+      }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        return; 
-      }
+     }
+
     }
   }
 #endif
@@ -936,7 +970,6 @@ static double send_stage1(struct migration_session *ms,
       datasize = remaining;
 
     remaining -= datasize;
-
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, datasize, ms->mbox, 1, 0, mig_speed, xfer_cpu_overhead);
     double computed = lookup_computed_flop_counts(ms->vm, 1, 0);
     computed_total += computed;
@@ -982,6 +1015,8 @@ static int migration_tx_fun(int argc, char *argv[])
   const double xfer_cpu_overhead = params.xfer_cpu_overhead;
   const double dpt_cpu_overhead = params.dpt_cpu_overhead;
 
+  msg_vm_t vm=ms->vm; 
+
   double remaining_size = ramsize + devsize;
 
   double max_downtime = params.max_downtime;
@@ -999,10 +1034,10 @@ static int migration_tx_fun(int argc, char *argv[])
     XBT_WARN("migrate a VM, but ramsize is zero");
 
 
-  XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
+  XBT_DEBUG("mig-stage1: remaining_size %f", remaining_size);
 
   /* Stage1: send all memory pages to the destination. */
-  start_dirty_page_tracking(ms->vm);
+  start_dirty_page_tracking(vm);
 
   double computed_during_stage1 = 0;
   if (!skip_stage1) {
@@ -1014,9 +1049,9 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
        computed_during_stage1 = send_stage1(ms, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead);
     } CATCH_ANONYMOUS{
-      //hostfailure
-     // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     remaining_size -= ramsize;
@@ -1024,7 +1059,7 @@ static int migration_tx_fun(int argc, char *argv[])
     double clock_post_send = MSG_get_clock();
     double bandwidth = ramsize / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
   }
 
 
@@ -1050,7 +1085,7 @@ static int migration_tx_fun(int argc, char *argv[])
       updated_size = get_updated_size(computed, dp_rate, dp_cap);
     }
 
-    XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
+    XBT_DEBUG("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
         stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap);
 
 
@@ -1065,7 +1100,7 @@ static int migration_tx_fun(int argc, char *argv[])
     {
       remaining_size += updated_size;
 
-      XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
+      XBT_DEBUG("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
           remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
 
       if (remaining_size < threshold)
@@ -1076,16 +1111,16 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
       send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
     }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     double clock_post_send = MSG_get_clock();
 
     double bandwidth = updated_size / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
 
 
     remaining_size -= updated_size;
@@ -1095,16 +1130,16 @@ static int migration_tx_fun(int argc, char *argv[])
 
 stage3:
   /* Stage3: stop the VM and copy the rest of states. */
-  XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
-  simcall_vm_suspend(ms->vm);
-  stop_dirty_page_tracking(ms->vm);
+  XBT_DEBUG("mig-stage3: remaining_size %f", remaining_size);
+  simcall_vm_suspend(vm);
+  stop_dirty_page_tracking(vm);
  
  TRY{
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
   }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      simcall_vm_resume(ms->vm);
+      simcall_vm_resume(vm);
       return 0; 
     }
   
@@ -1117,7 +1152,7 @@ stage3:
 
 
 
-static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
+static int do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 {
   struct migration_session *ms = xbt_new(struct migration_session, 1);
   ms->vm = vm;
@@ -1149,8 +1184,11 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 
   /* wait until the migration have finished or on error has occured */
   {
+    XBT_DEBUG("wait for reception of the final ACK (i.e. migration has been correctly performed");
     msg_task_t task = NULL;
-    msg_error_t ret = MSG_task_recv(&task, ms->mbox_ctl);
+    msg_error_t ret = MSG_TIMEOUT; 
+    while (ret == MSG_TIMEOUT && MSG_host_is_avail(dst_pm)) //Wait while you receive the message o
+     ret = MSG_task_receive_with_timeout(&task, ms->mbox_ctl, 10);
 
     xbt_free(ms->mbox_ctl);
     xbt_free(ms->mbox);
@@ -1160,13 +1198,20 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
     if(ret == MSG_HOST_FAILURE){
         // Note that since the communication failed, the owner did not change and the task should be destroyed on the other side.
         // Hence, just throw the execption
-       THROWF(host_error, 0, "DST host failed during the migration of %s", sg_host_name(vm));
+        //XBT_INFO("SRC crashes, throw an exception (m-control)");
+        return -1; 
     } 
+    else if((ret == MSG_TRANSFER_FAILURE) || (ret == MSG_TIMEOUT)){ // MSG_TIMEOUT here means that MSG_host_is_avail() returned false.
+        //XBT_INFO("DST crashes, throw an exception (m-control)");
+        return -2;  
+    }
 
+    
     char *expected_task_name = get_mig_task_name(vm, src_pm, dst_pm, 4);
     xbt_assert(strcmp(task->name, expected_task_name) == 0);
     xbt_free(expected_task_name);
     MSG_task_destroy(task);
+    return 0; 
   }
 }
 
@@ -1214,19 +1259,25 @@ void MSG_vm_migrate(msg_vm_t vm, msg_host_t new_pm)
   msg_host_priv_t priv = msg_host_resource_priv(vm);
   priv->is_migrating = 1;
 
-  TRY{
-   do_migration(vm, old_pm, new_pm);
-  } CATCH_ANONYMOUS{
-   RETHROW; 
-   // TODO clean the code Adrien
+  {
+   
+    int ret = do_migration(vm, old_pm, new_pm); 
+    if (ret == -1){
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "SRC host failed during migration");
+    }
+    else if(ret == -2){ 
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "DST host failed during migration");
+    }
   }
-  priv->is_migrating = 0;
-
-  XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
 
-  #ifdef HAVE_TRACING
-  TRACE_msg_vm_change_host(vm, old_pm, new_pm);
-  #endif
+  // This part is done in the RX code, to handle the corner case where SRC can crash just at the end of the migration process
+  // In that case, the VM has been already assigned to the DST node.
+  //XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
+  //#ifdef HAVE_TRACING
+  //TRACE_msg_vm_change_host(vm, old_pm, new_pm);
+  //#endif
 }
 
 
@@ -1372,8 +1423,8 @@ void MSG_vm_set_affinity(msg_vm_t vm, msg_host_t pm, unsigned long mask)
 
   msg_host_t pm_now = MSG_vm_get_pm(vm);
   if (pm_now == pm) {
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
     simcall_vm_set_affinity(vm, pm, mask);
   } else
-    XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
 }