X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a4cd8aafa1caf358f873f87b142f76185acf1901..b4b78f6684f68475ec9c7b63af21f733ff04a435:/src/msg/msg_vm.c diff --git a/src/msg/msg_vm.c b/src/msg/msg_vm.c index 6cbe343178..44bd5cad2e 100644 --- a/src/msg/msg_vm.c +++ b/src/msg/msg_vm.c @@ -1,4 +1,5 @@ -/* Copyright (c) 2012. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2012-2014. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -20,6 +21,7 @@ #include "msg_private.h" #include "xbt/sysdep.h" #include "xbt/log.h" +#include "simgrid/platf.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_vm, msg, "Cloud-oriented parts of the MSG API"); @@ -56,7 +58,7 @@ xbt_dict_t MSG_vm_get_properties(msg_vm_t vm) /** \ingroup m_host_management * \brief Change the value of a given host property * - * \param host a host + * \param vm a vm * \param name a property name * \param value what to change the property to * \param free_ctn the freeing function to use to kill the value on need @@ -166,24 +168,33 @@ int MSG_vm_is_restoring(msg_vm_t vm) /** @brief Create a new VM with specified parameters. * @ingroup msg_VMs* + * All parameters are in MBytes * */ -msg_vm_t MSG_vm_create(msg_host_t ind_pm, const char *name, - int ncpus, long ramsize, long net_cap, char *disk_path, long disksize) +msg_vm_t MSG_vm_create(msg_host_t ind_pm, const char *name, int ncpus, int ramsize, + int net_cap, char *disk_path, int disksize, + int mig_netspeed, int dp_intensity) { - msg_vm_t vm = MSG_vm_create_core(ind_pm, name); - - { - s_ws_params_t params; - memset(&params, 0, sizeof(params)); - params.ramsize = ramsize; - //params.overcommit = 0; - simcall_host_set_params(vm, &params); - } - - /* TODO: Limit net capability, take into account disk considerations. 
*/ - - return vm; + /* For the moment, intensity_rate is the percentage against the migration bandwidth */ + double host_speed = MSG_get_host_speed(ind_pm); + double update_speed = ((double)dp_intensity/100) * mig_netspeed; + + msg_vm_t vm = MSG_vm_create_core(ind_pm, name); + s_ws_params_t params; + memset(&params, 0, sizeof(params)); + params.ramsize = 1L * 1024 * 1024 * ramsize; + //params.overcommit = 0; + params.devsize = 0; + params.skip_stage2 = 0; + params.max_downtime = 0.03; + params.dp_rate = (update_speed * 1L * 1024 * 1024 ) / host_speed; + params.dp_cap = params.ramsize / 0.9; // working set memory is 90% + params.mig_speed = 1L * 1024 * 1024 * mig_netspeed; // mig_speed + + //XBT_INFO("dp rate %f migspeed : %f intensity mem : %d, updatespeed %f, hostspeed %f",params.dp_rate, params.mig_speed, dp_intensity, update_speed, host_speed); + simcall_host_set_params(vm, &params); + + return vm; } @@ -319,7 +330,8 @@ static int migration_rx_fun(int argc, char *argv[]) const char *src_pm_name = argv[2]; const char *dst_pm_name = argv[3]; msg_vm_t vm = MSG_get_host_by_name(vm_name); - msg_vm_t dst_pm = MSG_get_host_by_name(dst_pm_name); + msg_host_t src_pm = MSG_get_host_by_name(src_pm_name); + msg_host_t dst_pm = MSG_get_host_by_name(dst_pm_name); s_ws_params_t params; @@ -353,9 +365,21 @@ static int migration_rx_fun(int argc, char *argv[]) } + /* deinstall the current affinity setting */ + simcall_vm_set_affinity(vm, src_pm, 0); + simcall_vm_migrate(vm, dst_pm); simcall_vm_resume(vm); + /* install the affinity setting of the VM on the destination pm */ + { + msg_host_priv_t priv = msg_host_resource_priv(vm); + + unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(priv->affinity_mask_db, (char *) dst_pm, sizeof(msg_host_t)); + simcall_vm_set_affinity(vm, dst_pm, affinity_mask); + XBT_INFO("set affinity(0x%04lx@%s) for %s", affinity_mask, MSG_host_get_name(dst_pm), MSG_host_get_name(vm)); + } + { char *task_name = get_mig_task_name(vm_name, 
src_pm_name, dst_pm_name, 4); @@ -446,7 +470,8 @@ static double lookup_computed_flop_counts(msg_vm_t vm, int stage_for_fancy_debug dirty_page_t dp = NULL; xbt_dict_foreach(priv->dp_objs, cursor, key, dp) { double remaining = MSG_task_get_remaining_computation(dp->task); - double clock = MSG_get_clock(); + + double clock = MSG_get_clock(); // total += calc_updated_pages(key, vm, dp, remaining, clock); total += get_computed(key, vm, dp, remaining, clock); @@ -457,7 +482,7 @@ static double lookup_computed_flop_counts(msg_vm_t vm, int stage_for_fancy_debug total += priv->dp_updated_by_deleted_tasks; - XBT_INFO("mig-stage%d.%d: computed %f flop_counts (including %f by deleted tasks)", + XBT_DEBUG("mig-stage%d.%d: computed %f flop_counts (including %f by deleted tasks)", stage_for_fancy_debug, stage2_round_for_fancy_debug, total, priv->dp_updated_by_deleted_tasks); @@ -552,14 +577,12 @@ static void launch_deferred_exec_process(msg_host_t host, double computation, do int nargvs = 4; char **argv = xbt_new(char *, nargvs); - argv[0] = xbt_strdup(pr_name); - argv[1] = bprintf("%lf", computation); - argv[2] = bprintf("%lf", prio); + argv[0] = pr_name; + argv[1] = bprintf("%f", computation); + argv[2] = bprintf("%f", prio); argv[3] = NULL; MSG_process_create_with_arguments(pr_name, deferred_exec_fun, NULL, host, nargvs - 1, argv); - - xbt_free(pr_name); } @@ -602,15 +625,12 @@ static void start_overhead_process(msg_task_t comm_task) int nargvs = 3; char **argv = xbt_new(char *, nargvs); - argv[0] = xbt_strdup(pr_name); - argv[1] = xbt_strdup(mbox); + argv[0] = pr_name; + argv[1] = mbox; argv[2] = NULL; // XBT_INFO("micro start: mbox %s", mbox); MSG_process_create_with_arguments(pr_name, task_tx_overhead_fun, NULL, MSG_host_self(), nargvs - 1, argv); - - xbt_free(pr_name); - xbt_free(mbox); } static void shutdown_overhead_process(msg_task_t comm_task) @@ -753,7 +773,7 @@ static void make_cpu_overhead_of_data_transfer(msg_task_t comm_task, double init } #endif -#define 
USE_MICRO_TASK 1 +// #define USE_MICRO_TASK 1 #if 0 // const double alpha = 0.1L * 1.0E8 / (32L * 1024 * 1024); @@ -774,7 +794,7 @@ const double alpha = 0.22L * 1.0E8 / (80L * 1024 * 1024); static void send_migration_data(const char *vm_name, const char *src_pm_name, const char *dst_pm_name, - double size, char *mbox, int stage, int stage2_round, double mig_speed, double xfer_cpu_overhead) + sg_size_t size, char *mbox, int stage, int stage2_round, double mig_speed, double xfer_cpu_overhead) { char *task_name = get_mig_task_name(vm_name, src_pm_name, dst_pm_name, stage); msg_task_t task = MSG_task_create(task_name, 0, size, NULL); @@ -808,10 +828,11 @@ static void send_migration_data(const char *vm_name, const char *src_pm_name, co - if (stage == 2) - XBT_INFO("mig-stage%d.%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, stage2_round, size, duration, actual_speed, mig_speed, cpu_utilization); - else - XBT_INFO("mig-stage%d: sent %f duration %f actual_speed %f (target %f) cpu %f", stage, size, duration, actual_speed, mig_speed, cpu_utilization); + if (stage == 2){ + XBT_DEBUG("mig-stage%d.%d: sent %llu duration %f actual_speed %f (target %f) cpu %f", stage, stage2_round, size, duration, actual_speed, mig_speed, cpu_utilization);} + else{ + XBT_DEBUG("mig-stage%d: sent %llu duration %f actual_speed %f (target %f) cpu %f", stage, size, duration, actual_speed, mig_speed, cpu_utilization); + } xbt_free(task_name); @@ -836,7 +857,7 @@ static void send_migration_data(const char *vm_name, const char *src_pm_name, co static double get_updated_size(double computed, double dp_rate, double dp_cap) { double updated_size = computed * dp_rate; - XBT_INFO("updated_size %f dp_rate %f", updated_size, dp_rate); + XBT_DEBUG("updated_size %f dp_rate %f", updated_size, dp_rate); if (updated_size > dp_cap) { // XBT_INFO("mig-stage2.%d: %f bytes updated, but cap it with the working set size %f", stage2_round, updated_size, dp_cap); updated_size = dp_cap; @@ -846,40 
+867,48 @@ static double get_updated_size(double computed, double dp_rate, double dp_cap) } static double send_stage1(msg_host_t vm, const char *src_pm_name, const char *dst_pm_name, - long ramsize, double mig_speed, double xfer_cpu_overhead, double dp_rate, double dp_cap, double dpt_cpu_overhead) + sg_size_t ramsize, double mig_speed, double xfer_cpu_overhead, double dp_rate, double dp_cap, double dpt_cpu_overhead) { const char *vm_name = MSG_host_get_name(vm); char *mbox = get_mig_mbox_src_dst(vm_name, src_pm_name, dst_pm_name); - const long chunksize = 1024 * 1024 * 100; - long remaining = ramsize; + // const long chunksize = (sg_size_t)1024 * 1024 * 100; + const sg_size_t chunksize = (sg_size_t)1024 * 1024 * 100000; + sg_size_t remaining = ramsize; double computed_total = 0; while (remaining > 0) { - long datasize = chunksize; + sg_size_t datasize = chunksize; if (remaining < chunksize) datasize = remaining; remaining -= datasize; send_migration_data(vm_name, src_pm_name, dst_pm_name, datasize, mbox, 1, 0, mig_speed, xfer_cpu_overhead); - double computed = lookup_computed_flop_counts(vm, 1, 0); computed_total += computed; - { - double updated_size = get_updated_size(computed, dp_rate, dp_cap); + // { + // double updated_size = get_updated_size(computed, dp_rate, dp_cap); - double overhead = dpt_cpu_overhead * updated_size; - launch_deferred_exec_process(vm, overhead, 10000); - } + // double overhead = dpt_cpu_overhead * updated_size; + // launch_deferred_exec_process(vm, overhead, 10000); + // } } - + xbt_free(mbox); return computed_total; } +static double get_threshold_value(double bandwidth, double max_downtime) +{ + /* This value assumes the network link is 1Gbps. 
*/ + // double threshold = max_downtime * 125 * 1024 * 1024; + double threshold = max_downtime * bandwidth; + + return threshold; +} static int migration_tx_fun(int argc, char *argv[]) { @@ -894,7 +923,7 @@ static int migration_tx_fun(int argc, char *argv[]) s_ws_params_t params; simcall_host_get_params(vm, &params); - const long ramsize = params.ramsize; + const sg_size_t ramsize = params.ramsize; const long devsize = params.devsize; const int skip_stage1 = params.skip_stage1; const int skip_stage2 = params.skip_stage2; @@ -912,8 +941,7 @@ static int migration_tx_fun(int argc, char *argv[]) max_downtime = 0.03; } - /* This value assumes the network link is 1Gbps. */ - double threshold = max_downtime * 125 * 1024 * 1024; + double threshold = 0.00001; /* TODO: cleanup */ /* setting up parameters has done */ @@ -933,8 +961,15 @@ static int migration_tx_fun(int argc, char *argv[]) // send_migration_data(vm_name, src_pm_name, dst_pm_name, ramsize, mbox, 1, 0, mig_speed, xfer_cpu_overhead); /* send ramsize, but split it */ + double clock_prev_send = MSG_get_clock(); + computed_during_stage1 = send_stage1(vm, src_pm_name, dst_pm_name, ramsize, mig_speed, xfer_cpu_overhead, dp_rate, dp_cap, dpt_cpu_overhead); remaining_size -= ramsize; + + double clock_post_send = MSG_get_clock(); + double bandwidth = ramsize / (clock_post_send - clock_prev_send); + threshold = get_threshold_value(bandwidth, max_downtime); + XBT_INFO("actual banwdidth %f (MB/s), threshold %f", bandwidth / 1024 / 1024, threshold); } @@ -960,16 +995,16 @@ static int migration_tx_fun(int argc, char *argv[]) updated_size = get_updated_size(computed, dp_rate, dp_cap); } - XBT_INFO("%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f", + XBT_INFO("mig-stage 2:%d updated_size %f computed_during_stage1 %f dp_rate %f dp_cap %f", stage2_round, updated_size, computed_during_stage1, dp_rate, dp_cap); - if (stage2_round != 0) { - /* during stage1, we have already created overhead tasks */ - double overhead = 
dpt_cpu_overhead * updated_size; - XBT_INFO("updated %f overhead %f", updated_size, overhead); - launch_deferred_exec_process(vm, overhead, 10000); - } + // if (stage2_round != 0) { + // /* during stage1, we have already created overhead tasks */ + // double overhead = dpt_cpu_overhead * updated_size; + // XBT_DEBUG("updated %f overhead %f", updated_size, overhead); + // launch_deferred_exec_process(vm, overhead, 10000); + // } { @@ -982,9 +1017,22 @@ static int migration_tx_fun(int argc, char *argv[]) break; } + double clock_prev_send = MSG_get_clock(); send_migration_data(vm_name, src_pm_name, dst_pm_name, updated_size, mbox, 2, stage2_round, mig_speed, xfer_cpu_overhead); + double clock_post_send = MSG_get_clock(); + + double bandwidth = updated_size / (clock_post_send - clock_prev_send); + threshold = get_threshold_value(bandwidth, max_downtime); + XBT_INFO("actual banwdidth %f, threshold %f", bandwidth / 1024 / 1024, threshold); + + + + + + + remaining_size -= updated_size; stage2_round += 1; } @@ -1015,40 +1063,38 @@ static void do_migration(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm) char *pr_name = get_mig_process_rx_name(sg_host_name(vm), sg_host_name(src_pm), sg_host_name(dst_pm)); int nargvs = 5; char **argv = xbt_new(char *, nargvs); - argv[0] = xbt_strdup(pr_name); + argv[0] = pr_name; argv[1] = xbt_strdup(sg_host_name(vm)); argv[2] = xbt_strdup(sg_host_name(src_pm)); argv[3] = xbt_strdup(sg_host_name(dst_pm)); argv[4] = NULL; MSG_process_create_with_arguments(pr_name, migration_rx_fun, NULL, dst_pm, nargvs - 1, argv); - - xbt_free(pr_name); } { char *pr_name = get_mig_process_tx_name(sg_host_name(vm), sg_host_name(src_pm), sg_host_name(dst_pm)); int nargvs = 5; char **argv = xbt_new(char *, nargvs); - argv[0] = xbt_strdup(pr_name); + argv[0] = pr_name; argv[1] = xbt_strdup(sg_host_name(vm)); argv[2] = xbt_strdup(sg_host_name(src_pm)); argv[3] = xbt_strdup(sg_host_name(dst_pm)); argv[4] = NULL; MSG_process_create_with_arguments(pr_name, 
migration_tx_fun, NULL, src_pm, nargvs - 1, argv); - - xbt_free(pr_name); } /* wait until the migration have finished */ { msg_task_t task = NULL; msg_error_t ret = MSG_task_recv(&task, mbox_ctl); + xbt_assert(ret == MSG_OK); char *expected_task_name = get_mig_task_name(sg_host_name(vm), sg_host_name(src_pm), sg_host_name(dst_pm), 4); xbt_assert(strcmp(task->name, expected_task_name) == 0); xbt_free(expected_task_name); + MSG_task_destroy(task); } xbt_free(mbox_ctl); @@ -1184,16 +1230,17 @@ msg_host_t MSG_vm_get_pm(msg_vm_t vm) /** @brief Set a CPU bound for a given VM. * @ingroup msg_VMs * + * 1. * Note that in some cases MSG_task_set_bound() may not intuitively work for VMs. * * For example, * On PM0, there are Task1 and VM0. * On VM0, there is Task2. - * Now we bound 75% to Task1@PM0 and bound 25% to Task2@VM0. + * Now we bound 75% to Task1\@PM0 and bound 25% to Task2\@VM0. * Then, - * Task1@PM0 gets 50%. - * Task2@VM0 gets 25%. - * This is NOT 75% for Task1@PM0 and 25% for Task2@VM0, respectively. + * Task1\@PM0 gets 50%. + * Task2\@VM0 gets 25%. + * This is NOT 75% for Task1\@PM0 and 25% for Task2\@VM0, respectively. * * This is because a VM has the dummy CPU action in the PM layer. Putting a * task on the VM does not affect the bound of the dummy CPU action. The bound @@ -1207,8 +1254,38 @@ msg_host_t MSG_vm_get_pm(msg_vm_t vm) * * The current solution is to use MSG_vm_set_bound(), which allows us to * directly set the bound of the dummy CPU action. + * + * + * 2. + * Note that bound == 0 means no bound (i.e., unlimited). But, if a host has + * multiple CPU cores, the CPU share of a computation task (or a VM) never + * exceeds the capacity of a CPU core. */ void MSG_vm_set_bound(msg_vm_t vm, double bound) { - return simcall_vm_set_bound(vm, bound); + return simcall_vm_set_bound(vm, bound); +} + + +/** @brief Set the CPU affinity of a given VM. + * @ingroup msg_VMs + * + * This function changes the CPU affinity of a given VM. 
Usage is the same as + * MSG_task_set_affinity(). See the MSG_task_set_affinity() for details. + */ +void MSG_vm_set_affinity(msg_vm_t vm, msg_host_t pm, unsigned long mask) +{ + msg_host_priv_t priv = msg_host_resource_priv(vm); + + if (mask == 0) + xbt_dict_remove_ext(priv->affinity_mask_db, (char *) pm, sizeof(pm)); + else + xbt_dict_set_ext(priv->affinity_mask_db, (char *) pm, sizeof(pm), (void *) mask, NULL); + + msg_host_t pm_now = MSG_vm_get_pm(vm); + if (pm_now == pm) { + XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(pm), MSG_host_get_name(vm)); + simcall_vm_set_affinity(vm, pm, mask); + } else + XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(pm), MSG_host_get_name(vm)); }