ADD_TESH_FACTORIES(mc-bugged1 "ucontext;raw" --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1.tesh)
ADD_TESH_FACTORIES(mc-bugged2 "ucontext;raw" --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged2.tesh)
IF(CONTEXT_UCONTEXT AND PROCESSOR_x86_64) # liveness model-checking works only on 64bits (for now ...)
- ADD_TESH(mc-bugged1-liveness-ucontext --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness.tesh)
- ADD_TESH(mc-bugged1-liveness-ucontext-sparse --cfg contexts/factory:ucontext --cfg model-check/sparse-checkpoint:yes --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_sparse.tesh)
- ADD_TESH(mc-bugged1-liveness-visited-ucontext --cfg contexts/factory:ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited.tesh)
- ADD_TESH(mc-bugged1-liveness-visited-ucontext-sparse --cfg contexts/factory:ucontext --cfg model-check/sparse-checkpoint:yes --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited_sparse.tesh)
+ # No --cfg options here: the context factory (ucontext) and, for the *_sparse variants,
+ # model-check/sparse-checkpoint are expected on the command lines inside the .tesh files (TODO confirm).
+ ADD_TESH(mc-bugged1-liveness-ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness.tesh)
+ ADD_TESH(mc-bugged1-liveness-ucontext-sparse --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_sparse.tesh)
+ ADD_TESH(mc-bugged1-liveness-visited-ucontext --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited.tesh)
+ ADD_TESH(mc-bugged1-liveness-visited-ucontext-sparse --setenv bindir=${CMAKE_BINARY_DIR}/examples/msg/mc --cd ${CMAKE_HOME_DIRECTORY}/examples/msg/mc bugged1_liveness_visited_sparse.tesh)
ENDIF()
ENDIF()
import org.simgrid.msg.Host;
import org.simgrid.msg.HostNotFoundException;
+import org.simgrid.msg.HostFailureException;
import org.simgrid.msg.Msg;
import org.simgrid.msg.VM;
return this.currentLoad;
}
- public void migrate(Host host){
+ /**
+ * Live-migrate this VM to the given host, logging its load before and after the move.
+ * After a successful migration the current load is re-applied on the new host.
+ * @param host destination host
+ * @throws HostFailureException if the underlying VM migration fails (the original exception is rethrown)
+ */
+ public void migrate(Host host) throws HostFailureException {
Msg.info("Start migration of VM " + this.getName() + " to " + host.getName());
Msg.info(" currentLoad:" + this.currentLoad + "/ramSize:" + this.ramsize + "/dpIntensity:" + this.dpIntensity + "/remaining:" + this.daemon.getRemaining());
- super.migrate(host);
+ try{
+ super.migrate(host);
+ } catch (HostFailureException e){
+ Msg.info("Something wrong during the live migration of VM "+this.getName());
+ // Rethrow the original exception instead of a fresh one so its message and stack trace are kept.
+ throw e;
+ }
this.setLoad(this.currentLoad); //Fixed the fact that setBound is not propagated to the new node.
Msg.info("End of migration of VM " + this.getName() + " to node " + host.getName());
}
! expect signal SIGABRT
! timeout 60
-$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=contexts/stack_size:256
+$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=contexts/stack_size:256 --cfg=model-check/sparse-checkpoint:yes
> [ 0.000000] (0:@) Configuration change: Set 'model-check' to '1'
> [ 0.000000] (0:@) Configuration change: Set 'model-check/sparse-checkpoint' to 'yes'
> [ 0.000000] (0:@) Check the liveness property promela_bugged1_liveness
! expect signal SIGABRT
! timeout 90
-$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness_visited.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=model-check/visited:100 --cfg=contexts/stack_size:256
+$ ${bindir:=.}/bugged1_liveness ${srcdir:=.}/../../platforms/platform.xml ${srcdir:=.}/deploy_bugged1_liveness_visited.xml --cfg=model-check:1 "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" --cfg=contexts/factory:ucontext --cfg=model-check/visited:100 --cfg=contexts/stack_size:256 --cfg=model-check/sparse-checkpoint:yes
> [ 0.000000] (0:@) Configuration change: Set 'model-check' to '1'
> [ 0.000000] (0:@) Configuration change: Set 'model-check/visited' to '100'
> [ 0.000000] (0:@) Configuration change: Set 'model-check/sparse-checkpoint' to 'yes'
TRY{
MSG_vm_migrate(vm,host);
} CATCH_ANONYMOUS{
+ XBT_INFO("CATCH EXCEPTION MIGRATION");
jxbt_throw_host_failure(env, (char*)"during migration");
}
}
/**
* Invoke native migration routine
*/
- public native void internalmig(Host destination);
+ public native void internalmig(Host destination) throws Exception; // TODO add throws DoubleMigrationException (i.e. when you call migrate on a VM that is already migrating);
+
/** Change the host on which all processes are running
* (pre-copy is implemented)
*/
- public void migrate(Host destination){
- this.internalmig(destination);
- // TODO we should test whether the migration has been correctly finalized.
- // If and only if it is ok, then we should change the currentHost value.
+ public void migrate(Host destination) throws HostFailureException{
+ try {
+ Msg.info("Migrate begins");
+ this.internalmig(destination);
+ Msg.info("Migrate ends");
+ } catch (Exception e){
+ // Any exception raised by internalmig() is logged and reported to the caller as a new
+ // HostFailureException (the original exception is not chained as its cause).
+ Msg.info("an exception occurs during the migration of VM "+this.getName());
+ throw new HostFailureException();
+ }
+ // Only reached when internalmig() returned normally: record the destination as the current host.
this.currentHost = destination;
}
if (context->iwannadie) {
context->iwannadie = 0;
JNIEnv *env = get_current_thread_env();
- jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :)"));
+ // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+ jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", xbt_strdup("Process killed :) (file smx_context_cojava.c)"));
THROWF(cancel_error, 0, "process cancelled");
}
else {
context->iwannadie = 0;
JNIEnv *env = get_current_thread_env();
XBT_DEBUG("Gonnal launch Killed Error");
- jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", bprintf("Process %s killed: ", MSG_process_get_name( (msg_process_t)context)));
+ // TODO Adrien, if the process has not been created at the java layer, why should we raise the exception/error at the java level (this happens
+ // for instance during the migration process that creates at the C level two processes: one on the SRC node and one on the DST node, if the DST process is killed.
+ // it is not required to raise an exception at the JAVA level, the low level should be able to manage such an issue correctly but this is not the case right now unfortunately ...
+ // TODO it will be nice to have the name of the process to help the end-user to know which Process has been killed
+ jxbt_throw_by_name(env, "org/simgrid/msg/ProcessKilledError", bprintf("Process %s killed :) (file smx_context_java.c)", MSG_process_get_name( (msg_process_t)context) ));
XBT_DEBUG("Trigger a cancel error at the C level");
THROWF(cancel_error, 0, "process cancelled");
} else {
static void MC_get_memory_regions(mc_snapshot_t snapshot)
{
- void *start_heap = ((xbt_mheap_t) std_heap)->base;
- void *end_heap = ((xbt_mheap_t) std_heap)->breakval;
+ void *start_heap = std_heap->base;
+ void *end_heap = std_heap->breakval;
MC_snapshot_add_region(snapshot, 0, start_heap, start_heap,
(char *) end_heap - (char *) start_heap);
snapshot->heap_bytes_used = mmalloc_get_bytes_used(std_heap);
comm_pattern->data_size = *(comm->comm.dst_buff_size);
comm_pattern->data = xbt_malloc0(comm_pattern->data_size);
addr_pointed = *(void **) comm->comm.src_buff;
- if (addr_pointed > std_heap && addr_pointed < ((xbt_mheap_t) std_heap)->breakval)
+ if (addr_pointed > (void*) std_heap && addr_pointed < std_heap->breakval)
memcpy(comm_pattern->data, addr_pointed, comm_pattern->data_size);
else
memcpy(comm_pattern->data, comm->comm.src_buff, comm_pattern->data_size);
pattern->data_size = pattern->comm->comm.src_buff_size;
pattern->data = xbt_malloc0(pattern->data_size);
addr_pointed = *(void **) pattern->comm->comm.src_buff;
- if (addr_pointed > std_heap && addr_pointed < ((xbt_mheap_t) std_heap)->breakval)
+ if (addr_pointed > (void*) std_heap && addr_pointed < std_heap->breakval)
memcpy(pattern->data, addr_pointed, pattern->data_size);
else
memcpy(pattern->data, pattern->comm->comm.src_buff, pattern->data_size);
mc_mem_region_t heap_region2 = snapshot2->regions[0];
// This is in snapshot do not use them directly:
- malloc_info* heapinfos1 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot1, MC_NO_PROCESS_INDEX);
- malloc_info* heapinfos2 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot2, MC_NO_PROCESS_INDEX);
+ malloc_info* heapinfos1 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot1, MC_NO_PROCESS_INDEX);
+ malloc_info* heapinfos2 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot2, MC_NO_PROCESS_INDEX);
while (i1 <= state->heaplimit) {
int match_pairs = 0;
- malloc_info* heapinfos1 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot1, process_index);
- malloc_info* heapinfos2 = mc_snapshot_read_pointer(&((xbt_mheap_t)std_heap)->heapinfo, snapshot2, process_index);
+ malloc_info* heapinfos1 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot1, process_index);
+ malloc_info* heapinfos2 = mc_snapshot_read_pointer(&std_heap->heapinfo, snapshot2, process_index);
malloc_info heapinfo_temp1, heapinfo_temp2;
region->block =
((char *) address -
- (char *) ((xbt_mheap_t) std_heap)->heapbase) / BLOCKSIZE + 1;
+ (char *) std_heap->heapbase) / BLOCKSIZE + 1;
- if (((xbt_mheap_t) std_heap)->heapinfo[region->block].type == 0) {
+ if (std_heap->heapinfo[region->block].type == 0) {
region->fragment = -1;
- ((xbt_mheap_t) std_heap)->heapinfo[region->block].busy_block.ignore++;
+ std_heap->heapinfo[region->block].busy_block.ignore++;
} else {
region->fragment =
- ((uintptr_t) (ADDR2UINT(address) % (BLOCKSIZE))) >> ((xbt_mheap_t)
- std_heap)->
+ ((uintptr_t) (ADDR2UINT(address) % (BLOCKSIZE))) >> std_heap->
heapinfo[region->block].type;
- ((xbt_mheap_t) std_heap)->heapinfo[region->block].busy_frag.ignore[region->
- fragment]++;
+ std_heap->heapinfo[region->block].busy_frag.ignore[region->fragment]++;
}
if (mc_heap_comparison_ignore == NULL) {
// Processing of direct variables:
// If the current subprogram matche the given name:
- if (subprogram_name == NULL || strcmp(subprogram_name, subprogram->name) == 0) {
+ if (!subprogram_name ||
+ subprogram->name && strcmp(subprogram_name, subprogram->name) == 0) {
// Try to find the variable and remove it:
int start = 0;
region->size = size;
region->block =
((char *) stack -
- (char *) ((xbt_mheap_t) std_heap)->heapbase) / BLOCKSIZE + 1;
+ (char *) std_heap->heapbase) / BLOCKSIZE + 1;
#ifdef HAVE_SMPI
if (smpi_privatize_global_variables && process) {
region->process_index = smpi_process_index_of_smx_process(process);
"Logging specific to MC (memory)");
/* Pointers to each of the heap regions to use */
+/* Typed as xbt_mheap_t (not void*) so heap fields such as base/breakval/heapinfo can be accessed without casts. */
-void *std_heap = NULL; /* memory erased each time the MC stuff rollbacks to the beginning. Almost everything goes here */
-void *mc_heap = NULL; /* memory persistent over the MC rollbacks. Only MC stuff should go there */
+xbt_mheap_t std_heap = NULL; /* memory erased each time the model checker rolls back to the beginning. Almost everything goes here */
+xbt_mheap_t mc_heap = NULL; /* memory that persists across MC rollbacks. Only model-checker data should go there */
/* Initialize the model-checker memory subsystem */
/* It creates the two heap regions: std_heap and mc_heap */
uint64_t* pagemap = NULL;
if (_sg_mc_soft_dirty && mc_model_checker->parent_snapshot) {
- pagemap = (uint64_t*) mmalloc_no_memset((xbt_mheap_t) mc_heap, sizeof(uint64_t) * page_count);
+ pagemap = (uint64_t*) mmalloc_no_memset(mc_heap, sizeof(uint64_t) * page_count);
mc_read_pagemap(pagemap, mc_page_number(NULL, permanent_addr), page_count);
}
ref_reg==NULL ? NULL : ref_reg->page_numbers);
if(pagemap) {
- mfree((xbt_mheap_t) mc_heap, pagemap);
+ mfree(mc_heap, pagemap);
}
return new_reg;
}
// Read soft-dirty bits if necessary in order to know which pages have changed:
if (_sg_mc_soft_dirty && mc_model_checker->parent_snapshot) {
- pagemap = (uint64_t*) mmalloc_no_memset((xbt_mheap_t) mc_heap, sizeof(uint64_t) * page_count);
+ pagemap = (uint64_t*) mmalloc_no_memset(mc_heap, sizeof(uint64_t) * page_count);
mc_read_pagemap(pagemap, mc_page_number(NULL, reg->permanent_addr), page_count);
}
/* Normally the system should operate in std, for switching to raw mode */
/* you must wrap the code between MC_SET_RAW_MODE and MC_UNSET_RAW_MODE */
-extern void *std_heap;
-extern void *mc_heap;
+extern xbt_mheap_t std_heap;
+extern xbt_mheap_t mc_heap;
/* FIXME: Horrible hack! because the mmalloc library doesn't provide yet of */
+/* Return the heap end (the std_heap->breakval pointer) as read from the given snapshot
+ * through mc_snapshot_read_pointer(), rather than from the live heap. Dies if snapshot is NULL. */
void* mc_snapshot_get_heap_end(mc_snapshot_t snapshot) {
if(snapshot==NULL)
xbt_die("snapshot is NULL");
- void** addr = &((xbt_mheap_t)std_heap)->breakval;
+ void** addr = &(std_heap->breakval);
return mc_snapshot_read_pointer(addr, snapshot, MC_ANY_PROCESS_INDEX);
}
SG_END_DECL()
#endif
-
char *finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3);
+ int ret = 0;
for (;;) {
msg_task_t task = NULL;
- MSG_task_recv(&task, ms->mbox);
+ ret = MSG_task_recv(&task, ms->mbox);
{
double received ;
- if (task)
+ if (ret == MSG_OK)
received = MSG_task_get_data_size(task);
else{
// An error occured, clean the code and return
// Here Stage 1, 2 and 3 have been performed.
// Hence complete the migration
+ // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
+ // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms structure at the begining and free it at the end of each function)
+ msg_vm_t vm = ms->vm;
+ msg_host_t src_pm = ms->src_pm;
+ msg_host_t dst_pm = ms-> dst_pm;
+ msg_host_priv_t priv = msg_host_resource_priv(vm);
+
// TODO: we have an issue, if the DST node is turning off during the three next calls, then the VM is in an inconsistent state
// I should check with Takahiro in order to make this portion of code atomic
/* deinstall the current affinity setting for the CPU */
- simcall_vm_set_affinity(ms->vm, ms->src_pm, 0);
+ simcall_vm_set_affinity(vm, src_pm, 0);
/* Update the vm location */
- simcall_vm_migrate(ms->vm, ms->dst_pm);
+ simcall_vm_migrate(vm, dst_pm);
/* Resume the VM */
- simcall_vm_resume(ms->vm);
+ simcall_vm_resume(vm);
/* install the affinity setting of the VM on the destination pm */
{
- msg_host_priv_t priv = msg_host_resource_priv(ms->vm);
- unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) ms->dst_pm, sizeof(msg_host_t));
- simcall_vm_set_affinity(ms->vm, ms->dst_pm, affinity_mask);
- XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(ms->dst_pm), MSG_host_get_name(ms->vm));
+ unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *)dst_pm, sizeof(msg_host_t));
+ simcall_vm_set_affinity(vm, dst_pm, affinity_mask);
+ XBT_DEBUG("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm));
}
{
- char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
+ // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
+ msg_host_priv_t priv = msg_host_resource_priv(vm);
+ priv->is_migrating = 0;
+ XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->key, ms->src_pm->key, ms->dst_pm->key);
+ #ifdef HAVE_TRACING
+ TRACE_msg_vm_change_host(ms->vm, ms->src_pm, ms->dst_pm);
+ #endif
+
+ }
+ // Inform the SRC that the migration has been correctly performed
+ {
+ char *task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4);
msg_task_t task = MSG_task_create(task_name, 0, 0, NULL);
msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
// xbt_assert(ret == MSG_OK);
if(ret == MSG_HOST_FAILURE){
- // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+ // The DST has crashed, this is a problem has the VM since we are not sure whether SRC is considering that the VM has been correctly migrated on the DST node
+ // TODO What does it mean ? What should we do ?
MSG_task_destroy(task);
- }
+ } else if(ret == MSG_TRANSFER_FAILURE){
+ // The SRC has crashed, this is not a problem has the VM has been correctly migrated on the DST node
+ MSG_task_destroy(task);
+ }
+
xbt_free(task_name);
}
ret = MSG_task_send_bounded(task, mbox, mig_speed);
else
ret = MSG_task_send(task, mbox);
+
// xbt_assert(ret == MSG_OK);
- xbt_free(task_name);
- if(ret == MSG_HOST_FAILURE){
- THROWF(host_error, 0, "host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
- //XBT_INFO("host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+ xbt_free(task_name);
+ if(ret == MSG_HOST_FAILURE){
+ //XBT_INFO("SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
MSG_task_destroy(task);
- return;
-
- }
+ THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+ }else if(ret == MSG_TRANSFER_FAILURE){
+ //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+ MSG_task_destroy(task);
+ THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
+ }
+//else
+// XBT_INFO("Ret != FAILURE !!!!");
#endif
double clock_end = MSG_get_clock();
// xbt_assert(ret == MSG_OK);
xbt_free(task_name);
if(ret == MSG_HOST_FAILURE){
- THROWF(host_error, 0, "host failed during migration of VM %s (stage 3)", sg_host_name(vm));
- //XBT_INFO("host failed during migration of %s (stage 3)", sg_host_name(vm));
+ //XBT_INFO("SRC host failed during migration of %s (stage 3)", sg_host_name(vm));
+ MSG_task_destroy(task);
+ THROWF(host_error, 0, "SRC host failed during migration of VM %s (stage 3)", sg_host_name(vm));
// The owner of the task did not change so destroy the task
+ return;
+ }else if(ret == MSG_TRANSFER_FAILURE){
+ //XBT_INFO("DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
MSG_task_destroy(task);
+ THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", sg_host_name(vm), stage);
return;
- }
+ }
+
}
}
#endif
datasize = remaining;
remaining -= datasize;
-
send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, datasize, ms->mbox, 1, 0, mig_speed, xfer_cpu_overhead);
double computed = lookup_computed_flop_counts(ms->vm, 1, 0);
computed_total += computed;
const double xfer_cpu_overhead = params.xfer_cpu_overhead;
const double dpt_cpu_overhead = params.dpt_cpu_overhead;
+ msg_vm_t vm=ms->vm;
+
double remaining_size = ramsize + devsize;
double max_downtime = params.max_downtime;
XBT_WARN("migrate a VM, but ramsize is zero");
- XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
+ XBT_DEBUG("mig-stage1: remaining_size %f", remaining_size);
/* Stage1: send all memory pages to the destination. */
- start_dirty_page_tracking(ms->vm);
+ start_dirty_page_tracking(vm);
double computed_during_stage1 = 0;
if (!skip_stage1) {
TRY{
computed_during_stage1 = send_stage1(ms, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead);
} CATCH_ANONYMOUS{
- //hostfailure
- // Stop the dirty page tracking an return (there is no memory space to release)
- stop_dirty_page_tracking(ms->vm);
+ //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
+ // Stop the dirty page tracking an return (there is no memory space to release)
+ stop_dirty_page_tracking(vm);
return 0;
}
remaining_size -= ramsize;
double clock_post_send = MSG_get_clock();
double bandwidth = ramsize / (clock_post_send - clock_prev_send);
threshold = get_threshold_value(bandwidth, max_downtime);
- XBT_INFO("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
+ XBT_DEBUG("actual bandwidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold);
}
updated_size = get_updated_size(computed, dp_rate, dp_cap);
}
- XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
+ XBT_DEBUG("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f",
stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap);
{
remaining_size += updated_size;
- XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
+ XBT_DEBUG("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
if (remaining_size < threshold)
TRY{
send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, updated_size, ms->mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead);
}CATCH_ANONYMOUS{
- //hostfailure
+ //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
// Stop the dirty page tracking an return (there is no memory space to release)
- stop_dirty_page_tracking(ms->vm);
+ stop_dirty_page_tracking(vm);
return 0;
}
double clock_post_send = MSG_get_clock();
double bandwidth = updated_size / (clock_post_send - clock_prev_send);
threshold = get_threshold_value(bandwidth, max_downtime);
- XBT_INFO("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
+ XBT_DEBUG("actual bandwidth %f, threshold %f", bandwidth / 1024 / 1024, threshold);
remaining_size -= updated_size;
stage3:
/* Stage3: stop the VM and copy the rest of states. */
- XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
- simcall_vm_suspend(ms->vm);
- stop_dirty_page_tracking(ms->vm);
+ XBT_DEBUG("mig-stage3: remaining_size %f", remaining_size);
+ simcall_vm_suspend(vm);
+ stop_dirty_page_tracking(vm);
TRY{
send_migration_data(ms->vm, ms->src_pm, ms->dst_pm, remaining_size, ms->mbox, 3, 0, mig_speed, xfer_cpu_overhead);
}CATCH_ANONYMOUS{
- //hostfailure
+ //hostfailure (if you want to know whether this is the SRC or the DST please check directly in send_migration_data code)
// Stop the dirty page tracking an return (there is no memory space to release)
- simcall_vm_resume(ms->vm);
+ simcall_vm_resume(vm);
return 0;
}
-static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
+static int do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm)
{
struct migration_session *ms = xbt_new(struct migration_session, 1);
ms->vm = vm;
/* wait until the migration have finished or on error has occured */
{
+ XBT_DEBUG("wait for reception of the final ACK (i.e. migration has been correctly performed");
msg_task_t task = NULL;
- msg_error_t ret = MSG_task_recv(&task, ms->mbox_ctl);
+ msg_error_t ret = MSG_TIMEOUT;
+ while (ret == MSG_TIMEOUT && MSG_host_is_on(dst_pm)) //Wait while you receive the message o
+ ret = MSG_task_receive_with_timeout(&task, ms->mbox_ctl, 10);
xbt_free(ms->mbox_ctl);
xbt_free(ms->mbox);
if(ret == MSG_HOST_FAILURE){
// Note that since the communication failed, the owner did not change and the task should be destroyed on the other side.
// Hence, just throw the execption
- THROWF(host_error, 0, "DST host failed during the migration of %s", sg_host_name(vm));
+ //XBT_INFO("SRC crashes, throw an exception (m-control)");
+ return -1;
}
+ else if((ret == MSG_TRANSFER_FAILURE) || (ret == MSG_TIMEOUT)){ // MSG_TIMEOUT here means that MSG_host_is_avail() returned false.
+ //XBT_INFO("DST crashes, throw an exception (m-control)");
+ return -2;
+ }
+
char *expected_task_name = get_mig_task_name(vm, src_pm, dst_pm, 4);
xbt_assert(strcmp(task->name, expected_task_name) == 0);
xbt_free(expected_task_name);
MSG_task_destroy(task);
+ return 0;
}
}
msg_host_priv_t priv = msg_host_resource_priv(vm);
priv->is_migrating = 1;
- TRY{
- do_migration(vm, old_pm, new_pm);
- } CATCH_ANONYMOUS{
- RETHROW;
- // TODO clean the code Adrien
+ {
+
+ int ret = do_migration(vm, old_pm, new_pm);
+ if (ret == -1){
+ priv->is_migrating = 0;
+ THROWF(host_error, 0, "SRC host failed during migration");
+ }
+ else if(ret == -2){
+ priv->is_migrating = 0;
+ THROWF(host_error, 0, "DST host failed during migration");
+ }
}
- priv->is_migrating = 0;
-
- XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
- #ifdef HAVE_TRACING
- TRACE_msg_vm_change_host(vm, old_pm, new_pm);
- #endif
+ // This part is done in the RX code, to handle the corner case where SRC can crash just at the end of the migration process
+ // In that case, the VM has been already assigned to the DST node.
+ //XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", vm->key, old_pm->key, new_pm->key);
+ //#ifdef HAVE_TRACING
+ //TRACE_msg_vm_change_host(vm, old_pm, new_pm);
+ //#endif
}
msg_host_t pm_now = MSG_vm_get_pm(vm);
if (pm_now == pm) {
- XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+ XBT_DEBUG("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
simcall_vm_set_affinity(vm, pm, mask);
} else
- XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
+ XBT_DEBUG("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm));
}
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+#include <math.h>
#include "smx_private.h"
#include "xbt/parmap.h"
#include "xbt/dynar.h"
xbt_os_timer_t round_time;
double par_time,seq_time;
double par_ratio,seq_ratio;
+int reached_seq_limit, reached_par_limit;
static unsigned int par_proc_that_ran = 0,seq_proc_that_ran = 0; /* Counters of processes that have run in SCHED_ROUND_LIMIT scheduling rounds */
-static unsigned int seq_sched_round, par_sched_round; /* Amount of SR that ran serial/parallel*/
+static unsigned int seq_sched_round=0, par_sched_round=0; /* Amount of SR that ran serial/parallel*/
+/* Variables used to compute the running mean and standard deviation of the number of
+ * processes per parallel/sequential scheduling round. They are given internal linkage
+ * (presumably only used in this file -- TODO confirm no extern elsewhere): exported globals
+ * with generic names such as `delta` could clash with user symbols at link time. */
+static double prev_avg_par_proc=0,prev_avg_seq_proc=0;
+static double delta=0;
+static double s_par_proc=0,s_seq_proc=0; /* accumulators from which sd_*_proc is derived: sd = sqrt(s/(n-1)) */
+static double avg_par_proc=0,sd_par_proc=0; /* running mean / standard deviation over counted parallel rounds */
+static double avg_seq_proc=0,sd_seq_proc=0; /* running mean / standard deviation over counted sequential rounds */
+static double par_window=4294967296,seq_window=0; /*par_window is initially 1<<32*/
#endif
static unsigned long raw_process_index = 0; /* index of the next process to run in the
static xbt_os_timer_t timer;
#endif
-#ifdef ADAPTIVE_THRESHOLD
-int reached_seq_limit, reached_par_limit;
-#endif
-
static void smx_ctx_raw_wrapper(smx_ctx_raw_t context);
static int smx_ctx_raw_factory_finalize(smx_context_factory_t *factory);
static smx_context_t smx_ctx_raw_create_context(xbt_main_func_t code, int argc,
+/* Run all the ready processes, serially or in parallel depending on the (adaptive) parallel
+ * threshold. Rounds whose size lies inside the current window are timed and folded into the
+ * running mean/standard deviation statistics used to adjust that threshold. */
static void smx_ctx_raw_runall(void)
{
unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
+ unsigned long threshold = SIMIX_context_get_parallel_threshold();
reached_seq_limit = (seq_sched_round % SCHED_ROUND_LIMIT == 0);
reached_par_limit = (par_sched_round % SCHED_ROUND_LIMIT == 0);
- if(reached_par_limit){
- par_sched_round = 0;
+ if(reached_seq_limit && reached_par_limit){
par_ratio = (par_proc_that_ran != 0) ? (par_time / (double)par_proc_that_ran) : 0;
- par_time = 0; par_proc_that_ran = 0;
- }
-
- if(reached_seq_limit){
- seq_sched_round = 0;
seq_ratio = (seq_proc_that_ran != 0) ? (seq_time / (double)seq_proc_that_ran) : 0;
- seq_time = 0; seq_proc_that_ran = 0;
- }
-
- if(reached_seq_limit && reached_par_limit){
if(seq_ratio > par_ratio){
- SIMIX_context_set_parallel_threshold(SIMIX_context_get_parallel_threshold() - 1);
+ /* Serial rounds cost more per process: lower the threshold (never below 2). */
+ if(nb_processes < avg_par_proc) {
+ threshold = (threshold>2) ? threshold - 1 : threshold ;
+ SIMIX_context_set_parallel_threshold(threshold);
+ }
} else {
- SIMIX_context_set_parallel_threshold(SIMIX_context_get_parallel_threshold() + 1);
+ /* Parallel rounds cost at least as much per process: raise the threshold. */
+ if(nb_processes > avg_seq_proc){
+ SIMIX_context_set_parallel_threshold(threshold+1);
+ }
}
}
- XBT_CRITICAL("Adaptive Algorithm. Parallel Threshold is: %d. Processes: %d", SIMIX_context_get_parallel_threshold(), nb_processes);
+
if (nb_processes >= SIMIX_context_get_parallel_threshold()) {
- XBT_DEBUG("Runall // %lu", nb_processes);
simix_global->context_factory->suspend = smx_ctx_raw_suspend_parallel;
- xbt_os_cputimer_start(round_time);
- smx_ctx_raw_runall_parallel();
- xbt_os_cputimer_stop(round_time);
- par_time += xbt_os_timer_elapsed(round_time);
- par_proc_that_ran += nb_processes;
- par_sched_round++;
+ if(nb_processes < par_window){
+ par_sched_round++;
+ xbt_os_cputimer_start(round_time);
+ smx_ctx_raw_runall_parallel();
+ xbt_os_cputimer_stop(round_time);
+ par_time += xbt_os_timer_elapsed(round_time);
+
+ /* Welford's online update of the mean and variance of nb_processes over counted parallel rounds. */
+ prev_avg_par_proc = avg_par_proc;
+ delta = (nb_processes-avg_par_proc);
+ avg_par_proc = (par_sched_round==1) ? nb_processes : avg_par_proc + delta / (double) par_sched_round;
+
+ if(par_sched_round>=2){
+ s_par_proc += delta * (nb_processes - avg_par_proc); /* M2 += (x - old_mean) * (x - new_mean) */
+ sd_par_proc = sqrt(s_par_proc/(par_sched_round-1));
+ par_window = avg_par_proc + sd_par_proc;
+ }else{
+ sd_par_proc = 0;
+ }
+
+ par_proc_that_ran += nb_processes;
+ } else{
+ smx_ctx_raw_runall_parallel();
+ }
} else {
- XBT_DEBUG("Runall serial %lu", nb_processes);
simix_global->context_factory->suspend = smx_ctx_raw_suspend_serial;
- xbt_os_cputimer_start(round_time);
-#ifdef TIME_BENCH_PER_SR
- smx_ctx_raw_runall_serial(simix_global->process_to_run);
-#else
- smx_ctx_raw_runall_serial();
-#endif
- xbt_os_cputimer_stop(round_time);
- seq_time += xbt_os_timer_elapsed(round_time);
- seq_proc_that_ran += nb_processes;
- seq_sched_round++;
+ if(nb_processes > seq_window){
+ seq_sched_round++;
+ xbt_os_cputimer_start(round_time);
+ smx_ctx_raw_runall_serial();
+ xbt_os_cputimer_stop(round_time);
+ seq_time += xbt_os_timer_elapsed(round_time);
+
+ /* Welford's online update of the mean and variance of nb_processes over counted serial rounds. */
+ prev_avg_seq_proc = avg_seq_proc;
+ delta = (nb_processes-avg_seq_proc);
+ avg_seq_proc = (seq_sched_round==1) ? nb_processes : avg_seq_proc + delta / (double) seq_sched_round;
+
+ if(seq_sched_round>=2){
+ s_seq_proc += delta * (nb_processes - avg_seq_proc); /* M2 += (x - old_mean) * (x - new_mean) */
+ sd_seq_proc = sqrt(s_seq_proc/(seq_sched_round-1));
+ seq_window = avg_seq_proc - sd_seq_proc;
+ } else {
+ sd_seq_proc = 0;
+ }
+
+ seq_proc_that_ran += nb_processes;
+ } else {
+ smx_ctx_raw_runall_serial();
+ }
}
}
} catch (HostFailureException e) {
Msg.info("catch HostException");
e.printStackTrace();
+ break; //Break is needed to finalize the endless loop
}
}
}