Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f367edea5fd372d62a982784da5a084d535aff47
[simgrid.git] / src / smpi / plugins / sampi_loadbalancer.cpp
1 /* Copyright (c) 2018.      The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/plugins/load_balancer.h>
7 #include <simgrid/s4u.hpp>
8 #include <simgrid/smpi/replay.hpp>
9 #include <smpi/smpi.h>
10 #include <src/smpi/include/smpi_comm.hpp>
11 #include <src/smpi/include/smpi_actor.hpp>
12 #include <xbt/replay.hpp>
13
14 #include "src/kernel/activity/ExecImpl.hpp"
15 #include "src/simix/ActorImpl.hpp"
16 #include <simgrid/smpi/loadbalancer/load_balancer.hpp>
17
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, smpi, "Logging specific to the SMPI load balancing plugin");
19
20 static simgrid::config::Flag<int> cfg_migration_frequency("smpi/plugin/lb/migration-frequency", {"smpi/plugin/lb/migration_frequency"},
21     "After how many calls to the migration function should the migration be actually executed?", 10,
22     [](double val){if (val != 10) sg_load_balancer_plugin_init();});
23
24 namespace simgrid {
25 namespace smpi {
26 namespace plugin {
27
28 static simgrid::plugin::loadbalancer::LoadBalancer lb;
29
30 class MigrateParser : public simgrid::smpi::replay::ActionArgParser {
31 public:
32   double memory_consumption;
33   void parse(simgrid::xbt::ReplayAction& action, std::string name)
34   {
35     // The only parameter is the amount of memory used by the current process.
36     CHECK_ACTION_PARAMS(action, 1, 0);
37     memory_consumption = std::stod(action[2]);
38   }
39 };
40
41 /* This function simulates what happens when the original application calls
42  * (A)MPI_Migrate. It executes the load balancing heuristics, makes the necessary
43  * migrations and updates the task mapping in the load balancer. 
44  */
45 class MigrateAction : public simgrid::smpi::replay::ReplayAction<simgrid::smpi::plugin::MigrateParser> {
46 public:
47   explicit MigrateAction() : ReplayAction("Migrate") {}
48   void kernel(simgrid::xbt::ReplayAction& action)
49   {
50     static std::map<simgrid::s4u::ActorPtr, int> migration_counter;
51     static simgrid::s4u::Barrier smpilb_bar(smpi_process_count());
52     simgrid::s4u::Host* cur_host = simgrid::s4u::this_actor::get_host();
53     simgrid::s4u::Host* migrate_to_host;
54
55     TRACE_migration_call(my_proc_id, NULL);
56
57     // TODO cheinrich: Why do we need this barrier?
58     smpilb_bar.wait();
59
60     static bool was_executed = false;
61     if (not was_executed) {
62       was_executed = true;
63       smpi_bench_begin();
64       XBT_INFO("RUNNING THE LB");
65       lb.run();
66       smpi_bench_end();
67     }
68
69     // This barrier is required to ensure that the mapping has been computed and is available
70     smpilb_bar.wait();
71     was_executed = false; // Must stay behind this barrier so that all processes have passed the if clause
72     migration_counter[simgrid::s4u::Actor::self()]++;
73     if ((migration_counter[simgrid::s4u::Actor::self()] % simgrid::config::get_value<int>(cfg_migration_frequency.get_name())) != 0) {
74       return;
75     }
76
77     migrate_to_host = lb.get_mapping();
78
79     if (cur_host != migrate_to_host) { // Origin and dest are not the same -> migrate
80       sg_host_t migration_hosts[2] = {cur_host, migrate_to_host};
81       // Changing this to double[2] ... will cause trouble with parallel_execute, because that fct is trying to call free().
82       double* comp_amount  = new double[2]{0, 0};
83       double* comm_amount  = new double[4]{0, std::max(args.memory_consumption, 1.0), 0, 0};
84
85       xbt_os_timer_t timer = smpi_process()->timer();
86       xbt_os_threadtimer_start(timer);
87       simgrid::s4u::this_actor::parallel_execute(2, migration_hosts, comp_amount, comm_amount, -1.0);
88       xbt_os_threadtimer_stop(timer);
89       smpi_execute(xbt_os_timer_elapsed(timer));
90
91       // Update the process and host mapping in SimGrid.
92       TRACE_smpi_process_change_host(my_proc_id, migrate_to_host);
93       simgrid::s4u::this_actor::migrate(migrate_to_host);
94     }
95
96     smpilb_bar.wait();
97
98     smpi_bench_begin();
99   }
100 };
101
102 /******************************************************************************
103  *         Code to include the duration of iterations in the trace.           *
104  ******************************************************************************/
105
106 // FIXME Move declaration
107 XBT_PRIVATE void action_iteration_in(simgrid::xbt::ReplayAction& action);
108 void action_iteration_in(simgrid::xbt::ReplayAction& action)
109 {
110   CHECK_ACTION_PARAMS(action, 0, 0)
111   TRACE_Iteration_in(simgrid::s4u::this_actor::get_pid(), nullptr);
112 }
113
114 // FIXME Move declaration
115 XBT_PRIVATE void action_iteration_out(simgrid::xbt::ReplayAction& action);
116 void action_iteration_out(simgrid::xbt::ReplayAction& action)
117 {
118   CHECK_ACTION_PARAMS(action, 0, 0)
119   TRACE_Iteration_out(simgrid::s4u::this_actor::get_pid(), nullptr);
120 }
121 }
122 }
123 }
124
125 /** @ingroup plugin_loadbalancer
126  * @brief Initializes the load balancer plugin
127  * @details The load balancer plugin supports several AMPI load balancers that move ranks
128  * around, based on their host's load.
129  */
130 void sg_load_balancer_plugin_init()
131 {
132   static bool done = false;
133   if (!done) {
134     done = true;
135     simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImplPtr activity){
136         simgrid::smpi::plugin::lb.record_actor_computation(activity->simcalls_.front()->issuer->iface(), activity->surf_action_->get_cost());
137     });
138
139     xbt_replay_action_register(
140         "migrate", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::plugin::MigrateAction().execute(action); });
141     xbt_replay_action_register("iteration_in", simgrid::smpi::plugin::action_iteration_in);
142     xbt_replay_action_register("iteration_out", simgrid::smpi::plugin::action_iteration_out);
143   }
144 }