X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/c078a363724fe8ef749dfbda30f09b5f471643f7..7d6123fb0ebff1aec98f31fe1962dc22e49b8571:/src/msg/msg_vm.c diff --git a/src/msg/msg_vm.c b/src/msg/msg_vm.c index a5331a5e88..3b04f74402 100644 --- a/src/msg/msg_vm.c +++ b/src/msg/msg_vm.c @@ -308,6 +308,8 @@ static inline char *get_mig_task_name(const char *vm_name, const char *src_pm_na return bprintf("__task_mig_stage%d:%s(%s-%s)", stage, vm_name, src_pm_name, dst_pm_name); } +static void launch_deferred_exec_process(msg_host_t host, double computation); + static int migration_rx_fun(int argc, char *argv[]) { const char *pr_name = MSG_process_get_name(MSG_process_self()); @@ -331,6 +333,12 @@ static int migration_rx_fun(int argc, char *argv[]) for (;;) { msg_task_t task = NULL; MSG_task_recv(&task, mbox); + { + double received = MSG_task_get_data_size(task); + /* TODO */ + const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024); + launch_deferred_exec_process(vm, received * alpha); + } if (strcmp(task->name, finalize_task_name) == 0) need_exit = 1; @@ -520,25 +528,303 @@ void MSG_host_del_task(msg_host_t host, msg_task_t task) } +static int deferred_exec_fun(int argc, char *argv[]) +{ + xbt_assert(argc == 2); + const char *comp_str = argv[1]; + double computaion = atof(comp_str); + + msg_task_t task = MSG_task_create("__task_deferred", computaion, 0, NULL); + // XBT_INFO("exec deferred %f", computaion); + + /* dpt is the results of the VM activity */ + MSG_task_set_priority(task, 1000000); + MSG_task_execute(task); + + + + MSG_task_destroy(task); + + return 0; +} + +static void launch_deferred_exec_process(msg_host_t host, double computation) +{ + char *pr_name = bprintf("__pr_deferred_exec_%s", MSG_host_get_name(host)); + + int nargvs = 3; + char **argv = xbt_new(char *, nargvs); + argv[0] = xbt_strdup(pr_name); + argv[1] = bprintf("%lf", computation); + argv[2] = NULL; + + msg_process_t pr = MSG_process_create_with_arguments(pr_name, deferred_exec_fun, NULL, host, nargvs - 1, argv); + + xbt_free(pr_name); +} + + +static int task_tx_overhead_fun(int argc, char *argv[]) +{ + xbt_assert(argc == 2); + const char *mbox = argv[1]; + + int need_exit = 0; + + XBT_INFO("start %s", mbox); + + for (;;) { + msg_task_t task = NULL; + MSG_task_recv(&task, mbox); + + // XBT_INFO("task->name %s", task->name); + + if (strcmp(task->name, "finalize_making_overhead") == 0) + need_exit = 1; + + // XBT_INFO("exec"); + MSG_task_execute(task); + MSG_task_destroy(task); + + if (need_exit) + break; + } + + XBT_INFO("bye"); + + return 0; +} + +static void start_overhead_process(msg_task_t comm_task) +{ + char *pr_name = bprintf("__pr_task_tx_overhead_%s", MSG_task_get_name(comm_task)); + char *mbox = bprintf("__mb_task_tx_overhead_%s", MSG_task_get_name(comm_task)); + + int nargvs = 3; + char **argv = xbt_new(char *, nargvs); + argv[0] = xbt_strdup(pr_name); + argv[1] = xbt_strdup(mbox); + argv[2] = NULL; + + XBT_INFO("micro start: mbox %s", mbox); + msg_process_t pr = MSG_process_create_with_arguments(pr_name, task_tx_overhead_fun, NULL, MSG_host_self(), nargvs - 1, argv); + + xbt_free(pr_name); + xbt_free(mbox); +} + +static void shutdown_overhead_process(msg_task_t comm_task) +{ + char *mbox = bprintf("__mb_task_tx_overhead_%s", MSG_task_get_name(comm_task)); + + msg_task_t task = MSG_task_create("finalize_making_overhead", 0, 0, NULL); + + XBT_INFO("micro shutdown: mbox %s", mbox); + msg_error_t ret = MSG_task_send(task, mbox); + xbt_assert(ret == MSG_OK); + + xbt_free(mbox); + XBT_INFO("shutdown done"); +} + +static void request_overhead(msg_task_t comm_task, double computation) +{ + char *mbox = bprintf("__mb_task_tx_overhead_%s", MSG_task_get_name(comm_task)); + + msg_task_t task = MSG_task_create("micro", computation, 0, NULL); + + XBT_INFO("req overhead"); + msg_error_t ret = MSG_task_send(task, mbox); + xbt_assert(ret == MSG_OK); + + xbt_free(mbox); +} + +/* alpha is (floating_operations / bytes). + * + * When actual migration traffic was 32 mbytes/s, we observed the CPU + * utilization of the main thread of the Qemu process was 10 %. + * alpha = 0.1 * C / (32 * 1024 * 1024) + * where the CPU capacity of the PM is C flops/s. + * + * */ +static void task_send_bounded_with_cpu_overhead(msg_task_t comm_task, char *mbox, double mig_speed, double alpha) +{ + const double chunk_size = 1024 * 1024; + double remaining = MSG_task_get_data_size(comm_task); + + start_overhead_process(comm_task); + + + while (remaining > 0) { + double data_size = chunk_size; + if (remaining < chunk_size) + data_size = remaining; + + remaining -= data_size; + + XBT_INFO("remaining %f bytes", remaining); + + + double clock_sta = MSG_get_clock(); + + /* create a micro task */ + { + char *mtask_name = bprintf("__micro_%s", MSG_task_get_name(comm_task)); + msg_task_t mtask = MSG_task_create(mtask_name, 0, data_size, NULL); + + request_overhead(comm_task, data_size * alpha); + + msg_error_t ret = MSG_task_send(mtask, mbox); + xbt_assert(ret == MSG_OK); + + xbt_free(mtask_name); + } + +#if 0 + { + /* In the real world, sending data involves small CPU computation. */ + char *mtask_name = bprintf("__micro_%s", MSG_task_get_name(comm_task)); + msg_task_t mtask = MSG_task_create(mtask_name, data_size * alpha, data_size, NULL); + MSG_task_execute(mtask); + MSG_task_destroy(mtask); + xbt_free(mtask_name); + } +#endif + + /* TODO */ + + double clock_end = MSG_get_clock(); + + + if (mig_speed > 0) { + /* + * (max bandwidth) > data_size / ((elapsed time) + time_to_sleep) + * + * Thus, we get + * time_to_sleep > data_size / (max bandwidth) - (elapsed time) + * + * If time_to_sleep is smaller than zero, the elapsed time was too big. We + * do not need a micro sleep. + **/ + double time_to_sleep = data_size / mig_speed - (clock_end - clock_sta); + if (time_to_sleep > 0) + MSG_process_sleep(time_to_sleep); + + + XBT_INFO("duration %f", clock_end - clock_sta); + XBT_INFO("time_to_sleep %f", time_to_sleep); + } + } + + XBT_INFO("%s", MSG_task_get_name(comm_task)); + shutdown_overhead_process(comm_task); + +} + + +#if 0 +static void make_cpu_overhead_of_data_transfer(msg_task_t comm_task, double init_comm_size) +{ + double prev_remaining = init_comm_size; + + for (;;) { + double remaining = MSG_task_get_remaining_communication(comm_task); + if (remaining == 0) + need_exit = 1; + + double sent = prev_remaining - remaining; + double comp_size = sent * overhead; + + + char *comp_task_name = bprintf("__sender_overhead%s", MSG_task_get_name(comm_task)); + msg_task_t comp_task = MSG_task_create(comp_task_name, comp_size, 0, NULL); + MSG_task_execute(comp_task); + MSG_task_destroy(comp_task); + + if (need_exit) + break; + + prev_remaining = remaining; + + } + + xbt_free(comp_task_name); +} +#endif + +#define USE_MICRO_TASK 1 + static void send_migration_data(const char *vm_name, const char *src_pm_name, const char *dst_pm_name, double size, char *mbox, int stage, int stage2_round, double mig_speed) { char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, stage); msg_task_t task = MSG_task_create(task_name, 0, size, NULL); + + double clock_sta = MSG_get_clock(); + +#ifdef USE_MICRO_TASK + // const double alpha = 0.1L * 1.0E8 / (32L * 1024 * 1024); + // const double alpha = 0.25L * 1.0E8 / (85L * 1024 * 1024); + // const double alpha = 0.20L * 1.0E8 / (85L * 1024 * 1024); + // const double alpha = 0.25L * 1.0E8 / (85L * 1024 * 1024); + // const double alpha = 0.32L * 1.0E8 / (24L * 1024 * 1024); // makes super good values for 32 mbytes/s + //const double alpha = 0.32L * 1.0E8 / (32L * 1024 * 1024); + // const double alpha = 0.56L * 1.0E8 / (80L * 1024 * 1024); + ////const double alpha = 0.20L * 1.0E8 / (80L * 1024 * 1024); + // const double alpha = 0.56L * 1.0E8 / (90L * 1024 * 1024); + // const double alpha = 0.66L * 1.0E8 / (90L * 1024 * 1024); + + // const double alpha = 0.20L * 1.0E8 / (80L * 1024 * 1024); + const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024); + + task_send_bounded_with_cpu_overhead(task, mbox, mig_speed, alpha); + +#else msg_error_t ret; if (mig_speed > 0) ret = MSG_task_send_bounded(task, mbox, mig_speed); else ret = MSG_task_send(task, mbox); xbt_assert(ret == MSG_OK); +#endif + + double clock_end = MSG_get_clock(); + double duration = clock_end - clock_sta; + double actual_speed = size / duration; +#ifdef USE_MICRO_TASK + double cpu_utilization = size * alpha / duration / 1.0E8; +#else + double cpu_utilization = 0; +#endif + + + if (stage == 2) - XBT_INFO("mig-stage%d.%d: sent %f", stage, stage2_round, size); + XBT_INFO("mig-stage%d.%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, stage2_round, size, duration, actual_speed, mig_speed, cpu_utilization); else - XBT_INFO("mig-stage%d: sent %f", stage, size); + XBT_INFO("mig-stage%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, size, duration, actual_speed, mig_speed, cpu_utilization); xbt_free(task_name); + + + +#ifdef USE_MICRO_TASK + /* The name of a micro task starts with __micro, which does not match the + * special name that finalizes the receiver loop. Thus, we send the special task. + **/ + { + if (stage == 3) { + char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, stage); + msg_task_t task = MSG_task_create(task_name, 0, 0, NULL); + msg_error_t ret = MSG_task_send(task, mbox); + xbt_assert(ret == MSG_OK); + xbt_free(task_name); + } + } +#endif } @@ -560,13 +846,23 @@ static int migration_tx_fun(int argc, char *argv[]) simcall_host_get_params(vm, ¶ms); const long ramsize = params.ramsize; const long devsize = params.devsize; + const int skip_stage1 = params.skip_stage1; const int skip_stage2 = params.skip_stage2; - const double max_downtime = params.max_downtime; const double dp_rate = params.dp_rate; const double dp_cap = params.dp_cap; const double mig_speed = params.mig_speed; double remaining_size = ramsize + devsize; - double threshold = max_downtime * 125 * 1000 * 1000; + + double max_downtime = params.max_downtime; + if (max_downtime == 0) { + XBT_WARN("use the default max_downtime value 30ms"); + max_downtime = 0.03; + } + + /* This value assumes the network link is 1Gbps. */ + double threshold = max_downtime * 125 * 1024 * 1024; + + /* setting up parameters has done */ if (ramsize == 0) @@ -579,9 +875,10 @@ static int migration_tx_fun(int argc, char *argv[]) /* Stage1: send all memory pages to the destination. */ start_dirty_page_tracking(vm); - send_migration_data(vm_name, src_pm_name, dst_pm_name, ramsize, mbox, 1, 0, mig_speed); - - remaining_size -= ramsize; + if (!skip_stage1) { + send_migration_data(vm_name, src_pm_name, dst_pm_name, ramsize, mbox, 1, 0, mig_speed); + remaining_size -= ramsize; + } @@ -605,6 +902,16 @@ static int migration_tx_fun(int argc, char *argv[]) updated_size = dp_cap; } + + // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000; // super cool, but 520 for 0 32 8g 75% + // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000; + // double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000; + double dpt_overhead_parameter = 1.0L * 1E8 / 0.5 / 40 / 1024 / 1024 * 1000 * 1000 * 1000 * 1000 * 1000 * 1000; + + double overhead = dpt_overhead_parameter * updated_size; + XBT_INFO("updated %f overhead %f", updated_size, overhead); + launch_deferred_exec_process(vm, overhead); + remaining_size += updated_size; XBT_INFO("mig-stage2.%d: remaining_size %f (%s threshold %f)", stage2_round,