return __MSG_vm_is_state(vm, SURF_VM_STATE_RUNNING);
}
-#if 0
/** @brief Returns whether the given VM is currently migrating
* @ingroup msg_VMs
*/
{
return __MSG_vm_is_state(vm, SURF_VM_STATE_MIGRATING);
}
-#endif
/** @brief Returns whether the given VM is currently suspended, not running.
* @ingroup msg_VMs
*
*/
msg_vm_t MSG_vm_create(msg_host_t ind_pm, const char *name,
- int ncpus, int ramsize, int net_cap, char *disk_path, int disksize)
+ int ncpus, long ramsize, long net_cap, char *disk_path, long disksize)
{
msg_vm_t vm = MSG_vm_create_core(ind_pm, name);
s_ws_params_t params;
memset(&params, 0, sizeof(params));
params.ramsize = ramsize;
- params.overcommit = 0;
+ //params.overcommit = 0;
simcall_host_set_params(vm, &params);
}
- /* TODO: We will revisit the disk support later. */
+ /* TODO: Limit net capability, take into account disk considerations. */
return vm;
}
return ind_vm;
}
+/** @brief Destroy a VM. Destroy the VM object from the simulation.
+ * @ingroup msg_VMs
+ *
+ * A VM that is still running is shut down first. After that, the VM has to
+ * be in the created state; otherwise a critical message is logged and
+ * DIE_IMPOSSIBLE is invoked.
+ *
+ * @param vm the VM to destroy
+ */
+void MSG_vm_destroy(msg_vm_t vm)
+{
+ /* First, terminate all processes on the VM if necessary */
+ if (MSG_vm_is_running(vm))
+ simcall_vm_shutdown(vm);
+
+ if (!MSG_vm_is_created(vm)) { /* refuse to destroy a VM that is not in the created state */
+ XBT_CRITICAL("shutdown the given VM before destroying it");
+ DIE_IMPOSSIBLE;
+ }
+
+ /* Then, destroy the VM object */
+ simcall_vm_destroy(vm);
+
+ __MSG_host_destroy(vm); /* presumably releases the MSG host-level data attached to the VM -- TODO confirm */
+
+ #ifdef HAVE_TRACING
+ TRACE_msg_vm_end(vm); /* NOTE(review): runs after __MSG_host_destroy(vm); confirm vm is still safe to use here */
+ #endif
+}
+
/** @brief Start a vm (i.e., boot the guest operating system)
* @ingroup msg_VMs
return bprintf("__task_mig_stage%d:%s(%s-%s)", stage, vm_name, src_pm_name, dst_pm_name);
}
+static void launch_deferred_exec_process(msg_host_t host, double computation);
+
static int migration_rx_fun(int argc, char *argv[])
{
const char *pr_name = MSG_process_get_name(MSG_process_self());
const char *host_name = MSG_host_get_name(MSG_host_self());
- XBT_INFO("%s@%s start", pr_name, host_name);
+ XBT_DEBUG("mig: rx_start");
xbt_assert(argc == 4);
const char *vm_name = argv[1];
for (;;) {
msg_task_t task = NULL;
MSG_task_recv(&task, mbox);
+ {
+ double received = MSG_task_get_data_size(task);
+ /* TODO */
+ const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024);
+ launch_deferred_exec_process(vm, received * alpha);
+ }
if (strcmp(task->name, finalize_task_name) == 0)
need_exit = 1;
xbt_free(mbox_ctl);
xbt_free(finalize_task_name);
- XBT_INFO("%s@%s done", pr_name, host_name);
+ XBT_DEBUG("mig: rx_done");
return 0;
}
dp->prev_clock = MSG_get_clock();
dp->prev_remaining = remaining;
- XBT_INFO("%s@%s remaining %f", key, sg_host_name(vm), remaining);
+ // XBT_INFO("%s@%s remaining %f", key, sg_host_name(vm), remaining);
}
}
XBT_INFO("%s@%s: computated %f ops (remaining %f -> %f) in %f secs (%f -> %f)",
key, sg_host_name(vm), computed, dp->prev_remaining, remaining, duration, dp->prev_clock, clock);
- XBT_INFO("%s@%s: updated %f bytes, %f Mbytes/s",
+ XBT_INFO("%s@%s: updated %f bytes, %f Mbytes/s",
key, sg_host_name(vm), updated, updated / duration / 1000 / 1000);
return updated;
double get_computed(char *key, msg_vm_t vm, dirty_page_t dp, double remaining, double clock)
{
- double computed = dp->prev_remaining - remaining;
- double duration = clock - dp->prev_clock;
+ double computed = dp->prev_remaining - remaining;
+ double duration = clock - dp->prev_clock;
- XBT_INFO("%s@%s: computated %f ops (remaining %f -> %f) in %f secs (%f -> %f)",
- key, sg_host_name(vm), computed, dp->prev_remaining, remaining, duration, dp->prev_clock, clock);
+ XBT_DEBUG("%s@%s: computated %f ops (remaining %f -> %f) in %f secs (%f -> %f)",
+ key, sg_host_name(vm), computed, dp->prev_remaining, remaining, duration, dp->prev_clock, clock);
- return computed;
+ return computed;
}
-static double lookup_dirty_pages(msg_vm_t vm)
+static double lookup_computed_flop_counts(msg_vm_t vm, int stage2_round_for_fancy_debug)
{
msg_host_priv_t priv = msg_host_resource_priv(vm);
double total = 0;
}
total += priv->dp_updated_by_deleted_tasks;
- XBT_INFO("total %f (including %f by deleted tasks)", total, priv->dp_updated_by_deleted_tasks);
+
+ XBT_INFO("mig-stage2.%d: computed %f flop_counts (including %f by deleted tasks)",
+ stage2_round_for_fancy_debug,
+ total, priv->dp_updated_by_deleted_tasks);
+
+
+
priv->dp_updated_by_deleted_tasks = 0;
return total;
}
+// TODO: Is this code redundant with the information provided by
+// msg_process_t MSG_process_create(const char *name, xbt_main_func_t code, void *data, msg_host_t host)?
void MSG_host_add_task(msg_host_t host, msg_task_t task)
{
msg_host_priv_t priv = msg_host_resource_priv(host);
/* It should be okay that we add a task onto a migrating VM. */
if (priv->dp_enabled) {
- XBT_INFO("add (dp_enabled) %s on %s (remaining %f)", key, sg_host_name(host), remaining);
dp->prev_clock = MSG_get_clock();
dp->prev_remaining = remaining;
}
xbt_assert(xbt_dict_get_or_null(priv->dp_objs, key) == NULL);
xbt_dict_set(priv->dp_objs, key, dp, NULL);
- XBT_INFO("add %s on %s (remaining %f)", key, sg_host_name(host), remaining);
+ XBT_DEBUG("add %s on %s (remaining %f, dp_enabled %d)", key, sg_host_name(host), remaining, priv->dp_enabled);
xbt_free(key);
}
xbt_dict_remove(priv->dp_objs, key);
xbt_free(dp);
- XBT_INFO("del %s on %s", key, sg_host_name(host));
+ XBT_DEBUG("del %s on %s", key, sg_host_name(host));
xbt_free(key);
}
+/* Process body: executes one compute-only task named "__task_deferred".
+ *
+ * argv[1] holds the amount of computation as a decimal string; it is parsed
+ * with atof(). Exactly two arguments are expected (asserted).
+ * Always returns 0.
+ */
+static int deferred_exec_fun(int argc, char *argv[])
+{
+  xbt_assert(argc == 2);
+
+  const double flop_amount = atof(argv[1]);
+  msg_task_t deferred_task = MSG_task_create("__task_deferred", flop_amount, 0, NULL);
+
+  /* The task priority is raised to 1000000 before the task is executed. */
+  MSG_task_set_priority(deferred_task, 1000000);
+  MSG_task_execute(deferred_task);
+  MSG_task_destroy(deferred_task);
+
+  return 0;
+}
+
+/* Spawn, on `host`, a process running deferred_exec_fun() that executes
+ * `computation` flops.
+ *
+ * The amount of computation travels through argv[1] as a string, because
+ * deferred_exec_fun() reads it back with atof().
+ */
+static void launch_deferred_exec_process(msg_host_t host, double computation)
+{
+  char *pr_name = bprintf("__pr_deferred_exec_%s", MSG_host_get_name(host));
+
+  int nargvs = 3;
+  char **argv = xbt_new(char *, nargvs);
+  argv[0] = xbt_strdup(pr_name);
+  /* 17 significant digits let a double survive the round trip through text;
+   * the former "%lf" kept only 6 decimals and turned tiny amounts into 0. */
+  argv[1] = bprintf("%.17g", computation);
+  argv[2] = NULL;
+
+  /* The returned process handle is not needed here. */
+  MSG_process_create_with_arguments(pr_name, deferred_exec_fun, NULL, host, nargvs - 1, argv);
+
+  xbt_free(pr_name);
+}
+
+
+/* Process body: receives tasks on the mailbox named by argv[1] and executes
+ * each of them, until a task named "finalize_making_overhead" has been
+ * handled. Exactly two arguments are expected (asserted).
+ * Always returns 0.
+ */
+static int task_tx_overhead_fun(int argc, char *argv[])
+{
+  xbt_assert(argc == 2);
+  const char *mbox = argv[1];
+
+  int need_exit = 0;
+
+  XBT_INFO("start %s", mbox);
+
+  while (!need_exit) {
+    msg_task_t task = NULL;
+
+    /* Check the receive before touching the task: if it did not succeed,
+     * task may still be NULL and must not be dereferenced. */
+    msg_error_t ret = MSG_task_recv(&task, mbox);
+    xbt_assert(ret == MSG_OK);
+
+    if (strcmp(MSG_task_get_name(task), "finalize_making_overhead") == 0)
+      need_exit = 1;
+
+    /* The finalize task is executed and destroyed like any other task. */
+    MSG_task_execute(task);
+    MSG_task_destroy(task);
+  }
+
+  XBT_INFO("bye");
+
+  return 0;
+}
+
+/* Spawn, on the current host, the helper process (task_tx_overhead_fun) that
+ * executes the CPU-overhead tasks requested for `comm_task`.
+ *
+ * Both the process name and the mailbox name are derived from the name of
+ * comm_task; the mailbox name is handed to the process as argv[1].
+ */
+static void start_overhead_process(msg_task_t comm_task)
+{
+  char *pr_name = bprintf("__pr_task_tx_overhead_%s", MSG_task_get_name(comm_task));
+  char *mbox = bprintf("__mb_task_tx_overhead_%s", MSG_task_get_name(comm_task));
+
+  int nargvs = 3;
+  char **argv = xbt_new(char *, nargvs);
+  argv[0] = xbt_strdup(pr_name);
+  argv[1] = xbt_strdup(mbox);
+  argv[2] = NULL;
+
+  XBT_INFO("micro start: mbox %s", mbox);
+  /* The returned process handle is not needed here. */
+  MSG_process_create_with_arguments(pr_name, task_tx_overhead_fun, NULL, MSG_host_self(), nargvs - 1, argv);
+
+  xbt_free(pr_name);
+  xbt_free(mbox);
+}
+
+/* Tell the overhead helper process of `comm_task` to stop, by sending a
+ * task named "finalize_making_overhead" to the mailbox derived from the name
+ * of comm_task. The send must succeed (asserted).
+ */
+static void shutdown_overhead_process(msg_task_t comm_task)
+{
+  const char *comm_name = MSG_task_get_name(comm_task);
+  char *mailbox = bprintf("__mb_task_tx_overhead_%s", comm_name);
+  msg_task_t finalize_task = MSG_task_create("finalize_making_overhead", 0, 0, NULL);
+
+  XBT_INFO("micro shutdown: mbox %s", mailbox);
+
+  msg_error_t ret = MSG_task_send(finalize_task, mailbox);
+  xbt_assert(ret == MSG_OK);
+  xbt_free(mailbox);
+
+  XBT_INFO("shutdown done");
+}
+
+/* Send a compute-only task named "micro", worth `computation` flops, to the
+ * overhead helper mailbox derived from the name of `comm_task`.
+ * The send must succeed (asserted).
+ */
+static void request_overhead(msg_task_t comm_task, double computation)
+{
+  const char *comm_name = MSG_task_get_name(comm_task);
+  char *mailbox = bprintf("__mb_task_tx_overhead_%s", comm_name);
+  msg_task_t overhead_task = MSG_task_create("micro", computation, 0, NULL);
+
+  XBT_INFO("req overhead");
+
+  msg_error_t ret = MSG_task_send(overhead_task, mailbox);
+  xbt_assert(ret == MSG_OK);
+  xbt_free(mailbox);
+}
+
+/* Send the payload of comm_task to mbox in chunks of at most 1 MiB, each
+ * chunk carried by its own "__micro_<name>" task, and request a CPU overhead
+ * of (chunk size * alpha) flops from a helper process for every chunk.
+ * comm_task itself is never sent: only its name and data size are read.
+ *
+ * alpha is (floating_operations / bytes).
+ *
+ * When actual migration traffic was 32 mbytes/s, we observed the CPU
+ * utilization of the main thread of the Qemu process was 10 %.
+ *   alpha = 0.1 * C / (32 * 1024 * 1024)
+ * where the CPU capacity of the PM is C flops/s.
+ *
+ * A positive mig_speed caps the throughput: after each chunk the sender
+ * sleeps for whatever is missing to reach data_size / mig_speed.
+ */
+static void task_send_bounded_with_cpu_overhead(msg_task_t comm_task, char *mbox, double mig_speed, double alpha)
+{
+  const double chunk_size = 1024 * 1024;
+  double bytes_left = MSG_task_get_data_size(comm_task);
+
+  start_overhead_process(comm_task);
+
+  while (bytes_left > 0) {
+    const double data_size = (bytes_left < chunk_size) ? bytes_left : chunk_size;
+    bytes_left -= data_size;
+
+    XBT_INFO("remaining %f bytes", bytes_left);
+
+    const double clock_sta = MSG_get_clock();
+
+    /* Request the CPU overhead of this chunk, then ship the chunk itself
+     * as a micro task. */
+    char *mtask_name = bprintf("__micro_%s", MSG_task_get_name(comm_task));
+    msg_task_t mtask = MSG_task_create(mtask_name, 0, data_size, NULL);
+    request_overhead(comm_task, data_size * alpha);
+    msg_error_t ret = MSG_task_send(mtask, mbox);
+    xbt_assert(ret == MSG_OK);
+    xbt_free(mtask_name);
+
+    const double elapsed = MSG_get_clock() - clock_sta;
+
+    /* No bandwidth cap requested: go straight to the next chunk. */
+    if (!(mig_speed > 0))
+      continue;
+
+    /*
+     * (max bandwidth) > data_size / ((elapsed time) + time_to_sleep)
+     *
+     * Thus, we get
+     *   time_to_sleep > data_size / (max bandwidth) - (elapsed time)
+     *
+     * A non-positive time_to_sleep means the chunk was already slow enough,
+     * so no micro sleep is needed.
+     */
+    const double time_to_sleep = data_size / mig_speed - elapsed;
+    if (time_to_sleep > 0)
+      MSG_process_sleep(time_to_sleep);
+
+    XBT_INFO("duration %f", elapsed);
+    XBT_INFO("time_to_sleep %f", time_to_sleep);
+  }
+
+  XBT_INFO("%s", MSG_task_get_name(comm_task));
+  shutdown_overhead_process(comm_task);
+}
+
+
+#if 0 /* Disabled draft, compiled out. It repeatedly reads the remaining
+ * communication amount of comm_task and executes a compute task sized by the
+ * bytes sent since the previous reading, until nothing remains.
+ * NOTE(review): need_exit and overhead are not declared inside this block,
+ * and comp_task_name is declared inside the loop but freed after it, so this
+ * code would need fixing before it could be enabled. */
+static void make_cpu_overhead_of_data_transfer(msg_task_t comm_task, double init_comm_size)
+{
+ double prev_remaining = init_comm_size;
+
+ for (;;) {
+ double remaining = MSG_task_get_remaining_communication(comm_task);
+ if (remaining == 0)
+ need_exit = 1;
+
+ double sent = prev_remaining - remaining;
+ double comp_size = sent * overhead;
+
+
+ char *comp_task_name = bprintf("__sender_overhead%s", MSG_task_get_name(comm_task));
+ msg_task_t comp_task = MSG_task_create(comp_task_name, comp_size, 0, NULL);
+ MSG_task_execute(comp_task);
+ MSG_task_destroy(comp_task);
+
+ if (need_exit)
+ break;
+
+ prev_remaining = remaining;
+
+ }
+
+ xbt_free(comp_task_name);
+}
+#endif
+
+#define USE_MICRO_TASK 1
+
+/* Send `size` bytes of migration data for the given stage to `mbox`, then
+ * log the duration, the achieved speed and the modelled CPU utilization.
+ *
+ * vm_name, src_pm_name, dst_pm_name and stage are used to build the task
+ * name through get_mig_task_name(). stage2_round only appears in the log
+ * line of stage 2. mig_speed is the target speed; a value <= 0 means the
+ * transfer is not bounded.
+ *
+ * With USE_MICRO_TASK, the data travels as "__micro_*" tasks, whose names do
+ * not match the special name that finalizes the receiver loop; an empty task
+ * carrying the stage-3 name is therefore sent at the end of stage 3.
+ */
+static void send_migration_data(const char *vm_name, const char *src_pm_name, const char *dst_pm_name,
+    double size, char *mbox, int stage, int stage2_round, double mig_speed)
+{
+  char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, stage);
+  msg_task_t task = MSG_task_create(task_name, 0, size, NULL);
+
+  double clock_sta = MSG_get_clock();
+
+#ifdef USE_MICRO_TASK
+  /* alpha is (floating operations / byte). NOTE: migration_rx_fun() uses
+   * the same constant on the receiver side; keep both in sync. */
+  const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024);
+
+  task_send_bounded_with_cpu_overhead(task, mbox, mig_speed, alpha);
+
+  /* The helper only reads the name and the data size of the task and sends
+   * micro tasks instead, so the task still belongs to us: release it. */
+  MSG_task_destroy(task);
+  task = NULL;
+#else
+  msg_error_t ret;
+  if (mig_speed > 0)
+    ret = MSG_task_send_bounded(task, mbox, mig_speed);
+  else
+    ret = MSG_task_send(task, mbox);
+  xbt_assert(ret == MSG_OK);
+#endif
+
+  double clock_end = MSG_get_clock();
+  double duration = clock_end - clock_sta;
+  /* Nothing meaningful can be reported when no simulated time elapsed. */
+  double actual_speed = (duration > 0) ? size / duration : 0;
+#ifdef USE_MICRO_TASK
+  /* 1.0E8 is the CPU capacity (flops/s) assumed by the alpha calibration. */
+  double cpu_utilization = (duration > 0) ? size * alpha / duration / 1.0E8 : 0;
+#else
+  double cpu_utilization = 0;
+#endif
+
+  if (stage == 2)
+    XBT_INFO("mig-stage%d.%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, stage2_round, size, duration, actual_speed, mig_speed, cpu_utilization);
+  else
+    XBT_INFO("mig-stage%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, size, duration, actual_speed, mig_speed, cpu_utilization);
+
+#ifdef USE_MICRO_TASK
+  /* The name of a micro task starts with __micro, which does not match the
+   * special name that finalizes the receiver loop. Thus, we send the special task.
+   **/
+  if (stage == 3) {
+    msg_task_t finalize_task = MSG_task_create(task_name, 0, 0, NULL);
+    msg_error_t finalize_ret = MSG_task_send(finalize_task, mbox);
+    xbt_assert(finalize_ret == MSG_OK);
+  }
+#endif
+
+  xbt_free(task_name);
+}
+
+
static int migration_tx_fun(int argc, char *argv[])
{
const char *pr_name = MSG_process_get_name(MSG_process_self());
const char *host_name = MSG_host_get_name(MSG_host_self());
- XBT_INFO("%s@%s start", pr_name, host_name);
+ XBT_DEBUG("mig: tx_start");
xbt_assert(argc == 4);
const char *vm_name = argv[1];
simcall_host_get_params(vm, &params);
const long ramsize = params.ramsize;
const long devsize = params.devsize;
+ const int skip_stage1 = params.skip_stage1;
const int skip_stage2 = params.skip_stage2;
- const double max_downtime = params.max_downtime;
const double dp_rate = params.dp_rate;
const double dp_cap = params.dp_cap;
+ const double mig_speed = params.mig_speed;
double remaining_size = ramsize + devsize;
- double threshold = max_downtime * 125 * 1000 * 1000;
+
+ double max_downtime = params.max_downtime;
+ if (max_downtime == 0) {
+ XBT_WARN("use the default max_downtime value 30ms");
+ max_downtime = 0.03;
+ }
+
+ /* This value assumes the network link is 1Gbps. */
+ double threshold = max_downtime * 125 * 1024 * 1024;
+
+ /* Parameter setup is done. */
if (ramsize == 0)
char *mbox = get_mig_mbox_src_dst(vm_name, src_pm_name, dst_pm_name);
- XBT_INFO("%s@%s stage1:", pr_name, host_name);
+ XBT_INFO("mig-stage1: remaining_size %f", remaining_size);
/* Stage1: send all memory pages to the destination. */
start_dirty_page_tracking(vm);
- {
- char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, 1);
-
- msg_task_t task = MSG_task_create(task_name, 0, ramsize, NULL);
- msg_error_t ret = MSG_task_send(task, mbox);
- xbt_assert(ret == MSG_OK);
-
- xbt_free(task_name);
+ if (!skip_stage1) {
+ send_migration_data(vm_name, src_pm_name, dst_pm_name, ramsize, mbox, 1, 0, mig_speed);
+ remaining_size -= ramsize;
}
- remaining_size -= ramsize;
-
/* Stage2: send update pages iteratively until the size of remaining states
goto stage3;
}
- XBT_INFO("%s@%s stage2: remaining_size %f", pr_name, host_name, remaining_size);
+ int stage2_round = 0;
for (;;) {
// long updated_size = lookup_dirty_pages(vm);
- double updated_size = lookup_dirty_pages(vm) * dp_rate;
+ double updated_size = lookup_computed_flop_counts(vm, stage2_round) * dp_rate;
if (updated_size > dp_cap) {
- XBT_INFO("%f bytes updated, but cap it with the working set size %f", updated_size, dp_cap);
+ XBT_INFO("mig-stage2.%d: %f bytes updated, but cap it with the working set size %f",
+ stage2_round, updated_size, dp_cap);
updated_size = dp_cap;
}
+
+ // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000; // super cool, but 520 for 0 32 8g 75%
+ // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000;
+ // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000;
+ double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000;
+
+ double overhead = dpt_overhead_parameter * updated_size;
+ XBT_INFO("updated %f overhead %f", updated_size, overhead);
+ launch_deferred_exec_process(vm, overhead);
+
remaining_size += updated_size;
- XBT_INFO("%s@%s stage2: remaining_size %f %s threshold %f", pr_name, host_name,
+ XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,
remaining_size, (remaining_size < threshold) ? "<" : ">", threshold);
if (remaining_size < threshold)
break;
-
- char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, 2);
- {
- msg_task_t task = MSG_task_create(task_name, 0, updated_size, NULL);
- msg_error_t ret = MSG_task_send(task, mbox);
- xbt_assert(ret == MSG_OK);
- XBT_INFO("%s@%s stage2: %f sent", pr_name, host_name, updated_size);
- }
- xbt_free(task_name);
+ send_migration_data(vm_name, src_pm_name, dst_pm_name, updated_size, mbox, 2, stage2_round, mig_speed);
remaining_size -= updated_size;
+ stage2_round += 1;
}
stage3:
/* Stage3: stop the VM and copy the rest of states. */
- XBT_INFO("%s@%s stage3: remaining_size %ld", pr_name, host_name, remaining_size);
+ XBT_INFO("mig-stage3: remaining_size %f", remaining_size);
simcall_vm_suspend(vm);
stop_dirty_page_tracking(vm);
- {
- char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, 3);
-
- msg_task_t task = MSG_task_create(task_name, 0, remaining_size, NULL);
- msg_error_t ret = MSG_task_send(task, mbox);
- xbt_assert(ret == MSG_OK);
-
- xbt_free(task_name);
- }
+ send_migration_data(vm_name, src_pm_name, dst_pm_name, remaining_size, mbox, 3, 0, mig_speed);
xbt_free(mbox);
- XBT_INFO("%s@%s done", pr_name, host_name);
+ XBT_DEBUG("mig: tx_done");
return 0;
}
argv[2] = xbt_strdup(sg_host_name(src_pm));
argv[3] = xbt_strdup(sg_host_name(dst_pm));
argv[4] = NULL;
-
msg_process_t pr = MSG_process_create_with_arguments(pr_name, migration_tx_fun, NULL, src_pm, nargvs - 1, argv);
xbt_free(pr_name);
}
-/** @brief Destroy a VM. Destroy the VM object from the simulation.
- * @ingroup msg_VMs
- */
-void MSG_vm_destroy(msg_vm_t vm)
-{
- /* First, terminate all processes on the VM if necessary */
- if (MSG_vm_is_running(vm))
- simcall_vm_shutdown(vm);
-
- if (!MSG_vm_is_created(vm)) {
- XBT_CRITICAL("shutdown the given VM before destroying it");
- DIE_IMPOSSIBLE;
- }
-
- /* Then, destroy the VM object */
- simcall_vm_destroy(vm);
-
- __MSG_host_destroy(vm);
-
- #ifdef HAVE_TRACING
- TRACE_msg_vm_end(vm);
- #endif
-}
-/** @brief Get the physical host of a givne VM.
+/** @brief Get the physical host of a given VM.
* @ingroup msg_VMs
*/
msg_host_t MSG_vm_get_pm(msg_vm_t vm)