Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[SAMPI] Update the sampi_loadbalancer code
author Christian Heinrich <franz-christian.heinrich@inria.fr>
Wed, 16 May 2018 13:46:05 +0000 (15:46 +0200)
committer Christian Heinrich <franz-christian.heinrich@inria.fr>
Thu, 2 Aug 2018 19:55:53 +0000 (21:55 +0200)
This is a first version for our new load balancing
module. It's not yet ready, but we're getting there.

examples/smpi/load_balancer/load_balancer.cpp
src/smpi/plugins/sampi_loadbalancer.cpp

index 61e1413..e9722af 100644 (file)
@@ -34,18 +34,7 @@ int main(int argc, char* argv[])
   AMPI_Migrate(MPI_COMM_WORLD);
   if (rank != 0)
   free(pointer);
-  /* Connect your callback function to the "blah" event in the trace files */
-  //xbt_replay_action_register("blah", action_blah);
 
-  /* The send action is an override, so we have to first save its previous value in a global */
-  //int rank;
-  //MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-  //if (rank == 0) {
-    //previous_send = xbt_replay_action_get("send");
-    //xbt_replay_action_register("send", overriding_send);
-  //}
-  /* The regular run of the replayer */
-  //smpi_replay_main(&argc, &argv);
   MPI_Finalize();
   return 0;
 }
index c9b9a45..f367ede 100644 (file)
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include <simgrid/plugins/load_balancer.h>
-#include <simgrid/s4u/Actor.hpp>
+#include <simgrid/s4u.hpp>
 #include <simgrid/smpi/replay.hpp>
+#include <smpi/smpi.h>
+#include <src/smpi/include/smpi_comm.hpp>
+#include <src/smpi/include/smpi_actor.hpp>
 #include <xbt/replay.hpp>
 
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, surf, "Logging specific to the SMPI load balancing plugin");
+#include "src/kernel/activity/ExecImpl.hpp"
+#include "src/simix/ActorImpl.hpp"
+#include <simgrid/smpi/loadbalancer/load_balancer.hpp>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(plugin_load_balancer, smpi, "Logging specific to the SMPI load balancing plugin");
+
+static simgrid::config::Flag<int> cfg_migration_frequency("smpi/plugin/lb/migration-frequency", {"smpi/plugin/lb/migration_frequency"}, // flag read by MigrateAction::kernel to decide on which calls a migration actually happens
+    "After how many calls to the migration function should the migration be actually executed?", 10, // description; 10 is presumably the default value - confirm against the Flag constructor
+    [](double val){if (val != 10) sg_load_balancer_plugin_init();}); // callback: initializes the plugin unless the value is 10. NOTE(review): parameter is double although the flag is int, and explicitly setting 10 does not initialize the plugin - confirm both are intended
 
 namespace simgrid {
 namespace smpi {
 namespace plugin {
 
+static simgrid::plugin::loadbalancer::LoadBalancer lb; // single instance used by MigrateAction::kernel and by the completion callback set up in sg_load_balancer_plugin_init()
+
+class MigrateParser : public simgrid::smpi::replay::ActionArgParser { // argument parser used by MigrateAction (the "migrate" replay action)
+public:
+  double memory_consumption; // filled from action[2] by parse(); not initialized before parse() runs
+  void parse(simgrid::xbt::ReplayAction& action, std::string name) // 'name' is not used in this body
+  {
+    // The only parameter is the amount of memory used by the current process.
+    CHECK_ACTION_PARAMS(action, 1, 0); // presumably: one mandatory parameter, no optional ones - confirm against the macro definition
+    memory_consumption = std::stod(action[2]); // std::stod throws if the field cannot be converted to a double
+  }
+};
+
+/* This function simulates what happens when the original application calls
+ * (A)MPI_Migrate. It executes the load balancing heuristics, makes the necessary
+ * migrations and updates the task mapping in the load balancer.
+ *
+ * Outline of kernel():
+ *  1. wait on a barrier shared by all calls;
+ *  2. the first actor that finds was_executed == false calls lb.run();
+ *  3. wait on the barrier again, then reset was_executed;
+ *  4. count the call for the current actor and return early unless the count is a
+ *     multiple of the migration-frequency flag;
+ *  5. if lb.get_mapping() returns a host other than the current one, run a
+ *     parallel_execute between both hosts and migrate the actor;
+ *  6. wait on the barrier a last time and call smpi_bench_begin().
+ */
+class MigrateAction : public simgrid::smpi::replay::ReplayAction<simgrid::smpi::plugin::MigrateParser> {
+public:
+  explicit MigrateAction() : ReplayAction("Migrate") {}
+  void kernel(simgrid::xbt::ReplayAction& action)
+  {
+    static std::map<simgrid::s4u::ActorPtr, int> migration_counter; // per-actor count of kernel() calls
+    static simgrid::s4u::Barrier smpilb_bar(smpi_process_count()); // shared by all calls; constructed once, on the first call
+    simgrid::s4u::Host* cur_host = simgrid::s4u::this_actor::get_host();
+    simgrid::s4u::Host* migrate_to_host;
+
+    TRACE_migration_call(my_proc_id, NULL); // my_proc_id is not declared in this class; presumably inherited from ReplayAction. NOTE(review): NULL here, nullptr in the iteration handlers
+
+    // TODO cheinrich: Why do we need this barrier?
+    smpilb_bar.wait();
+
+    static bool was_executed = false; // shared between all calls: lets only one actor per round run the balancer
+    if (not was_executed) {
+      was_executed = true;
+      smpi_bench_begin();
+      XBT_INFO("RUNNING THE LB");
+      lb.run();
+      smpi_bench_end();
+    }
+
+    // This barrier is required to ensure that the mapping has been computed and is available
+    smpilb_bar.wait();
+    was_executed = false; // Must stay behind this barrier so that all processes have passed the if clause
+    migration_counter[simgrid::s4u::Actor::self()]++;
+    if ((migration_counter[simgrid::s4u::Actor::self()] % simgrid::config::get_value<int>(cfg_migration_frequency.get_name())) != 0) {
+      return; // not an N-th call: no migration. NOTE(review): this also skips the final barrier and smpi_bench_begin() below - confirm intended
+    }
+
+    migrate_to_host = lb.get_mapping(); // host returned by the load balancer; compared with the current host below
+
+    if (cur_host != migrate_to_host) { // Origin and dest are not the same -> migrate
+      sg_host_t migration_hosts[2] = {cur_host, migrate_to_host};
+      // Keep both arrays heap-allocated: changing them to stack arrays (double[2] ...) causes trouble because parallel_execute tries to call free() on them. NOTE(review): they are allocated with new[] here, which does not match free() - confirm.
+      double* comp_amount  = new double[2]{0, 0}; // no computation on either host
+      double* comm_amount  = new double[4]{0, std::max(args.memory_consumption, 1.0), 0, 0}; // single non-zero entry, at least 1.0; presumably the cur_host -> migrate_to_host cell of a 2x2 matrix - confirm layout
 
+      xbt_os_timer_t timer = smpi_process()->timer();
+      xbt_os_threadtimer_start(timer);
+      simgrid::s4u::this_actor::parallel_execute(2, migration_hosts, comp_amount, comm_amount, -1.0);
+      xbt_os_threadtimer_stop(timer);
+      smpi_execute(xbt_os_timer_elapsed(timer)); // the time measured around parallel_execute is passed on to smpi_execute()
+
+      // Update the process and host mapping in SimGrid.
+      TRACE_smpi_process_change_host(my_proc_id, migrate_to_host);
+      simgrid::s4u::this_actor::migrate(migrate_to_host);
+    }
+
+    smpilb_bar.wait(); // third barrier of the round; only reached on calls that pass the frequency check above
+
+    smpi_bench_begin();
+  }
+};
+
+/******************************************************************************
+ *         Code to include the duration of iterations in the trace.           *
+ ******************************************************************************/
+
+// FIXME Move declaration
+XBT_PRIVATE void action_iteration_in(simgrid::xbt::ReplayAction& action);
+void action_iteration_in(simgrid::xbt::ReplayAction& action) // replay handler registered under "iteration_in" by sg_load_balancer_plugin_init()
+{
+  CHECK_ACTION_PARAMS(action, 0, 0) // presumably: the action takes no parameters - confirm against the macro definition
+  TRACE_Iteration_in(simgrid::s4u::this_actor::get_pid(), nullptr); // called with the pid of the calling actor
+}
+
+// FIXME Move declaration
+XBT_PRIVATE void action_iteration_out(simgrid::xbt::ReplayAction& action);
+void action_iteration_out(simgrid::xbt::ReplayAction& action) // replay handler registered under "iteration_out" by sg_load_balancer_plugin_init()
+{
+  CHECK_ACTION_PARAMS(action, 0, 0) // presumably: the action takes no parameters - confirm against the macro definition
+  TRACE_Iteration_out(simgrid::s4u::this_actor::get_pid(), nullptr); // called with the pid of the calling actor
+}
 }
 }
 }
@@ -26,4 +129,16 @@ namespace plugin {
  */
 void sg_load_balancer_plugin_init()
 {
+  static bool done = false; // makes every call after the first one a no-op
+  if (!done) {
+    done = true;
+    simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImplPtr activity){ // callback connected to the on_completion signal of executions
+        simgrid::smpi::plugin::lb.record_actor_computation(activity->simcalls_.front()->issuer->iface(), activity->surf_action_->get_cost()); // reports the issuing actor and the cost of the action to the load balancer
+    }); // NOTE(review): simcalls_.front() is used without checking that the list is non-empty - confirm this always holds
+
+    xbt_replay_action_register(
+        "migrate", [](simgrid::xbt::ReplayAction& action) { simgrid::smpi::plugin::MigrateAction().execute(action); }); // a fresh MigrateAction object is built for each replayed "migrate" line
+    xbt_replay_action_register("iteration_in", simgrid::smpi::plugin::action_iteration_in);
+    xbt_replay_action_register("iteration_out", simgrid::smpi::plugin::action_iteration_out);
+  }
 }