Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
make msg_vm_migrate robust (i.e. now SRC or DST can be turned off during the migratio...
[simgrid.git] / src / msg / msg_vm.c
index 6412601..51516da 100644 (file)
@@ -360,9 +360,9 @@ static int migration_rx_fun(int argc, char *argv[])
 {
   XBT_DEBUG("mig: rx_start");
 
+  // The structure has been created in the do_migration function and should only be freed in the same place ;)
   struct migration_session *ms = MSG_process_get_data(MSG_process_self());
 
-
   s_ws_params_t params;
   simcall_host_get_params(ms->vm, &params);
   const double xfer_cpu_overhead = params.xfer_cpu_overhead;
@@ -371,16 +371,20 @@ static int migration_rx_fun(int argc, char *argv[])
 
   char *finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3);
 
+  int ret = 0; 
   for (;;) {
     msg_task_t task = NULL;
-    MSG_task_recv(&task, ms->mbox);
+    ret = MSG_task_recv(&task, ms->mbox);
     {
-       double received ;
-      // TODO Adrien Clean the code (destroy task, free memory etc..
-      if (task)
+      double received ;
+      if (ret == MSG_OK)
        received = MSG_task_get_data_size(task);
-      else
-        return 0;
+      else{
+       // An error occured, clean the code and return
+        // The owner did not change, hence the task should be only destroyed on the other side
+        xbt_free(finalize_task_name);
+         return 0;
+      }
       /* TODO: clean up */
       // const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024);
       launch_deferred_exec_process(ms->vm, received * xfer_cpu_overhead, 1);
@@ -395,31 +399,60 @@ static int migration_rx_fun(int argc, char *argv[])
       break;
   }
 
+  // Here Stage 1, 2  and 3 have been performed. 
+  // Hence complete the migration
+
+  // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
+  // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms structure at the begining and free it at the end of each function)
+   msg_vm_t vm = ms->vm; 
+   msg_host_t src_pm = ms->src_pm; 
+   msg_host_t dst_pm = ms-> dst_pm; 
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
 
+// TODO: we have an issue, if the DST node is turning off during the three next calls, then the VM is in an inconsistent state
+// I should check with Takahiro in order to make this portion of code atomic
   /* deinstall the current affinity setting for the CPU */
-  simcall_vm_set_affinity(ms->vm, ms->src_pm, 0);
+  simcall_vm_set_affinity(vm, src_pm, 0);
 
   /* Update the vm location */
-  simcall_vm_migrate(ms->vm, ms->dst_pm);
+  simcall_vm_migrate(vm, dst_pm);
   
   /* Resume the VM */
-  simcall_vm_resume(ms->vm);
+  simcall_vm_resume(vm);
 
   /* install the affinity setting of the VM on the destination pm */
   {
-    msg_host_priv_t priv = msg_host_resource_priv(ms->vm);
 
-    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) ms->dst_pm, sizeof(msg_host_t));
-    simcall_vm_set_affinity(ms->vm, ms->dst_pm, affinity_mask);
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(ms->dst_pm), MSG_host_get_name(ms->vm));
+    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *)dst_pm, sizeof(msg_host_t));
+    simcall_vm_set_affinity(vm, dst_pm, affinity_mask);
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm));
   }
 
   {
-    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
 
+   // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+   priv->is_migrating = 0;
+   XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->key, ms->src_pm->key, ms->dst_pm->key);
+   #ifdef HAVE_TRACING
+    TRACE_msg_vm_change_host(ms->vm, ms->src_pm, ms->dst_pm);
+   #endif
+  
+  }
+  // Inform the SRC that the migration has been correctly performed
+  {
+    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
     msg_task_t task = MSG_task_create(task_name, 0, 0, NULL);
     msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
-    xbt_assert(ret == MSG_OK);
+    // xbt_assert(ret == MSG_OK);
+    if(ret == MSG_HOST_FAILURE){
+    // The DST has crashed, this is a problem has the VM since we are not sure whether SRC is considering that the VM has been correctly migrated on the DST node
+    // TODO What does it mean ? What should we do ? 
+     MSG_task_destroy(task);
+    } else if(ret == MSG_TRANSFER_FAILURE){
+    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+       MSG_task_destroy(task);
+     }
 
     xbt_free(task_name);
   }
@@ -673,7 +706,8 @@ static void shutdown_overhead_process(msg_task_t comm_task)
 
   // XBT_INFO("micro shutdown: mbox %s", mbox);
   msg_error_t ret = MSG_task_send(task, mbox);
-  xbt_assert(ret == MSG_OK);
+  if(ret != MSG_OK)
+    xbt_die("shutdown error - task not sent");
 
   xbt_free(mbox);
   // XBT_INFO("shutdown done");
@@ -687,7 +721,8 @@ static void request_overhead(msg_task_t comm_task, double computation)
 
   // XBT_INFO("req overhead");
   msg_error_t ret = MSG_task_send(task, mbox);
-  xbt_assert(ret == MSG_OK);
+  if(ret != MSG_OK)
+    xbt_die("req overhead error - task not sent");
 
   xbt_free(mbox);
 }
@@ -728,7 +763,8 @@ static void task_send_bounded_with_cpu_overhead(msg_task_t comm_task, char *mbox
       request_overhead(comm_task, data_size * alpha);
 
       msg_error_t ret = MSG_task_send(mtask, mbox);
-      xbt_assert(ret == MSG_OK);
+      if(ret != MSG_OK)
+        xbt_die("migration error - task not sent");
 
       xbt_free(mtask_name);
     }
@@ -845,11 +881,20 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
     ret = MSG_task_send_bounded(task, mbox, mig_speed);
   else
     ret = MSG_task_send(task, mbox);
+
 //  xbt_assert(ret == MSG_OK);
-    xbt_free(task_name);
-    if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of %s", sg_host_name(vm));
-       }
+  xbt_free(task_name);
+  if(ret == MSG_HOST_FAILURE){
+       //XBT_INFO("SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+   }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  }
+//else
+//   XBT_INFO("Ret != FAILURE !!!!"); 
 #endif
 
   double clock_end = MSG_get_clock();
@@ -861,7 +906,6 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
   double cpu_utilization = 0;
 #endif
 
-// TODO - adsein, WTF with the following code ? 
   if (stage == 2){
     XBT_DEBUG("mig-stage%d.%d: sent %llu duration %f actual_speed %f (target %f) cpu %f", stage, stage2_round, size, duration, actual_speed, mig_speed, cpu_utilization);}
   else{
@@ -869,8 +913,6 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
   }
 
 
-
-
 #ifdef USE_MICRO_TASK
   /* The name of a micro task starts with __micro, which does not match the
    * special name that finalizes the receiver loop. Thus, we send the special task.
@@ -883,11 +925,18 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
 //      xbt_assert(ret == MSG_OK);
       xbt_free(task_name);
       if(ret == MSG_HOST_FAILURE){
-       //THROWF(host_error, 0, "host failed", sg_host_name(vm));
-       XBT_INFO("host failed during migration of %s (stage 3)", sg_host_name(vm));
-       //MSG_task_destroy(task);
+       //XBT_INFO("SRC host failed during migration of %s (stage 3)", sg_host_name(vm));
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "SRC host failed during migration of VM %s (stage 3)", sg_host_name(vm));
+        // The owner of the task did not change so destroy the task 
        return; 
-      }
+      }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       return; 
+     }
+
     }
   }
 #endif
@@ -921,7 +970,6 @@ static double send_stage1(struct migration_session *ms,
       datasize = remaining;
 
     remaining -= datasize;
-
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, datasize, ms->mbox, 1, 0, mig_speed, xfer_cpu_overhead);
     double computed = lookup_computed_flop_counts(ms->vm, 1, 0);
     computed_total += computed;
@@ -952,7 +1000,7 @@ static int migration_tx_fun(int argc, char *argv[])
 {
   XBT_DEBUG("mig: tx_start");
 
-  
+  // Note that the ms structure has been allocated in do_migration and hence should be freed in the same function ;) 
   struct migration_session *ms = MSG_process_get_data(MSG_process_self());
 
   s_ws_params_t params;
@@ -967,6 +1015,8 @@ static int migration_tx_fun(int argc, char *argv[])
   const double xfer_cpu_overhead = params.xfer_cpu_overhead;
   const double dpt_cpu_overhead = params.dpt_cpu_overhead;
 
+  msg_vm_t vm=ms->vm; 
+
   double remaining_size = ramsize + devsize;
 
   double max_downtime = params.max_downtime;
@@ -984,10 +1034,10 @@ static int migration_tx_fun(int argc, char *argv[])
     XBT_WARN("migrate a VM, but ramsize is zero");
 
 
-  XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
+  XBT_DEBUG("mig-stage1: remaining_size %f", remaining_size);
 
   /* Stage1: send all memory pages to the destination. */
-  start_dirty_page_tracking(ms->vm);
+  start_dirty_page_tracking(vm);
 
   double computed_during_stage1 = 0;
   if (!skip_stage1) {
@@ -999,8 +1049,9 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
        computed_during_stage1 = send_stage1(ms, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead);
     } CATCH_ANONYMOUS{
-      //hostfailure
-      // TODO adsein, we should probably clean a bit the memory ? 
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     remaining_size -= ramsize;
@@ -1008,7 +1059,7 @@ static int migration_tx_fun(int argc, char *argv[])
     double clock_post_send = MSG_get_clock();
     double bandwidth = ramsize / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
   }
 
 
@@ -1034,7 +1085,7 @@ static int migration_tx_fun(int argc, char *argv[])
       updated_size = get_updated_size(computed, dp_rate, dp_cap);
     }
 
-    XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
+    XBT_DEBUG("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
         stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap);
 
 
@@ -1049,7 +1100,7 @@ static int migration_tx_fun(int argc, char *argv[])
     {
       remaining_size += updated_size;
 
-      XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
+      XBT_DEBUG("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
           remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
 
       if (remaining_size < threshold)
@@ -1057,14 +1108,19 @@ static int migration_tx_fun(int argc, char *argv[])
     }
 
     double clock_prev_send = MSG_get_clock();
-
-    send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
-
+    TRY{
+      send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
+    }CATCH_ANONYMOUS{
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      stop_dirty_page_tracking(vm);
+      return 0; 
+    }
     double clock_post_send = MSG_get_clock();
 
     double bandwidth = updated_size / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
 
 
     remaining_size -= updated_size;
@@ -1074,12 +1130,20 @@ static int migration_tx_fun(int argc, char *argv[])
 
 stage3:
   /* Stage3: stop the VM and copy the rest of states. */
-  XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
-  simcall_vm_suspend(ms->vm);
-  stop_dirty_page_tracking(ms->vm);
-
-  send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
-
+  XBT_DEBUG("mig-stage3: remaining_size %f", remaining_size);
+  simcall_vm_suspend(vm);
+  stop_dirty_page_tracking(vm);
+ TRY{
+    send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
+  }CATCH_ANONYMOUS{
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      simcall_vm_resume(vm);
+      return 0; 
+    }
+  
+ // At that point the Migration is considered valid for the SRC node but remind that the DST side should relocate effectively the VM on the DST node. 
 
   XBT_DEBUG("mig: tx_done");
 
@@ -1088,7 +1152,7 @@ stage3:
 
 
 
-static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
+static int do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 {
   struct migration_session *ms = xbt_new(struct migration_session, 1);
   ms->vm = vm;
@@ -1118,33 +1182,42 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
  }
 #endif
 
-
-
-
-  /* wait until the migration have finished */
+  /* wait until the migration have finished or on error has occured */
   {
+    XBT_DEBUG("wait for reception of the final ACK (i.e. migration has been correctly performed");
     msg_task_t task = NULL;
-    msg_error_t ret = MSG_task_recv(&task, ms->mbox_ctl);
-
+    msg_error_t ret = MSG_TIMEOUT; 
+    while (ret == MSG_TIMEOUT && MSG_host_is_avail(dst_pm)) //Wait while you receive the message o
+     ret = MSG_task_receive_with_timeout(&task, ms->mbox_ctl, 10);
+
+    xbt_free(ms->mbox_ctl);
+    xbt_free(ms->mbox);
+    xbt_free(ms);
+    
     //xbt_assert(ret == MSG_OK);
     if(ret == MSG_HOST_FAILURE){
-       //MSG_task_destroy(task);
-       THROWF(host_error, 0, "host failed during migration of %s", sg_host_name(vm));
-       } 
-       // TODO clean the code
+        // Note that since the communication failed, the owner did not change and the task should be destroyed on the other side.
+        // Hence, just throw the execption
+        //XBT_INFO("SRC crashes, throw an exception (m-control)");
+        return -1; 
+    } 
+    else if((ret == MSG_TRANSFER_FAILURE) || (ret == MSG_TIMEOUT)){ // MSG_TIMEOUT here means that MSG_host_is_avail() returned false.
+        //XBT_INFO("DST crashes, throw an exception (m-control)");
+        return -2;  
+    }
 
+    
     char *expected_task_name = get_mig_task_name(vm, src_pm, dst_pm, 4);
     xbt_assert(strcmp(task->name, expected_task_name) == 0);
     xbt_free(expected_task_name);
     MSG_task_destroy(task);
+    return 0; 
   }
-
-  xbt_free(ms->mbox_ctl);
-  xbt_free(ms->mbox);
-  xbt_free(ms);
 }
 
 
+
+
 /** @brief Migrate the VM to the given host.
  *  @ingroup msg_VMs
  *
@@ -1186,19 +1259,25 @@ void MSG_vm_migrate(msg_vm_t vm, msg_host_t new_pm)
   msg_host_priv_t priv = msg_host_resource_priv(vm);
   priv->is_migrating = 1;
 
-  TRY{
-   do_migration(vm, old_pm, new_pm);
-  } CATCH_ANONYMOUS{
-   RETHROW; 
-   // TODO clean the code Adrien
+  {
+   
+    int ret = do_migration(vm, old_pm, new_pm); 
+    if (ret == -1){
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "SRC host failed during migration");
+    }
+    else if(ret == -2){ 
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "DST host failed during migration");
+    }
   }
-  priv->is_migrating = 0;
 
-  XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
-
-  #ifdef HAVE_TRACING
-  TRACE_msg_vm_change_host(vm, old_pm, new_pm);
-  #endif
+  // This part is done in the RX code, to handle the corner case where SRC can crash just at the end of the migration process
+  // In that case, the VM has been already assigned to the DST node.
+  //XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
+  //#ifdef HAVE_TRACING
+  //TRACE_msg_vm_change_host(vm, old_pm, new_pm);
+  //#endif
 }
 
 
@@ -1344,8 +1423,8 @@ void MSG_vm_set_affinity(msg_vm_t vm, msg_host_t pm, unsigned long mask)
 
   msg_host_t pm_now = MSG_vm_get_pm(vm);
   if (pm_now == pm) {
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
     simcall_vm_set_affinity(vm, pm, mask);
   } else
-    XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
 }