- container_t vm_container = PJ_container_get(vm->getCname());
- simgrid::instr::Type* type = vm_container->type_->getChild("MSG_VM_STATE");
- simgrid::instr::Value* val = simgrid::instr::Value::get_or_new("start", "0 0 1", type); // start is blue
- new simgrid::instr::PushStateEvent(MSG_get_clock(), vm_container, type, val);
- }
-}
-
-/** @brief Immediately kills all processes within the given VM.
- * @ingroup msg_VMs
- *
- * Any memory that they allocated will be leaked, unless you used #MSG_process_on_exit().
- *
- * No extra delay occurs. If you want to simulate this too, you want to use a #MSG_process_sleep().
- *
- * @param vm the VM to shut down; vm->pimpl_vm_->shutdown() is run through kernelImmediate() with the
- * result of SIMIX_process_self() as issuer.
- */
-void MSG_vm_shutdown(msg_vm_t vm)
-{
- smx_actor_t issuer = SIMIX_process_self(); // captured now so the kernel-side lambda can pass it to shutdown()
- simgrid::simix::kernelImmediate([vm, issuer]() { vm->pimpl_vm_->shutdown(issuer); });
-
- // Make sure that processes in the VM are killed in this scheduling round before processing (eg with the VM destroy)
- MSG_process_sleep(0.);
-}
-
-static std::string get_mig_process_tx_name(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm) // presumably the name given to the sending (tx) migration process -- confirm at the call site
-{
- return std::string("__pr_mig_tx:") + vm->getCname() + "(" + src_pm->getCname() + "-" + dst_pm->getCname() + ")"; // "__pr_mig_tx:<vm>(<src>-<dst>)"
-}
-
-static std::string get_mig_process_rx_name(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm) // presumably the name given to the receiving (rx) migration process -- confirm at the call site
-{
- return std::string("__pr_mig_rx:") + vm->getCname() + "(" + src_pm->getCname() + "-" + dst_pm->getCname() + ")"; // "__pr_mig_rx:<vm>(<src>-<dst>)"
-}
-
-static std::string get_mig_task_name(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm, int stage) // task name for one migration stage; in this file stage 3 is the finalize task and stage 4 the completion notice
-{
- return std::string("__task_mig_stage") + std::to_string(stage) + ":" + vm->getCname() + "(" + src_pm->getCname() +
- "-" + dst_pm->getCname() + ")"; // "__task_mig_stage<stage>:<vm>(<src>-<dst>)"
-}
-
-struct migration_session { // state shared with migration_rx_fun(), which reads it through MSG_process_get_data()
- msg_vm_t vm; // the VM being migrated
- msg_host_t src_pm; // host the VM migrates from (used in task names and debug output)
- msg_host_t dst_pm; // host the VM is moved to once the transfer completes
-
- /* The migration_rx process uses mbox_ctl to let the caller of do_migration()
- * know the completion of the migration. */
- char *mbox_ctl;
- /* The migration_rx and migration_tx processes use mbox to transfer migration data. */
- char *mbox;
-};
-
-static int migration_rx_fun(int argc, char *argv[]) // receiver side: drain ms->mbox until the stage-3 task, move and resume the VM, then notify on ms->mbox_ctl
-{ // argc and argv are not used; every path returns 0
- XBT_DEBUG("mig: rx_start");
-
- // The structure has been created in the do_migration function and should only be freed in the same place ;)
- struct migration_session* ms = static_cast<migration_session*>(MSG_process_get_data(MSG_process_self()));
-
- bool received_finalize = false;
-
- std::string finalize_task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 3); // stage 3 == last data task
- while (not received_finalize) {
- msg_task_t task = nullptr;
- int ret = MSG_task_recv(&task, ms->mbox);
-
- if (ret != MSG_OK) {
- // An error occurred, clean the code and return
- // The owner did not change, hence the task should be only destroyed on the other side
- return 0;
- }
-
- if (finalize_task_name == task->name)
- received_finalize = 1; // NOTE(review): assigns the int 1 to a bool; 'true' would state the intent
-
- MSG_task_destroy(task); // every successfully received task is destroyed here, including the finalize one
- }
-
- // Here Stage 1, 2 and 3 have been performed.
- // Hence complete the migration
-
- // Copy the reference to the vm (if SRC crashes now, do_migration will free ms)
- // This is clearly ugly but I (Adrien) need more time to do something cleaner (actually we should copy the whole ms
- // structure at the beginning and free it at the end of each function)
- simgrid::s4u::VirtualMachine* vm = ms->vm;
- msg_host_t dst_pm = ms->dst_pm;
-
- // Make sure that we cannot get interrupted between the migrate and the resume to not end in an inconsistent state
- simgrid::simix::kernelImmediate([vm, dst_pm]() {
- /* Update the vm location */
- /* precopy migration makes the VM temporally paused */
- xbt_assert(vm->pimpl_vm_->getState() == SURF_VM_STATE_SUSPENDED);
-
- /* Update the vm location and resume it */
- vm->pimpl_vm_->setPm(dst_pm);
- vm->pimpl_vm_->resume();
- });
-
-
- // Now the VM is running on the new host (the migration is completed) (even if the SRC crash)
- vm->pimpl_vm_->isMigrating = false;
- XBT_DEBUG("VM(%s) moved from PM(%s) to PM(%s)", ms->vm->getCname(), ms->src_pm->getCname(), ms->dst_pm->getCname()); // NOTE(review): here and below ms is still dereferenced although the comment above says do_migration may free it -- confirm lifetime
-
- if (TRACE_msg_vm_is_enabled()) {
- static long long int counter = 0; // function-local static: each traced migration gets its own link key
- char key[INSTR_DEFAULT_STR_SIZE];
- snprintf(key, INSTR_DEFAULT_STR_SIZE, "%lld", counter);
- counter++;
-
- // start link
- container_t msg = PJ_container_get(vm->getCname());
- simgrid::instr::Type* type = PJ_type_get_root()->getChild("MSG_VM_LINK");
- new simgrid::instr::StartLinkEvent(MSG_get_clock(), PJ_container_get_root(), type, msg, "M", key);
-
- // destroy existing container of this vm
- container_t existing_container = PJ_container_get(vm->getCname()); // same lookup as 'msg' above; 'msg' is reassigned before its next use
- PJ_container_remove_from_parent(existing_container);
- delete existing_container;
-
- // create new container on the new_host location
- new simgrid::instr::Container(vm->getCname(), simgrid::instr::INSTR_MSG_VM,
- PJ_container_get(ms->dst_pm->getCname()));
-
- // end link
- msg = PJ_container_get(vm->getCname()); // looked up again after the container was re-created above
- type = PJ_type_get_root()->getChild("MSG_VM_LINK");
- new simgrid::instr::EndLinkEvent(MSG_get_clock(), PJ_container_get_root(), type, msg, "M", key);
- }
-
- // Inform the SRC that the migration has been correctly performed
- std::string task_name = get_mig_task_name(ms->vm, ms->src_pm, ms->dst_pm, 4); // stage 4 == completion notice
- msg_task_t task = MSG_task_create(task_name.c_str(), 0, 0, nullptr);
- msg_error_t ret = MSG_task_send(task, ms->mbox_ctl);
- // xbt_assert(ret == MSG_OK);
- if(ret == MSG_HOST_FAILURE){
- // The DST has crashed, this is a problem as the VM since we are not sure whether SRC is considering that the VM
- // has been correctly migrated on the DST node
- // TODO What does it mean ? What should we do ?
- MSG_task_destroy(task);
- } else if(ret == MSG_TRANSFER_FAILURE){
- // The SRC has crashed, this is not a problem as the VM has been correctly migrated on the DST node
- MSG_task_destroy(task);
- } // NOTE(review): any other return code leaves 'task' untouched here -- confirm that is intended for non-MSG_OK values
-
- XBT_DEBUG("mig: rx_done");
- return 0;
-}
-
-static void start_dirty_page_tracking(msg_vm_t vm) // turns tracking on and snapshots clock / remaining flops for every entry of dp_objs
-{
- vm->pimpl_vm_->dp_enabled = 1;
- if (vm->pimpl_vm_->dp_objs.empty()) // NOTE(review): the loop below would already do nothing on an empty container
- return;
-
- for (auto const& elm : vm->pimpl_vm_->dp_objs) {
- dirty_page_t dp = elm.second;
- double remaining = MSG_task_get_flops_amount(dp->task);
- dp->prev_clock = MSG_get_clock(); // baseline later consumed by get_computed()
- dp->prev_remaining = remaining;
- XBT_DEBUG("%s@%s remaining %f", elm.first.c_str(), vm->getCname(), remaining);
- }
-}
-
-static void stop_dirty_page_tracking(msg_vm_t vm) // only clears the flag; per-task snapshots in dp_objs are left as they are
-{
- vm->pimpl_vm_->dp_enabled = 0;
-}
-
-static double get_computed(const char* key, msg_vm_t vm, dirty_page_t dp, double remaining, double clock) // returns dp->prev_remaining - remaining; does not modify dp
-{
- double computed = dp->prev_remaining - remaining;
- double duration = clock - dp->prev_clock; // only used in the debug message below
-
- XBT_DEBUG("%s@%s: computed %f ops (remaining %f -> %f) in %f secs (%f -> %f)", key, vm->getCname(), computed,
- dp->prev_remaining, remaining, duration, dp->prev_clock, clock);
-
- return computed;
-}
-
-static double lookup_computed_flop_counts(msg_vm_t vm, int stage_for_fancy_debug, int stage2_round_for_fancy_debug) // sums the flops computed since the previous snapshot; the two int parameters only appear in the debug message
-{
- double total = 0;
-
- for (auto const& elm : vm->pimpl_vm_->dp_objs) {
- const char* key = elm.first.c_str();
- dirty_page_t dp = elm.second;
- double remaining = MSG_task_get_flops_amount(dp->task);
-
- double clock = MSG_get_clock();
-
- // total += calc_updated_pages(key, vm, dp, remaining, clock);
- total += get_computed(key, vm, dp, remaining, clock);
-
- dp->prev_remaining = remaining; // re-baseline so the next call only counts new work
- dp->prev_clock = clock;
- }
-
- total += vm->pimpl_vm_->dp_updated_by_deleted_tasks; // work recorded by MSG_host_del_task() for tasks removed in the meantime
-
- XBT_DEBUG("mig-stage%d.%d: computed %f flop_counts (including %f by deleted tasks)", stage_for_fancy_debug,
- stage2_round_for_fancy_debug, total, vm->pimpl_vm_->dp_updated_by_deleted_tasks);
-
- vm->pimpl_vm_->dp_updated_by_deleted_tasks = 0; // consumed: reset the accumulator
-
- return total;
-}
-
-// TODO Is this code redundant with the information provided by
-// msg_process_t MSG_process_create(const char *name, xbt_main_func_t code, void *data, msg_host_t host)
-/** @brief take care of the dirty page tracking, in case we're adding a task to a migrating VM */
-void MSG_host_add_task(msg_host_t host, msg_task_t task)
-{
- simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
- if (vm == nullptr) // host is not a VM: nothing to track
- return;
-
- double remaining = MSG_task_get_flops_amount(task);
- char *key = bprintf("%s-%p", task->name, task); // key is "<task name>-<task address>"; released with xbt_free() below
-
- dirty_page_t dp = xbt_new0(s_dirty_page, 1); // presumably zero-initialized by xbt_new0 -- confirm; released in MSG_host_del_task()
- dp->task = task;
- if (vm->pimpl_vm_->dp_enabled) { // snapshot only while tracking is active
- dp->prev_clock = MSG_get_clock();
- dp->prev_remaining = remaining;
- }
- vm->pimpl_vm_->dp_objs.insert({key, dp}); // NOTE(review): if dp_objs is a std::map-like container, an existing key is kept and dp would leak -- confirm
- XBT_DEBUG("add %s on %s (remaining %f, dp_enabled %d)", key, host->getCname(), remaining, vm->pimpl_vm_->dp_enabled);
-
- xbt_free(key);
-}
-
-void MSG_host_del_task(msg_host_t host, msg_task_t task) // removes the dirty-page entry created by MSG_host_add_task() for this task; no-op when host is not a VM
-{
- simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
- if (vm == nullptr)
- return;
-
- char *key = bprintf("%s-%p", task->name, task); // same "<task name>-<task address>" key as in MSG_host_add_task()
- dirty_page_t dp = nullptr;
- if (vm->pimpl_vm_->dp_objs.find(key) != vm->pimpl_vm_->dp_objs.end()) // NOTE(review): find() followed by at() looks the key up twice
- dp = vm->pimpl_vm_->dp_objs.at(key);
- xbt_assert(dp && dp->task == task); // the task must have been registered before
-
- /* If we are in the middle of dirty page tracking, we record how much computation has been done until now, and keep
- * the information for the lookup_() function that will be called soon. */
- if (vm->pimpl_vm_->dp_enabled) {
- double remaining = MSG_task_get_flops_amount(task);
- double clock = MSG_get_clock();
- // double updated = calc_updated_pages(key, host, dp, remaining, clock);
- double updated = get_computed(key, vm, dp, remaining, clock); // was host instead of vm
-
- vm->pimpl_vm_->dp_updated_by_deleted_tasks += updated;
- }
-
- vm->pimpl_vm_->dp_objs.erase(key);
- xbt_free(dp);
-
- XBT_DEBUG("del %s on %s", key, host->getCname()); // key is still valid here; it is only freed on the next line
- xbt_free(key);
-}
-
-static sg_size_t send_migration_data(msg_vm_t vm, msg_host_t src_pm, msg_host_t dst_pm, sg_size_t size, char* mbox,
- int stage, int stage2_round, double mig_speed, double timeout)
-{
- sg_size_t sent = 0;
- std::string task_name = get_mig_task_name(vm, src_pm, dst_pm, stage);
- msg_task_t task = MSG_task_create(task_name.c_str(), 0, static_cast<double>(size), nullptr);
-
- double clock_sta = MSG_get_clock();
-
- msg_error_t ret;
- if (mig_speed > 0)
- ret = MSG_task_send_with_timeout_bounded(task, mbox, timeout, mig_speed);
- else
- ret = MSG_task_send(task, mbox);
-
- if (ret == MSG_OK) {
- sent = size;
- } else if (ret == MSG_TIMEOUT) {
- sg_size_t remaining = static_cast<sg_size_t>(MSG_task_get_remaining_communication(task));
- sent = size - remaining;
- XBT_VERB("timeout (%lf s) in sending_migration_data, remaining %llu bytes of %llu", timeout, remaining, size);
- }
-
- /* FIXME: why try-and-catch is used here? */
- if(ret == MSG_HOST_FAILURE){
- XBT_DEBUG("SRC host failed during migration of %s (stage %d)", vm->getCname(), stage);
- MSG_task_destroy(task);
- THROWF(host_error, 0, "SRC host failed during migration of %s (stage %d)", vm->getCname(), stage);
- }else if(ret == MSG_TRANSFER_FAILURE){
- XBT_DEBUG("DST host failed during migration of %s (stage %d)", vm->getCname(), stage);
- MSG_task_destroy(task);
- THROWF(host_error, 0, "DST host failed during migration of %s (stage %d)", vm->getCname(), stage);