Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
kill smpi_process_count(), use smpi_get_universe_size() instead
[simgrid.git] / src / smpi / plugins / sampi_loadbalancer.cpp
1 /* Copyright (c) 2018-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/plugins/load_balancer.h>
7 #include <simgrid/s4u.hpp>
8 #include <simgrid/smpi/replay.hpp>
9 #include <smpi/smpi.h>
10 #include <src/smpi/include/smpi_comm.hpp>
11 #include <src/smpi/include/smpi_actor.hpp>
12 #include <src/smpi/plugins/ampi/instr_ampi.hpp>
13 #include <src/smpi/plugins/ampi/ampi.hpp>
14 #include <xbt/replay.hpp>
15
16 #include "src/kernel/activity/ExecImpl.hpp"
17 #include "src/kernel/actor/ActorImpl.hpp"
18 #include "src/smpi/plugins/load_balancer/load_balancer.hpp" // This is not yet ready to be public
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, smpi, "Logging specific to the SMPI load balancing plugin");
21
22 static simgrid::config::Flag<int> cfg_migration_frequency("smpi/plugin/lb/migration-frequency", {"smpi/plugin/lb/migration_frequency"},
23     "After how many calls to the migration function should the migration be actually executed?", 10,
24     [](double val){if (val != 10) sg_load_balancer_plugin_init();});
25
26 namespace simgrid {
27 namespace smpi {
28 namespace plugin {
29
30 static simgrid::plugin::loadbalancer::LoadBalancer lb;
31
32 class MigrateParser : public simgrid::smpi::replay::ActionArgParser {
33 public:
34   double memory_consumption;
35   void parse(simgrid::xbt::ReplayAction& action, const std::string&)
36   {
37     // The only parameter is the amount of memory used by the current process.
38     CHECK_ACTION_PARAMS(action, 1, 0);
39     memory_consumption = std::stod(action[2]);
40   }
41 };
42
43 /* This function simulates what happens when the original application calls
44  * (A)MPI_Migrate. It executes the load balancing heuristics, makes the necessary
45  * migrations and updates the task mapping in the load balancer. 
46  */
47 class MigrateAction : public simgrid::smpi::replay::ReplayAction<simgrid::smpi::plugin::MigrateParser> {
48 public:
49   explicit MigrateAction() : ReplayAction("Migrate") {}
50   void kernel(simgrid::xbt::ReplayAction&)
51   {
52     static std::map<simgrid::s4u::ActorPtr, int> migration_call_counter;
53     static simgrid::s4u::Barrier smpilb_bar(smpi_get_universe_size());
54     simgrid::s4u::Host* cur_host = simgrid::s4u::this_actor::get_host();
55     simgrid::s4u::Host* migrate_to_host;
56
57     TRACE_migration_call(my_proc_id, nullptr);
58
59     // We only migrate every "cfg_migration_frequency"-times, not at every call
60     migration_call_counter[simgrid::s4u::Actor::self()]++;
61     if ((migration_call_counter[simgrid::s4u::Actor::self()] % simgrid::config::get_value<int>(cfg_migration_frequency.get_name())) != 0) {
62       return;
63     }
64
65     // TODO cheinrich: Why do we need this barrier?
66     smpilb_bar.wait();
67
68     static bool was_executed = false;
69     if (not was_executed) {
70       was_executed = true;
71       XBT_DEBUG("Process %li runs the load balancer", my_proc_id);
72       smpi_bench_begin();
73       lb.run();
74       smpi_bench_end();
75     }
76
77     // This barrier is required to ensure that the mapping has been computed and is available
78     smpilb_bar.wait();
79     was_executed = false; // Must stay behind this barrier so that all processes have passed the if clause
80
81     migrate_to_host = lb.get_mapping(simgrid::s4u::Actor::self());
82     if (cur_host != migrate_to_host) { // Origin and dest are not the same -> migrate
83       std::vector<simgrid::s4u::Host*> migration_hosts = {cur_host, migrate_to_host};
84       std::vector<double> comp_amount                  = {0, 0};
85       std::vector<double> comm_amount = {0, /*must not be 0*/ std::max(args.memory_consumption, 1.0), 0, 0};
86
87       xbt_os_timer_t timer = smpi_process()->timer();
88       xbt_os_threadtimer_start(timer);
89       simgrid::s4u::this_actor::parallel_execute(migration_hosts, comp_amount, comm_amount, -1.0);
90       xbt_os_threadtimer_stop(timer);
91       smpi_execute(xbt_os_timer_elapsed(timer));
92
93       // Update the process and host mapping in SimGrid.
94       XBT_DEBUG("Migrating process %li from %s to %s", my_proc_id, cur_host->get_cname(), migrate_to_host->get_cname());
95       TRACE_smpi_process_change_host(my_proc_id, migrate_to_host);
96       simgrid::s4u::this_actor::migrate(migrate_to_host);
97     }
98
99     smpilb_bar.wait();
100
101     smpi_bench_begin();
102   }
103 };
104
105 /******************************************************************************
106  *         Code to include the duration of iterations in the trace.           *
107  ******************************************************************************/
108
109 // FIXME Move declaration
110 XBT_PRIVATE void action_iteration_in(simgrid::xbt::ReplayAction& action);
111 void action_iteration_in(simgrid::xbt::ReplayAction& action)
112 {
113   CHECK_ACTION_PARAMS(action, 0, 0)
114   TRACE_Iteration_in(simgrid::s4u::this_actor::get_pid(), nullptr);
115   simgrid::smpi::plugin::ampi::on_iteration_in(*MPI_COMM_WORLD->group()->actor(std::stol(action[0])));
116 }
117
118 XBT_PRIVATE void action_iteration_out(simgrid::xbt::ReplayAction& action);
119 void action_iteration_out(simgrid::xbt::ReplayAction& action)
120 {
121   CHECK_ACTION_PARAMS(action, 0, 0)
122   TRACE_Iteration_out(simgrid::s4u::this_actor::get_pid(), nullptr);
123   simgrid::smpi::plugin::ampi::on_iteration_out(*MPI_COMM_WORLD->group()->actor(std::stol(action[0])));
124 }
125 }
126 }
127 }
128
129 /** @ingroup plugin_loadbalancer
130  * @brief Initializes the load balancer plugin
131  * @details The load balancer plugin supports several AMPI load balancers that move ranks
132  * around, based on their host's load.
133  */
134 void sg_load_balancer_plugin_init()
135 {
136   static bool done = false;
137   if (!done) {
138     done = true;
139     simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImpl const& activity) {
140       simgrid::smpi::plugin::lb.record_actor_computation(activity.simcalls_.front()->issuer->iface(),
141                                                          activity.surf_action_->get_cost());
142     });
143
144     xbt_replay_action_register(
145         "migrate", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::plugin::MigrateAction().execute(action); });
146     xbt_replay_action_register("iteration_in", simgrid::smpi::plugin::action_iteration_in);
147     xbt_replay_action_register("iteration_out", simgrid::smpi::plugin::action_iteration_out);
148   }
149 }