From: Christian Heinrich Date: Wed, 16 May 2018 13:46:05 +0000 (+0200) Subject: [SAMPI] Update the sampi_loadbalancer code X-Git-Tag: v3_21~334 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/bdca77bf82da4a65ea082f8ec1c167d88f9fcb96 [SAMPI] Update the sampi_loadbalancer code This is a first version for our new load balancing module. It's not yet ready, but we're getting there. --- diff --git a/examples/smpi/load_balancer/load_balancer.cpp b/examples/smpi/load_balancer/load_balancer.cpp index 61e141311a..e9722af1d4 100644 --- a/examples/smpi/load_balancer/load_balancer.cpp +++ b/examples/smpi/load_balancer/load_balancer.cpp @@ -34,18 +34,7 @@ int main(int argc, char* argv[]) AMPI_Migrate(MPI_COMM_WORLD); if (rank != 0) free(pointer); - /* Connect your callback function to the "blah" event in the trace files */ - //xbt_replay_action_register("blah", action_blah); - /* The send action is an override, so we have to first save its previous value in a global */ - //int rank; - //MPI_Comm_rank(MPI_COMM_WORLD, &rank); - //if (rank == 0) { - //previous_send = xbt_replay_action_get("send"); - //xbt_replay_action_register("send", overriding_send); - //} - /* The regular run of the replayer */ - //smpi_replay_main(&argc, &argv); MPI_Finalize(); return 0; } diff --git a/src/smpi/plugins/sampi_loadbalancer.cpp b/src/smpi/plugins/sampi_loadbalancer.cpp index c9b9a457e0..f367edea5f 100644 --- a/src/smpi/plugins/sampi_loadbalancer.cpp +++ b/src/smpi/plugins/sampi_loadbalancer.cpp @@ -4,17 +4,120 @@ * under the terms of the license (GNU LGPL) which comes with this package. 
*/
 #include
-#include
+#include
 #include
+#include
+#include
+#include
 #include
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, surf, "Logging specific to the SMPI load balancing plugin");
+#include "src/kernel/activity/ExecImpl.hpp"
+#include "src/simix/ActorImpl.hpp"
+#include
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, smpi, "Logging specific to the SMPI load balancing plugin");
+
+static simgrid::config::Flag cfg_migration_frequency("smpi/plugin/lb/migration-frequency", {"smpi/plugin/lb/migration_frequency"},
+    "After how many calls to the migration function should the migration be actually executed?", 10,
+    [](double val){if (val != 10) sg_load_balancer_plugin_init();}); // A non-default frequency also enables the plugin. NOTE(review): explicitly setting 10 does not.
 namespace simgrid {
 namespace smpi {
 namespace plugin {
+static simgrid::plugin::loadbalancer::LoadBalancer lb; // File-local balancer shared by MigrateAction and the plugin's exec-completion hook.
+
+class MigrateParser : public simgrid::smpi::replay::ActionArgParser { // Parses the arguments of a "migrate" trace action.
+public:
+  double memory_consumption; // Amount of memory used by the current process (see parse()).
+  void parse(simgrid::xbt::ReplayAction& action, std::string name)
+  {
+    // The only parameter is the amount of memory used by the current process.
+    CHECK_ACTION_PARAMS(action, 1, 0);
+    memory_consumption = std::stod(action[2]); // NOTE(review): assumes action[0]/action[1] hold the rank and the action name - confirm. std::stod throws on malformed input.
+  }
+};
+
+/* This function simulates what happens when the original application calls
+ * (A)MPI_Migrate. It executes the load balancing heuristics, makes the necessary
+ * migrations and updates the task mapping in the load balancer.
+ */
+class MigrateAction : public simgrid::smpi::replay::ReplayAction { // NOTE(review): kernel() reads args.memory_consumption, presumably the MigrateParser held by this base - confirm.
+public:
+  explicit MigrateAction() : ReplayAction("Migrate") {}
+  void kernel(simgrid::xbt::ReplayAction& action)
+  {
+    static std::map migration_counter; // Number of Migrate calls made so far, per actor.
+    static simgrid::s4u::Barrier smpilb_bar(smpi_process_count()); // Sized to smpi_process_count(): every process must reach each wait() below.
+    simgrid::s4u::Host* cur_host = simgrid::s4u::this_actor::get_host();
+    simgrid::s4u::Host* migrate_to_host;
+
+    TRACE_migration_call(my_proc_id, NULL); // NOTE(review): prefer nullptr, as used elsewhere in this file.
+
+    // TODO cheinrich: Why do we need this barrier?
+    smpilb_bar.wait();
+
+    static bool was_executed = false; // Shared by all actors: only the first one to get here after the barrier runs the heuristic.
+    if (not was_executed) {
+      was_executed = true;
+      smpi_bench_begin(); // NOTE(review): presumably so that lb.run()'s real run time is benchmarked into the simulation - confirm.
+      XBT_INFO("RUNNING THE LB");
+      lb.run();
+      smpi_bench_end();
+    }
+
+    // This barrier is required to ensure that the mapping has been computed and is available
+    smpilb_bar.wait();
+    was_executed = false; // Must stay behind this barrier so that all processes have passed the if clause
+    migration_counter[simgrid::s4u::Actor::self()]++; // Count this actor's Migrate calls.
+    if ((migration_counter[simgrid::s4u::Actor::self()] % simgrid::config::get_value(cfg_migration_frequency.get_name())) != 0) { // Only every N-th call migrates (N = cfg_migration_frequency). NOTE(review): N == 0 would divide by zero.
+      return; // NOTE(review): skips the final barrier below; only safe while every actor's counter stays in step - confirm.
+    }
+
+    migrate_to_host = lb.get_mapping(); // Host chosen by the most recent lb.run(); presumably for the calling actor - confirm.
+
+    if (cur_host != migrate_to_host) { // Origin and dest are not the same -> migrate
+      sg_host_t migration_hosts[2] = {cur_host, migrate_to_host};
+      // Changing this to double[2] ... will cause trouble with parallel_execute, because that fct is trying to call free().
+      double* comp_amount = new double[2]{0, 0}; // No computation on either host. NOTE(review): memory from new[] must not be released with free() - confirm how parallel_execute frees it.
+      double* comm_amount = new double[4]{0, std::max(args.memory_consumption, 1.0), 0, 0}; // Presumably a row-major 2x2 matrix: only cur_host -> migrate_to_host carries data (at least 1.0).
+      xbt_os_timer_t timer = smpi_process()->timer();
+      xbt_os_threadtimer_start(timer); // Time parallel_execute() and pass the elapsed time to smpi_execute() below.
+      simgrid::s4u::this_actor::parallel_execute(2, migration_hosts, comp_amount, comm_amount, -1.0);
+      xbt_os_threadtimer_stop(timer);
+      smpi_execute(xbt_os_timer_elapsed(timer));
+
+      // Update the process and host mapping in SimGrid.
+      TRACE_smpi_process_change_host(my_proc_id, migrate_to_host);
+      simgrid::s4u::this_actor::migrate(migrate_to_host);
+    }
+
+    smpilb_bar.wait();
+
+    smpi_bench_begin(); // NOTE(review): unlike the pair around lb.run(), this has no matching smpi_bench_end() in kernel(); presumably expected by the replay framework - confirm.
+  }
+};
+
+/******************************************************************************
+ * Code to include the duration of iterations in the trace.
*
+ ******************************************************************************/
+
+// FIXME Move declaration
+XBT_PRIVATE void action_iteration_in(simgrid::xbt::ReplayAction& action);
+void action_iteration_in(simgrid::xbt::ReplayAction& action) // Replay handler for "iteration_in": marks the start of an iteration in the trace; takes no parameters.
+{
+  CHECK_ACTION_PARAMS(action, 0, 0)
+  TRACE_Iteration_in(simgrid::s4u::this_actor::get_pid(), nullptr);
+}
+
+// FIXME Move declaration
+XBT_PRIVATE void action_iteration_out(simgrid::xbt::ReplayAction& action);
+void action_iteration_out(simgrid::xbt::ReplayAction& action) // Replay handler for "iteration_out": marks the end of an iteration in the trace; takes no parameters.
+{
+  CHECK_ACTION_PARAMS(action, 0, 0)
+  TRACE_Iteration_out(simgrid::s4u::this_actor::get_pid(), nullptr);
+}
 }
 }
 }
@@ -26,4 +129,16 @@ namespace plugin {
 */
 void sg_load_balancer_plugin_init()
 {
+  static bool done = false; // Ensures the hook and replay actions below are registered only once.
+  if (!done) {
+    done = true;
+    simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImplPtr activity){ // Report each completed execution's cost to the balancer, keyed by the issuing actor.
+      simgrid::smpi::plugin::lb.record_actor_computation(activity->simcalls_.front()->issuer->iface(), activity->surf_action_->get_cost()); // NOTE(review): front() assumes simcalls_ is non-empty on completion - confirm.
+    });
+
+    xbt_replay_action_register( // Register the plugin's trace actions: "migrate", "iteration_in" and "iteration_out".
+        "migrate", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::plugin::MigrateAction().execute(action); });
+    xbt_replay_action_register("iteration_in", simgrid::smpi::plugin::action_iteration_in);
+    xbt_replay_action_register("iteration_out", simgrid::smpi::plugin::action_iteration_out);
+  }
 }