Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid
authorAdrien Lebre <alebre@adsein.local>
Thu, 9 Oct 2014 15:45:00 +0000 (17:45 +0200)
committerAdrien Lebre <alebre@adsein.local>
Thu, 9 Oct 2014 15:45:00 +0000 (17:45 +0200)
examples/java/cloud/migration/XVM.java
src/bindings/java/jmsg_vm.c
src/bindings/java/org/simgrid/msg/VM.java
src/bindings/java/smx_context_cojava.c
src/bindings/java/smx_context_java.c
src/msg/msg_vm.c
teshsuite/java/sleep_host_off/SleepHostOff.java

index 97467cb..46b2dc9 100644 (file)
@@ -8,6 +8,7 @@ package cloud.migration;
 
 import org.simgrid.msg.Host;
 import org.simgrid.msg.HostNotFoundException;
+import org.simgrid.msg.HostFailureException;
 import org.simgrid.msg.Msg;
 import org.simgrid.msg.VM;
 
@@ -65,10 +66,15 @@ public class XVM extends VM {
         return this.currentLoad;
     }
 
-    public void migrate(Host host){
+    public void migrate(Host host) throws HostFailureException {
         Msg.info("Start migration of VM " + this.getName() + " to " + host.getName());
         Msg.info("    currentLoad:" + this.currentLoad + "/ramSize:" + this.ramsize + "/dpIntensity:" + this.dpIntensity + "/remaining:" + this.daemon.getRemaining());
-        super.migrate(host);
+        try{
+               super.migrate(host);
+       } catch (Exception e){
+               Msg.info("Something wrong during the live migration of VM "+this.getName());
+               throw new HostFailureException(); 
+        }
         this.setLoad(this.currentLoad); //Fixed the fact that setBound is not propagated to the new node.
         Msg.info("End of migration of VM " + this.getName() + " to node " + host.getName());
     }
index d03010e..a86efda 100644 (file)
@@ -128,6 +128,7 @@ Java_org_simgrid_msg_VM_internalmig(JNIEnv *env, jobject jvm, jobject jhost) {
   TRY{
   MSG_vm_migrate(vm,host);
   } CATCH_ANONYMOUS{
+      XBT_INFO("CATCH EXCEPTION MIGRATION");
       jxbt_throw_host_failure(env, (char*)"during migration");
   } 
 }
index 4454960..c25f3b7 100644 (file)
@@ -147,16 +147,23 @@ public class VM extends Host{
        /**  
         * Invoke native migration routine
        */
-       public native void internalmig(Host destination);
+       public native void internalmig(Host destination) throws Exception; // TODO add throws DoubleMigrationException (i.e. when you call migrate on a VM that is already migrating);
+
 
        
        /** Change the host on which all processes are running
         * (pre-copy is implemented)
         */     
-       public void migrate(Host destination){
-               this.internalmig(destination);
-               // TODO we should test whether the migration has been correctly finalized. 
-               // If and only if it is ok, then we should change the currentHost value. 
+       public void migrate(Host destination) throws HostFailureException{
+               try {
+                       Msg.info("Migrate begins");
+                       this.internalmig(destination);
+                       Msg.info("Migrate ends");
+               } catch (Exception e){
+                 Msg.info("an exception occurs during the migration of VM "+this.getName());
+                 throw new HostFailureException();
+               }
+               // If the migration correcly returned, then we should change the currentHost value. 
                this.currentHost = destination; 
        }
        
index 3503536..bfafcb4 100644 (file)
@@ -180,7 +180,8 @@ void smx_ctx_cojava_stop(smx_context_t context)
   if (context->iwannadie) {
     context->iwannadie = 0;
     JNIEnv *env = get_current_thread_env();
-    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :)"));
+    // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :) (file smx_context_cojava.c)"));
     THROWF(cancel_error, 0, "process cancelled");
   }
   else {
index b8dbfbb..a7677d5 100644 (file)
@@ -154,7 +154,11 @@ void smx_ctx_java_stop(smx_context_t context)
     context->iwannadie = 0;
     JNIEnv *env = get_current_thread_env();
     XBT_DEBUG("Gonnal launch Killed Error");
-    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :)"));
+    // TODO Adrien, if the process has not been created at the java layer, why should we raise the exception/error at the java level (this happens
+    // for instance during the migration process that creates at the C level two processes: one on the SRC node and one on the DST node, if the DST process is killed. 
+    // it is not required to raise an exception at the JAVA level, the low level should be able to manage such an issue correctly but this is not the case right now unfortunately ...
+    // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :) (file smx_context_java.c)"));
     XBT_DEBUG("Trigger a cancel error at the C level");
     THROWF(cancel_error, 0, "process cancelled");
   } else {
index 64c3a49..51516da 100644 (file)
@@ -371,12 +371,13 @@ static int migration_rx_fun(int argc, char *argv[])
 
   char *finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3);
 
+  int ret = 0; 
   for (;;) {
     msg_task_t task = NULL;
-    MSG_task_recv(&task, ms->mbox);
+    ret = MSG_task_recv(&task, ms->mbox);
     {
       double received ;
-      if (task)
+      if (ret == MSG_OK)
        received = MSG_task_get_data_size(task);
       else{
        // An error occured, clean the code and return
@@ -401,36 +402,58 @@ static int migration_rx_fun(int argc, char *argv[])
   // Here Stage 1, 2  and 3 have been performed. 
   // Hence complete the migration
 
+  // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
+  // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms structure at the begining and free it at the end of each function)
+   msg_vm_t vm = ms->vm; 
+   msg_host_t src_pm = ms->src_pm; 
+   msg_host_t dst_pm = ms-> dst_pm; 
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+
 // TODO: we have an issue, if the DST node is turning off during the three next calls, then the VM is in an inconsistent state
 // I should check with Takahiro in order to make this portion of code atomic
   /* deinstall the current affinity setting for the CPU */
-  simcall_vm_set_affinity(ms->vm, ms->src_pm, 0);
+  simcall_vm_set_affinity(vm, src_pm, 0);
 
   /* Update the vm location */
-  simcall_vm_migrate(ms->vm, ms->dst_pm);
+  simcall_vm_migrate(vm, dst_pm);
   
   /* Resume the VM */
-  simcall_vm_resume(ms->vm);
+  simcall_vm_resume(vm);
 
   /* install the affinity setting of the VM on the destination pm */
   {
-    msg_host_priv_t priv = msg_host_resource_priv(ms->vm);
 
-    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) ms->dst_pm, sizeof(msg_host_t));
-    simcall_vm_set_affinity(ms->vm, ms->dst_pm, affinity_mask);
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(ms->dst_pm), MSG_host_get_name(ms->vm));
+    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *)dst_pm, sizeof(msg_host_t));
+    simcall_vm_set_affinity(vm, dst_pm, affinity_mask);
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm));
   }
 
   {
-    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
 
+   // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+   priv->is_migrating = 0;
+   XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->key, ms->src_pm->key, ms->dst_pm->key);
+   #ifdef HAVE_TRACING
+    TRACE_msg_vm_change_host(ms->vm, ms->src_pm, ms->dst_pm);
+   #endif
+  
+  }
+  // Inform the SRC that the migration has been correctly performed
+  {
+    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
     msg_task_t task = MSG_task_create(task_name, 0, 0, NULL);
     msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
     // xbt_assert(ret == MSG_OK);
     if(ret == MSG_HOST_FAILURE){
-    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+    // The DST has crashed, this is a problem has the VM since we are not sure whether SRC is considering that the VM has been correctly migrated on the DST node
+    // TODO What does it mean ? What should we do ? 
      MSG_task_destroy(task);
-    }
+    } else if(ret == MSG_TRANSFER_FAILURE){
+    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+       MSG_task_destroy(task);
+     }
+
     xbt_free(task_name);
   }
 
@@ -858,15 +881,20 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
     ret = MSG_task_send_bounded(task, mbox, mig_speed);
   else
     ret = MSG_task_send(task, mbox);
+
 //  xbt_assert(ret == MSG_OK);
-    xbt_free(task_name);
-    if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
-       //XBT_INFO("host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  xbt_free(task_name);
+  if(ret == MSG_HOST_FAILURE){
+       //XBT_INFO("SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
-       return; 
-      
-     }
+       THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+   }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  }
+//else
+//   XBT_INFO("Ret != FAILURE !!!!"); 
 #endif
 
   double clock_end = MSG_get_clock();
@@ -897,12 +925,18 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
 //      xbt_assert(ret == MSG_OK);
       xbt_free(task_name);
       if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of VM %s (stage 3)", sg_host_name(vm));
-       //XBT_INFO("host failed during migration of %s (stage 3)", sg_host_name(vm));
+       //XBT_INFO("SRC host failed during migration of %s (stage 3)", sg_host_name(vm));
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "SRC host failed during migration of VM %s (stage 3)", sg_host_name(vm));
         // The owner of the task did not change so destroy the task 
+       return; 
+      }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        return; 
-      }
+     }
+
     }
   }
 #endif
@@ -936,7 +970,6 @@ static double send_stage1(struct migration_session *ms,
       datasize = remaining;
 
     remaining -= datasize;
-
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, datasize, ms->mbox, 1, 0, mig_speed, xfer_cpu_overhead);
     double computed = lookup_computed_flop_counts(ms->vm, 1, 0);
     computed_total += computed;
@@ -982,6 +1015,8 @@ static int migration_tx_fun(int argc, char *argv[])
   const double xfer_cpu_overhead = params.xfer_cpu_overhead;
   const double dpt_cpu_overhead = params.dpt_cpu_overhead;
 
+  msg_vm_t vm=ms->vm; 
+
   double remaining_size = ramsize + devsize;
 
   double max_downtime = params.max_downtime;
@@ -999,10 +1034,10 @@ static int migration_tx_fun(int argc, char *argv[])
     XBT_WARN("migrate a VM, but ramsize is zero");
 
 
-  XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
+  XBT_DEBUG("mig-stage1: remaining_size %f", remaining_size);
 
   /* Stage1: send all memory pages to the destination. */
-  start_dirty_page_tracking(ms->vm);
+  start_dirty_page_tracking(vm);
 
   double computed_during_stage1 = 0;
   if (!skip_stage1) {
@@ -1014,9 +1049,9 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
        computed_during_stage1 = send_stage1(ms, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead);
     } CATCH_ANONYMOUS{
-      //hostfailure
-     // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     remaining_size -= ramsize;
@@ -1024,7 +1059,7 @@ static int migration_tx_fun(int argc, char *argv[])
     double clock_post_send = MSG_get_clock();
     double bandwidth = ramsize / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
   }
 
 
@@ -1050,7 +1085,7 @@ static int migration_tx_fun(int argc, char *argv[])
       updated_size = get_updated_size(computed, dp_rate, dp_cap);
     }
 
-    XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
+    XBT_DEBUG("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
         stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap);
 
 
@@ -1065,7 +1100,7 @@ static int migration_tx_fun(int argc, char *argv[])
     {
       remaining_size += updated_size;
 
-      XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
+      XBT_DEBUG("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
           remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
 
       if (remaining_size < threshold)
@@ -1076,16 +1111,16 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
       send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
     }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     double clock_post_send = MSG_get_clock();
 
     double bandwidth = updated_size / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
 
 
     remaining_size -= updated_size;
@@ -1095,16 +1130,16 @@ static int migration_tx_fun(int argc, char *argv[])
 
 stage3:
   /* Stage3: stop the VM and copy the rest of states. */
-  XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
-  simcall_vm_suspend(ms->vm);
-  stop_dirty_page_tracking(ms->vm);
+  XBT_DEBUG("mig-stage3: remaining_size %f", remaining_size);
+  simcall_vm_suspend(vm);
+  stop_dirty_page_tracking(vm);
  
  TRY{
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
   }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      simcall_vm_resume(ms->vm);
+      simcall_vm_resume(vm);
       return 0; 
     }
   
@@ -1117,7 +1152,7 @@ stage3:
 
 
 
-static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
+static int do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 {
   struct migration_session *ms = xbt_new(struct migration_session, 1);
   ms->vm = vm;
@@ -1149,8 +1184,11 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 
   /* wait until the migration have finished or on error has occured */
   {
+    XBT_DEBUG("wait for reception of the final ACK (i.e. migration has been correctly performed");
     msg_task_t task = NULL;
-    msg_error_t ret = MSG_task_recv(&task, ms->mbox_ctl);
+    msg_error_t ret = MSG_TIMEOUT; 
+    while (ret == MSG_TIMEOUT && MSG_host_is_avail(dst_pm)) //Wait while you receive the message o
+     ret = MSG_task_receive_with_timeout(&task, ms->mbox_ctl, 10);
 
     xbt_free(ms->mbox_ctl);
     xbt_free(ms->mbox);
@@ -1160,13 +1198,20 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
     if(ret == MSG_HOST_FAILURE){
         // Note that since the communication failed, the owner did not change and the task should be destroyed on the other side.
         // Hence, just throw the execption
-       THROWF(host_error, 0, "DST host failed during the migration of %s", sg_host_name(vm));
+        //XBT_INFO("SRC crashes, throw an exception (m-control)");
+        return -1; 
     } 
+    else if((ret == MSG_TRANSFER_FAILURE) || (ret == MSG_TIMEOUT)){ // MSG_TIMEOUT here means that MSG_host_is_avail() returned false.
+        //XBT_INFO("DST crashes, throw an exception (m-control)");
+        return -2;  
+    }
 
+    
     char *expected_task_name = get_mig_task_name(vm, src_pm, dst_pm, 4);
     xbt_assert(strcmp(task->name, expected_task_name) == 0);
     xbt_free(expected_task_name);
     MSG_task_destroy(task);
+    return 0; 
   }
 }
 
@@ -1214,19 +1259,25 @@ void MSG_vm_migrate(msg_vm_t vm, msg_host_t new_pm)
   msg_host_priv_t priv = msg_host_resource_priv(vm);
   priv->is_migrating = 1;
 
-  TRY{
-   do_migration(vm, old_pm, new_pm);
-  } CATCH_ANONYMOUS{
-   RETHROW; 
-   // TODO clean the code Adrien
+  {
+   
+    int ret = do_migration(vm, old_pm, new_pm); 
+    if (ret == -1){
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "SRC host failed during migration");
+    }
+    else if(ret == -2){ 
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "DST host failed during migration");
+    }
   }
-  priv->is_migrating = 0;
-
-  XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
 
-  #ifdef HAVE_TRACING
-  TRACE_msg_vm_change_host(vm, old_pm, new_pm);
-  #endif
+  // This part is done in the RX code, to handle the corner case where SRC can crash just at the end of the migration process
+  // In that case, the VM has been already assigned to the DST node.
+  //XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
+  //#ifdef HAVE_TRACING
+  //TRACE_msg_vm_change_host(vm, old_pm, new_pm);
+  //#endif
 }
 
 
@@ -1372,8 +1423,8 @@ void MSG_vm_set_affinity(msg_vm_t vm, msg_host_t pm, unsigned long mask)
 
   msg_host_t pm_now = MSG_vm_get_pm(vm);
   if (pm_now == pm) {
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
     simcall_vm_set_affinity(vm, pm, mask);
   } else
-    XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
 }
index a18c9b2..d99d4af 100644 (file)
@@ -40,6 +40,7 @@ public class SleepHostOff extends Process{
           } catch (HostFailureException e) {
             Msg.info("catch HostException");
             e.printStackTrace();
+            break; //Break is needed to finalize the endless loop 
           }
         }
       }