Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid
authorMartin Quinson <martin.quinson@loria.fr>
Thu, 9 Oct 2014 15:59:08 +0000 (17:59 +0200)
committerMartin Quinson <martin.quinson@loria.fr>
Thu, 9 Oct 2014 15:59:08 +0000 (17:59 +0200)
Conflicts:
src/bindings/java/smx_context_java.c

18 files changed:
buildtools/Cmake/AddTests.cmake
examples/java/cloud/migration/XVM.java
examples/msg/mc/bugged1_liveness_sparse.tesh
examples/msg/mc/bugged1_liveness_visited_sparse.tesh
src/bindings/java/jmsg_vm.c
src/bindings/java/org/simgrid/msg/VM.java
src/bindings/java/smx_context_cojava.c
src/bindings/java/smx_context_java.c
src/mc/mc_checkpoint.c
src/mc/mc_comm_determinism.c
src/mc/mc_diff.c
src/mc/mc_ignore.c
src/mc/mc_memory.c
src/mc/mc_page_snapshot.cpp
src/mc/mc_private.h
src/msg/msg_vm.c
src/simix/smx_context_raw.c
teshsuite/java/sleep_host_off/SleepHostOff.java

index b659eb4..ff453f3 100644 (file)
@@ -113,10 +113,10 @@ ENDIF()
     ADD_TESH_FACTORIES(mc-bugged1                "ucontext;raw" --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1.tesh)
     ADD_TESH_FACTORIES(mc-bugged2                "ucontext;raw" --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged2.tesh)
     IF(CONTEXT_UCONTEXT AND PROCESSOR_x86_64) # liveness model-checking works only on 64bits (for now ...)
-      ADD_TESH(mc-bugged1-liveness-ucontext      --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness.tesh)
-      ADD_TESH(mc-bugged1-liveness-ucontext-sparse      --cfg contexts/factory:ucontext --cfg model-check/sparse-checkpoint:yes --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_sparse.tesh)
-      ADD_TESH(mc-bugged1-liveness-visited-ucontext --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited.tesh)
-      ADD_TESH(mc-bugged1-liveness-visited-ucontext-sparse --cfg contexts/factory:ucontext --cfg model-check/sparse-checkpoint:yes --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited_sparse.tesh)
+      ADD_TESH(mc-bugged1-liveness-ucontext         --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness.tesh)
+      ADD_TESH(mc-bugged1-liveness-ucontext-sparse  --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_sparse.tesh)
+      ADD_TESH(mc-bugged1-liveness-visited-ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited.tesh)
+      ADD_TESH(mc-bugged1-liveness-visited-ucontext-sparse --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited_sparse.tesh)
     ENDIF()
   ENDIF()
 
index 97467cb..46b2dc9 100644 (file)
@@ -8,6 +8,7 @@ package cloud.migration;
 
 import org.simgrid.msg.Host;
 import org.simgrid.msg.HostNotFoundException;
+import org.simgrid.msg.HostFailureException;
 import org.simgrid.msg.Msg;
 import org.simgrid.msg.VM;
 
@@ -65,10 +66,15 @@ public class XVM extends VM {
         return this.currentLoad;
     }
 
-    public void migrate(Host host){
+    public void migrate(Host host) throws HostFailureException {
         Msg.info("Start migration of VM " + this.getName() + " to " + host.getName());
         Msg.info("    currentLoad:" + this.currentLoad + "/ramSize:" + this.ramsize + "/dpIntensity:" + this.dpIntensity + "/remaining:" + this.daemon.getRemaining());
-        super.migrate(host);
+        try{
+               super.migrate(host);
+       } catch (Exception e){
+               Msg.info("Something wrong during the live migration of VM "+this.getName());
+               throw new HostFailureException(); 
+        }
         this.setLoad(this.currentLoad); //Fixed the fact that setBound is not propagated to the new node.
         Msg.info("End of migration of VM " + this.getName() + " to node " + host.getName());
     }
index 37ac72c..f5100d6 100644 (file)
@@ -2,7 +2,7 @@
 
 ! expect signal SIGABRT
 ! timeout 60
-$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=contexts/stack_size:256
+$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=contexts/stack_size:256 --cfg=model-check/sparse-checkpoint:yes
 > [  0.000000] (0:@) Configuration change: Set 'model-check' to '1'
 > [  0.000000] (0:@) Configuration change: Set 'model-check/sparse-checkpoint' to 'yes'
 > [  0.000000] (0:@) Check the liveness property promela_bugged1_liveness
index 0c34a4f..bd9981d 100644 (file)
@@ -2,7 +2,7 @@
 
 ! expect signal SIGABRT
 ! timeout 90
-$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness_visited.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=model-check/visited:100 --cfg=contexts/stack_size:256
+$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness_visited.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=model-check/visited:100 --cfg=contexts/stack_size:256 --cfg=model-check/sparse-checkpoint:yes
 > [  0.000000] (0:@) Configuration change: Set 'model-check' to '1'
 > [  0.000000] (0:@) Configuration change: Set 'model-check/visited' to '100'
 > [  0.000000] (0:@) Configuration change: Set 'model-check/sparse-checkpoint' to 'yes'
index d03010e..a86efda 100644 (file)
@@ -128,6 +128,7 @@ Java_org_simgrid_msg_VM_internalmig(JNIEnv *env, jobject jvm, jobject jhost) {
   TRY{
   MSG_vm_migrate(vm,host);
   } CATCH_ANONYMOUS{
+      XBT_INFO("CATCH EXCEPTION MIGRATION");
       jxbt_throw_host_failure(env, (char*)"during migration");
   } 
 }
index 4454960..c25f3b7 100644 (file)
@@ -147,16 +147,23 @@ public class VM extends Host{
        /**  
         * Invoke native migration routine
        */
-       public native void internalmig(Host destination);
+       public native void internalmig(Host destination) throws Exception; // TODO add throws DoubleMigrationException (i.e. when you call migrate on a VM that is already migrating);
+
 
        
        /** Change the host on which all processes are running
         * (pre-copy is implemented)
         */     
-       public void migrate(Host destination){
-               this.internalmig(destination);
-               // TODO we should test whether the migration has been correctly finalized. 
-               // If and only if it is ok, then we should change the currentHost value. 
+       public void migrate(Host destination) throws HostFailureException{
+               try {
+                       Msg.info("Migrate begins");
+                       this.internalmig(destination);
+                       Msg.info("Migrate ends");
+               } catch (Exception e){
+                 Msg.info("an exception occurs during the migration of VM "+this.getName());
+                 throw new HostFailureException();
+               }
+               // If the migration correcly returned, then we should change the currentHost value. 
                this.currentHost = destination; 
        }
        
index 3503536..bfafcb4 100644 (file)
@@ -180,7 +180,8 @@ void smx_ctx_cojava_stop(smx_context_t context)
   if (context->iwannadie) {
     context->iwannadie = 0;
     JNIEnv *env = get_current_thread_env();
-    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :)"));
+    // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :) (file smx_context_cojava.c)"));
     THROWF(cancel_error, 0, "process cancelled");
   }
   else {
index ce2d01e..6a52403 100644 (file)
@@ -154,7 +154,11 @@ void smx_ctx_java_stop(smx_context_t context)
     context->iwannadie = 0;
     JNIEnv *env = get_current_thread_env();
     XBT_DEBUG("Gonnal launch Killed Error");
-    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", bprintf("Process %s killed: ", MSG_process_get_name( (msg_process_t)context)));
+    // TODO Adrien, if the process has not been created at the java layer, why should we raise the exception/error at the java level (this happens
+    // for instance during the migration process that creates at the C level two processes: one on the SRC node and one on the DST node, if the DST process is killed. 
+    // it is not required to raise an exception at the JAVA level, the low level should be able to manage such an issue correctly but this is not the case right now unfortunately ...
+    // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+    jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", bprintf("Process %s killed :) (file smx_context_java.c)", MSG_process_get_name( (msg_process_t)context) ));
     XBT_DEBUG("Trigger a cancel error at the C level");
     THROWF(cancel_error, 0, "process cancelled");
   } else {
index 2449821..9b21eb1 100644 (file)
@@ -171,8 +171,8 @@ static void MC_snapshot_add_region(mc_snapshot_t snapshot, int type,
 static void MC_get_memory_regions(mc_snapshot_t snapshot)
 {
 
-  void *start_heap = ((xbt_mheap_t) std_heap)->base;
-  void *end_heap = ((xbt_mheap_t) std_heap)->breakval;
+  void *start_heap = std_heap->base;
+  void *end_heap = std_heap->breakval;
   MC_snapshot_add_region(snapshot, 0, start_heap, start_heap,
                          (char *) end_heap - (char *) start_heap);
   snapshot->heap_bytes_used = mmalloc_get_bytes_used(std_heap);
index 89379e4..cfbf3f3 100644 (file)
@@ -159,7 +159,7 @@ static void update_comm_pattern(mc_comm_pattern_t comm_pattern, smx_action_t com
     comm_pattern->data_size = *(comm->comm.dst_buff_size);
     comm_pattern->data = xbt_malloc0(comm_pattern->data_size);
     addr_pointed = *(void **) comm->comm.src_buff;
-    if (addr_pointed > std_heap && addr_pointed < ((xbt_mheap_t) std_heap)->breakval)
+    if (addr_pointed > (void*) std_heap && addr_pointed < std_heap->breakval)
       memcpy(comm_pattern->data, addr_pointed, comm_pattern->data_size);
     else
       memcpy(comm_pattern->data, comm->comm.src_buff, comm_pattern->data_size);
@@ -183,7 +183,7 @@ void get_comm_pattern(xbt_dynar_t list, smx_simcall_t request, int call)
     pattern->data_size = pattern->comm->comm.src_buff_size;
     pattern->data = xbt_malloc0(pattern->data_size);
     addr_pointed = *(void **) pattern->comm->comm.src_buff;
-    if (addr_pointed > std_heap && addr_pointed < ((xbt_mheap_t) std_heap)->breakval)
+    if (addr_pointed > (void*) std_heap && addr_pointed < std_heap->breakval)
       memcpy(pattern->data, addr_pointed, pattern->data_size);
     else
       memcpy(pattern->data, pattern->comm->comm.src_buff, pattern->data_size);
index 377e3cb..a9b8dd4 100644 (file)
@@ -436,8 +436,8 @@ int mmalloc_compare_heap(mc_snapshot_t snapshot1, mc_snapshot_t snapshot2)
   mc_mem_region_t heap_region2 = snapshot2->regions[0];
 
   // This is in snapshot do not use them directly:
-  malloc_info* heapinfos1 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot1, MC_NO_PROCESS_INDEX);
-  malloc_info* heapinfos2 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot2, MC_NO_PROCESS_INDEX);
+  malloc_info* heapinfos1 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot1, MC_NO_PROCESS_INDEX);
+  malloc_info* heapinfos2 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot2, MC_NO_PROCESS_INDEX);
 
   while (i1 <= state->heaplimit) {
 
@@ -1136,8 +1136,8 @@ int compare_heap_area(int process_index, void *area1, void *area2, mc_snapshot_t
 
   int match_pairs = 0;
 
-  malloc_info* heapinfos1 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot1, process_index);
-  malloc_info* heapinfos2 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot2, process_index);
+  malloc_info* heapinfos1 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot1, process_index);
+  malloc_info* heapinfos2 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot2, process_index);
 
   malloc_info heapinfo_temp1, heapinfo_temp2;
 
index 48f7704..8ee35e5 100644 (file)
@@ -73,18 +73,16 @@ void MC_ignore_heap(void *address, size_t size)
 
   region->block =
       ((char *) address -
-       (char *) ((xbt_mheap_t) std_heap)->heapbase) / BLOCKSIZE + 1;
+       (char *) std_heap->heapbase) / BLOCKSIZE + 1;
 
-  if (((xbt_mheap_t) std_heap)->heapinfo[region->block].type == 0) {
+  if (std_heap->heapinfo[region->block].type == 0) {
     region->fragment = -1;
-    ((xbt_mheap_t) std_heap)->heapinfo[region->block].busy_block.ignore++;
+    std_heap->heapinfo[region->block].busy_block.ignore++;
   } else {
     region->fragment =
-        ((uintptr_t) (ADDR2UINT(address) % (BLOCKSIZE))) >> ((xbt_mheap_t)
-                                                             std_heap)->
+        ((uintptr_t) (ADDR2UINT(address) % (BLOCKSIZE))) >> std_heap->
         heapinfo[region->block].type;
-    ((xbt_mheap_t) std_heap)->heapinfo[region->block].busy_frag.ignore[region->
-                                                                       fragment]++;
+    std_heap->heapinfo[region->block].busy_frag.ignore[region->fragment]++;
   }
 
   if (mc_heap_comparison_ignore == NULL) {
@@ -226,7 +224,8 @@ static void mc_ignore_local_variable_in_scope(const char *var_name,
   // Processing of direct variables:
 
   // If the current subprogram matche the given name:
-  if (subprogram_name == NULL || strcmp(subprogram_name, subprogram->name) == 0) {
+  if (!subprogram_name ||
+      subprogram->name && strcmp(subprogram_name, subprogram->name) == 0) {
 
     // Try to find the variable and remove it:
     int start = 0;
@@ -330,7 +329,7 @@ void MC_new_stack_area(void *stack, smx_process_t process, void *context, size_t
   region->size = size;
   region->block =
       ((char *) stack -
-       (char *) ((xbt_mheap_t) std_heap)->heapbase) / BLOCKSIZE + 1;
+       (char *) std_heap->heapbase) / BLOCKSIZE + 1;
 #ifdef HAVE_SMPI
   if (smpi_privatize_global_variables && process) {
     region->process_index = smpi_process_index_of_smx_process(process);
index c629cf5..77d9284 100644 (file)
@@ -14,8 +14,8 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_memory, mc,
                                 "Logging specific to MC (memory)");
 
 /* Pointers to each of the heap regions to use */
-void *std_heap = NULL;          /* memory erased each time the MC stuff rollbacks to the beginning. Almost everything goes here */
-void *mc_heap = NULL;           /* memory persistent over the MC rollbacks. Only MC stuff should go there */
+xbt_mheap_t std_heap = NULL;          /* memory erased each time the MC stuff rollbacks to the beginning. Almost everything goes here */
+xbt_mheap_t mc_heap = NULL;           /* memory persistent over the MC rollbacks. Only MC stuff should go there */
 
 /* Initialize the model-checker memory subsystem */
 /* It creates the two heap regions: std_heap and mc_heap */
index 60ff820..a231848 100644 (file)
@@ -176,7 +176,7 @@ mc_mem_region_t mc_region_new_sparse(int type, void *start_addr, void* permanent
 
   uint64_t* pagemap = NULL;
   if (_sg_mc_soft_dirty && mc_model_checker->parent_snapshot) {
-      pagemap = (uint64_t*) mmalloc_no_memset((xbt_mheap_t) mc_heap, sizeof(uint64_t) * page_count);
+      pagemap = (uint64_t*) mmalloc_no_memset(mc_heap, sizeof(uint64_t) * page_count);
       mc_read_pagemap(pagemap, mc_page_number(NULL, permanent_addr), page_count);
   }
 
@@ -185,7 +185,7 @@ mc_mem_region_t mc_region_new_sparse(int type, void *start_addr, void* permanent
     ref_reg==NULL ? NULL : ref_reg->page_numbers);
 
   if(pagemap) {
-    mfree((xbt_mheap_t) mc_heap, pagemap);
+    mfree(mc_heap, pagemap);
   }
   return new_reg;
 }
@@ -200,7 +200,7 @@ void mc_region_restore_sparse(mc_mem_region_t reg, mc_mem_region_t ref_reg)
 
   // Read soft-dirty bits if necessary in order to know which pages have changed:
   if (_sg_mc_soft_dirty && mc_model_checker->parent_snapshot) {
-    pagemap = (uint64_t*) mmalloc_no_memset((xbt_mheap_t) mc_heap, sizeof(uint64_t) * page_count);
+    pagemap = (uint64_t*) mmalloc_no_memset(mc_heap, sizeof(uint64_t) * page_count);
     mc_read_pagemap(pagemap, mc_page_number(NULL, reg->permanent_addr), page_count);
   }
 
index c06613e..09413c4 100644 (file)
@@ -319,8 +319,8 @@ void MC_print_statistics(mc_stats_t);
 /* Normally the system should operate in std, for switching to raw mode */
 /* you must wrap the code between MC_SET_RAW_MODE and MC_UNSET_RAW_MODE */
 
-extern void *std_heap;
-extern void *mc_heap;
+extern xbt_mheap_t std_heap;
+extern xbt_mheap_t mc_heap;
 
 
 /* FIXME: Horrible hack! because the mmalloc library doesn't provide yet of */
@@ -755,7 +755,7 @@ static inline __attribute__ ((always_inline))
   void* mc_snapshot_get_heap_end(mc_snapshot_t snapshot) {
   if(snapshot==NULL)
       xbt_die("snapshot is NULL");
-  void** addr = &((xbt_mheap_t)std_heap)->breakval;
+  void** addr = &(std_heap->breakval);
   return mc_snapshot_read_pointer(addr, snapshot, MC_ANY_PROCESS_INDEX);
 }
 
@@ -818,4 +818,3 @@ void* mc_snapshot_read_pointer_region(void* addr, mc_mem_region_t region)
 SG_END_DECL()
 
 #endif
-
index 64c3a49..e010913 100644 (file)
@@ -371,12 +371,13 @@ static int migration_rx_fun(int argc, char *argv[])
 
   char *finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3);
 
+  int ret = 0; 
   for (;;) {
     msg_task_t task = NULL;
-    MSG_task_recv(&task, ms->mbox);
+    ret = MSG_task_recv(&task, ms->mbox);
     {
       double received ;
-      if (task)
+      if (ret == MSG_OK)
        received = MSG_task_get_data_size(task);
       else{
        // An error occured, clean the code and return
@@ -401,36 +402,58 @@ static int migration_rx_fun(int argc, char *argv[])
   // Here Stage 1, 2  and 3 have been performed. 
   // Hence complete the migration
 
+  // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
+  // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms structure at the begining and free it at the end of each function)
+   msg_vm_t vm = ms->vm; 
+   msg_host_t src_pm = ms->src_pm; 
+   msg_host_t dst_pm = ms-> dst_pm; 
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+
 // TODO: we have an issue, if the DST node is turning off during the three next calls, then the VM is in an inconsistent state
 // I should check with Takahiro in order to make this portion of code atomic
   /* deinstall the current affinity setting for the CPU */
-  simcall_vm_set_affinity(ms->vm, ms->src_pm, 0);
+  simcall_vm_set_affinity(vm, src_pm, 0);
 
   /* Update the vm location */
-  simcall_vm_migrate(ms->vm, ms->dst_pm);
+  simcall_vm_migrate(vm, dst_pm);
   
   /* Resume the VM */
-  simcall_vm_resume(ms->vm);
+  simcall_vm_resume(vm);
 
   /* install the affinity setting of the VM on the destination pm */
   {
-    msg_host_priv_t priv = msg_host_resource_priv(ms->vm);
 
-    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) ms->dst_pm, sizeof(msg_host_t));
-    simcall_vm_set_affinity(ms->vm, ms->dst_pm, affinity_mask);
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(ms->dst_pm), MSG_host_get_name(ms->vm));
+    unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *)dst_pm, sizeof(msg_host_t));
+    simcall_vm_set_affinity(vm, dst_pm, affinity_mask);
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm));
   }
 
   {
-    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
 
+   // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
+   msg_host_priv_t priv = msg_host_resource_priv(vm);
+   priv->is_migrating = 0;
+   XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->key, ms->src_pm->key, ms->dst_pm->key);
+   #ifdef HAVE_TRACING
+    TRACE_msg_vm_change_host(ms->vm, ms->src_pm, ms->dst_pm);
+   #endif
+  
+  }
+  // Inform the SRC that the migration has been correctly performed
+  {
+    char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
     msg_task_t task = MSG_task_create(task_name, 0, 0, NULL);
     msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
     // xbt_assert(ret == MSG_OK);
     if(ret == MSG_HOST_FAILURE){
-    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+    // The DST has crashed, this is a problem has the VM since we are not sure whether SRC is considering that the VM has been correctly migrated on the DST node
+    // TODO What does it mean ? What should we do ? 
      MSG_task_destroy(task);
-    }
+    } else if(ret == MSG_TRANSFER_FAILURE){
+    // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+       MSG_task_destroy(task);
+     }
+
     xbt_free(task_name);
   }
 
@@ -858,15 +881,20 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
     ret = MSG_task_send_bounded(task, mbox, mig_speed);
   else
     ret = MSG_task_send(task, mbox);
+
 //  xbt_assert(ret == MSG_OK);
-    xbt_free(task_name);
-    if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
-       //XBT_INFO("host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  xbt_free(task_name);
+  if(ret == MSG_HOST_FAILURE){
+       //XBT_INFO("SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
-       return; 
-      
-     }
+       THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+   }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+  }
+//else
+//   XBT_INFO("Ret != FAILURE !!!!"); 
 #endif
 
   double clock_end = MSG_get_clock();
@@ -897,12 +925,18 @@ static void send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_p
 //      xbt_assert(ret == MSG_OK);
       xbt_free(task_name);
       if(ret == MSG_HOST_FAILURE){
-       THROWF(host_error, 0, "host failed during migration of VM %s (stage 3)", sg_host_name(vm));
-       //XBT_INFO("host failed during migration of %s (stage 3)", sg_host_name(vm));
+       //XBT_INFO("SRC host failed during migration of %s (stage 3)", sg_host_name(vm));
+       MSG_task_destroy(task);
+       THROWF(host_error, 0, "SRC host failed during migration of VM %s (stage 3)", sg_host_name(vm));
         // The owner of the task did not change so destroy the task 
+       return; 
+      }else if(ret == MSG_TRANSFER_FAILURE){
+       //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        MSG_task_destroy(task);
+       THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
        return; 
-      }
+     }
+
     }
   }
 #endif
@@ -936,7 +970,6 @@ static double send_stage1(struct migration_session *ms,
       datasize = remaining;
 
     remaining -= datasize;
-
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, datasize, ms->mbox, 1, 0, mig_speed, xfer_cpu_overhead);
     double computed = lookup_computed_flop_counts(ms->vm, 1, 0);
     computed_total += computed;
@@ -982,6 +1015,8 @@ static int migration_tx_fun(int argc, char *argv[])
   const double xfer_cpu_overhead = params.xfer_cpu_overhead;
   const double dpt_cpu_overhead = params.dpt_cpu_overhead;
 
+  msg_vm_t vm=ms->vm; 
+
   double remaining_size = ramsize + devsize;
 
   double max_downtime = params.max_downtime;
@@ -999,10 +1034,10 @@ static int migration_tx_fun(int argc, char *argv[])
     XBT_WARN("migrate a VM, but ramsize is zero");
 
 
-  XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
+  XBT_DEBUG("mig-stage1: remaining_size %f", remaining_size);
 
   /* Stage1: send all memory pages to the destination. */
-  start_dirty_page_tracking(ms->vm);
+  start_dirty_page_tracking(vm);
 
   double computed_during_stage1 = 0;
   if (!skip_stage1) {
@@ -1014,9 +1049,9 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
        computed_during_stage1 = send_stage1(ms, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead);
     } CATCH_ANONYMOUS{
-      //hostfailure
-     // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+      // Stop the dirty page tracking an return (there is no memory space to release) 
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     remaining_size -= ramsize;
@@ -1024,7 +1059,7 @@ static int migration_tx_fun(int argc, char *argv[])
     double clock_post_send = MSG_get_clock();
     double bandwidth = ramsize / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
   }
 
 
@@ -1050,7 +1085,7 @@ static int migration_tx_fun(int argc, char *argv[])
       updated_size = get_updated_size(computed, dp_rate, dp_cap);
     }
 
-    XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
+    XBT_DEBUG("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
         stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap);
 
 
@@ -1065,7 +1100,7 @@ static int migration_tx_fun(int argc, char *argv[])
     {
       remaining_size += updated_size;
 
-      XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
+      XBT_DEBUG("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
           remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
 
       if (remaining_size < threshold)
@@ -1076,16 +1111,16 @@ static int migration_tx_fun(int argc, char *argv[])
     TRY{
       send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
     }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      stop_dirty_page_tracking(ms->vm);
+      stop_dirty_page_tracking(vm);
       return 0; 
     }
     double clock_post_send = MSG_get_clock();
 
     double bandwidth = updated_size / (clock_post_send - clock_prev_send);
     threshold = get_threshold_value(bandwidth, max_downtime);
-    XBT_INFO("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
+    XBT_DEBUG("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
 
 
     remaining_size -= updated_size;
@@ -1095,16 +1130,16 @@ static int migration_tx_fun(int argc, char *argv[])
 
 stage3:
   /* Stage3: stop the VM and copy the rest of states. */
-  XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
-  simcall_vm_suspend(ms->vm);
-  stop_dirty_page_tracking(ms->vm);
+  XBT_DEBUG("mig-stage3: remaining_size %f", remaining_size);
+  simcall_vm_suspend(vm);
+  stop_dirty_page_tracking(vm);
  
  TRY{
     send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
   }CATCH_ANONYMOUS{
-      //hostfailure
+      //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
       // Stop the dirty page tracking an return (there is no memory space to release) 
-      simcall_vm_resume(ms->vm);
+      simcall_vm_resume(vm);
       return 0; 
     }
   
@@ -1117,7 +1152,7 @@ stage3:
 
 
 
-static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
+static int do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 {
   struct migration_session *ms = xbt_new(struct migration_session, 1);
   ms->vm = vm;
@@ -1149,8 +1184,11 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
 
   /* wait until the migration have finished or on error has occured */
   {
+    XBT_DEBUG("wait for reception of the final ACK (i.e. migration has been correctly performed");
     msg_task_t task = NULL;
-    msg_error_t ret = MSG_task_recv(&task, ms->mbox_ctl);
+    msg_error_t ret = MSG_TIMEOUT; 
+    while (ret == MSG_TIMEOUT && MSG_host_is_on(dst_pm)) //Wait while you receive the message o
+     ret = MSG_task_receive_with_timeout(&task, ms->mbox_ctl, 10);
 
     xbt_free(ms->mbox_ctl);
     xbt_free(ms->mbox);
@@ -1160,13 +1198,20 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
     if(ret == MSG_HOST_FAILURE){
         // Note that since the communication failed, the owner did not change and the task should be destroyed on the other side.
         // Hence, just throw the execption
-       THROWF(host_error, 0, "DST host failed during the migration of %s", sg_host_name(vm));
+        //XBT_INFO("SRC crashes, throw an exception (m-control)");
+        return -1; 
     } 
+    else if((ret == MSG_TRANSFER_FAILURE) || (ret == MSG_TIMEOUT)){ // MSG_TIMEOUT here means that MSG_host_is_avail() returned false.
+        //XBT_INFO("DST crashes, throw an exception (m-control)");
+        return -2;  
+    }
 
+    
     char *expected_task_name = get_mig_task_name(vm, src_pm, dst_pm, 4);
     xbt_assert(strcmp(task->name, expected_task_name) == 0);
     xbt_free(expected_task_name);
     MSG_task_destroy(task);
+    return 0; 
   }
 }
 
@@ -1214,19 +1259,25 @@ void MSG_vm_migrate(msg_vm_t vm, msg_host_t new_pm)
   msg_host_priv_t priv = msg_host_resource_priv(vm);
   priv->is_migrating = 1;
 
-  TRY{
-   do_migration(vm, old_pm, new_pm);
-  } CATCH_ANONYMOUS{
-   RETHROW; 
-   // TODO clean the code Adrien
+  {
+   
+    int ret = do_migration(vm, old_pm, new_pm); 
+    if (ret == -1){
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "SRC host failed during migration");
+    }
+    else if(ret == -2){ 
+     priv->is_migrating = 0;
+     THROWF(host_error, 0, "DST host failed during migration");
+    }
   }
-  priv->is_migrating = 0;
-
-  XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
 
-  #ifdef HAVE_TRACING
-  TRACE_msg_vm_change_host(vm, old_pm, new_pm);
-  #endif
+  // This part is done in the RX code, to handle the corner case where SRC can crash just at the end of the migration process
+  // In that case, the VM has been already assigned to the DST node.
+  //XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
+  //#ifdef HAVE_TRACING
+  //TRACE_msg_vm_change_host(vm, old_pm, new_pm);
+  //#endif
 }
 
 
@@ -1372,8 +1423,8 @@ void MSG_vm_set_affinity(msg_vm_t vm, msg_host_t pm, unsigned long mask)
 
   msg_host_t pm_now = MSG_vm_get_pm(vm);
   if (pm_now == pm) {
-    XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
     simcall_vm_set_affinity(vm, pm, mask);
   } else
-    XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+    XBT_DEBUG("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
 }
index 23b08df..df53bd9 100644 (file)
@@ -5,6 +5,7 @@
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
+#include <math.h>
 #include "smx_private.h"
 #include "xbt/parmap.h"
 #include "xbt/dynar.h"
@@ -34,8 +35,16 @@ static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the
 xbt_os_timer_t round_time;
 double par_time,seq_time;
 double par_ratio,seq_ratio;
+int reached_seq_limit, reached_par_limit;
 static unsigned int par_proc_that_ran = 0,seq_proc_that_ran = 0;  /* Counters of processes that have run in SCHED_ROUND_LIMIT scheduling rounds */
-static unsigned int seq_sched_round, par_sched_round; /* Amount of SR that ran serial/parallel*/
+static unsigned int seq_sched_round=0, par_sched_round=0; /* Amount of SR that ran serial/parallel*/
+/*Varables used to calculate running variance and mean*/
+double prev_avg_par_proc=0,prev_avg_seq_proc=0;
+double delta=0;
+double s_par_proc=0,s_seq_proc=0; /*Standard deviation of number of processes computed in par/seq during the current simulation*/
+double avg_par_proc=0,sd_par_proc=0;
+double avg_seq_proc=0,sd_seq_proc=0;
+double par_window=4294967296,seq_window=0; /*par_window is initially 1<<32*/
 #endif
 
 static unsigned long raw_process_index = 0;   /* index of the next process to run in the
@@ -216,10 +225,6 @@ static unsigned int sr_count = 0;
 static xbt_os_timer_t timer;
 #endif
 
-#ifdef ADAPTIVE_THRESHOLD
-int reached_seq_limit, reached_par_limit;
-#endif
-
 static void smx_ctx_raw_wrapper(smx_ctx_raw_t context);
 static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory);
 static smx_context_t smx_ctx_raw_create_context(xbt_main_func_t code, int argc,
@@ -604,52 +609,77 @@ static void smx_ctx_raw_runall_parallel(void)
 static void smx_ctx_raw_runall(void)
 {
   unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
+  unsigned long threshold = SIMIX_context_get_parallel_threshold();
   reached_seq_limit = (seq_sched_round % SCHED_ROUND_LIMIT == 0); 
   reached_par_limit = (par_sched_round % SCHED_ROUND_LIMIT == 0);
 
-  if(reached_par_limit){
-    par_sched_round = 0;
+  if(reached_seq_limit && reached_par_limit){
     par_ratio = (par_proc_that_ran != 0) ? (par_time / (double)par_proc_that_ran) : 0;
-    par_time = 0; par_proc_that_ran = 0;
-  }
-
-  if(reached_seq_limit){
-    seq_sched_round = 0;
     seq_ratio = (seq_proc_that_ran != 0) ? (seq_time / (double)seq_proc_that_ran) : 0; 
-    seq_time = 0; seq_proc_that_ran = 0;
-  }
-
-  if(reached_seq_limit && reached_par_limit){
     if(seq_ratio > par_ratio){
-        SIMIX_context_set_parallel_threshold(SIMIX_context_get_parallel_threshold() - 1);
+        if(nb_processes < avg_par_proc) {
+          threshold = (threshold>2) ? threshold - 1 : threshold ;
+          SIMIX_context_set_parallel_threshold(threshold);
+        }
     } else {
-        SIMIX_context_set_parallel_threshold(SIMIX_context_get_parallel_threshold() + 1);
+        if(nb_processes > avg_seq_proc){
+          SIMIX_context_set_parallel_threshold(threshold+1);
+        }
     }
   }
 
-  XBT_CRITICAL("Adaptive Algorithm. Parallel Threshold is: %d. Processes: %d", SIMIX_context_get_parallel_threshold(), nb_processes);
+  //XBT_CRITICAL("Thres: %d.", SIMIX_context_get_parallel_threshold());
+
   if (nb_processes >= SIMIX_context_get_parallel_threshold()) {
-    XBT_DEBUG("Runall // %lu", nb_processes);
     simix_global->context_factory->suspend = smx_ctx_raw_suspend_parallel;
-    xbt_os_cputimer_start(round_time);
-    smx_ctx_raw_runall_parallel();
-    xbt_os_cputimer_stop(round_time);
-    par_time += xbt_os_timer_elapsed(round_time);
-    par_proc_that_ran += nb_processes;
-    par_sched_round++;
+    if(nb_processes < par_window){ 
+      par_sched_round++;
+      xbt_os_cputimer_start(round_time);
+      smx_ctx_raw_runall_parallel();
+      xbt_os_cputimer_stop(round_time);
+      par_time += xbt_os_timer_elapsed(round_time);
+
+      prev_avg_par_proc = avg_par_proc;
+      delta = (nb_processes-avg_par_proc);
+      avg_par_proc = (par_sched_round==1) ? nb_processes : avg_par_proc + delta / (double) par_sched_round;
+
+      if(par_sched_round>=2){
+        s_par_proc = sd_par_proc + (nb_processes - prev_avg_par_proc)*delta; 
+        sd_par_proc = sqrt(s_par_proc/(par_sched_round-1));
+        par_window = avg_par_proc + sd_par_proc;
+      }else{
+        sd_par_proc = 0;
+      }
+
+      par_proc_that_ran += nb_processes;
+    } else{
+      smx_ctx_raw_runall_parallel();
+    }
   } else {
-    XBT_DEBUG("Runall serial %lu", nb_processes);
     simix_global->context_factory->suspend = smx_ctx_raw_suspend_serial;
-    xbt_os_cputimer_start(round_time);
-#ifdef TIME_BENCH_PER_SR
-    smx_ctx_raw_runall_serial(simix_global->process_to_run);
-#else
-    smx_ctx_raw_runall_serial();
-#endif
-    xbt_os_cputimer_stop(round_time);
-    seq_time += xbt_os_timer_elapsed(round_time);
-    seq_proc_that_ran += nb_processes;
-    seq_sched_round++;
+    if(nb_processes > seq_window){ 
+      seq_sched_round++;
+      xbt_os_cputimer_start(round_time);
+      smx_ctx_raw_runall_serial();
+      xbt_os_cputimer_stop(round_time);
+      seq_time += xbt_os_timer_elapsed(round_time);
+
+      prev_avg_seq_proc = avg_seq_proc;
+      delta = (nb_processes-avg_seq_proc);
+      avg_seq_proc = (seq_sched_round==1) ? nb_processes : avg_seq_proc + delta / (double) seq_sched_round;
+
+      if(seq_sched_round>=2){
+        s_seq_proc = sd_seq_proc + (nb_processes - prev_avg_seq_proc)*delta; 
+        sd_seq_proc = sqrt(s_seq_proc/(seq_sched_round-1));
+        seq_window = avg_seq_proc - sd_seq_proc;
+      } else {
+        sd_seq_proc = 0;
+      }
+
+      seq_proc_that_ran += nb_processes;
+    } else {
+      smx_ctx_raw_runall_serial();
+    }
   }
 }
 
index a18c9b2..d99d4af 100644 (file)
@@ -40,6 +40,7 @@ public class SleepHostOff extends Process{
           } catch (HostFailureException e) {
             Msg.info("catch HostException");
             e.printStackTrace();
+            break; //Break is needed to finalize the endless loop 
           }
         }
       }