+/* Argument parser for the "Migrate" replay action.
+ *
+ * The action carries exactly one mandatory parameter, which parse() stores in memory_consumption.
+ */
+class MigrateParser : public replay::ActionArgParser {
+public:
+  // Amount of memory used by the current process, as read from the trace line.
+  // Initialized so that the field holds a defined value even before parse() has been called.
+  double memory_consumption = 0.0;
+
+  /* Checks that the action has one mandatory and no optional parameter, then converts action[2] to a double.
+   * The second argument (a name) is not used by this parser.
+   */
+  void parse(xbt::ReplayAction& action, const std::string&) override
+  {
+    // The only parameter is the amount of memory used by the current process.
+    CHECK_ACTION_PARAMS(action, 1, 0);
+    memory_consumption = std::stod(action[2]);
+  }
+};
+
+/* Replay handler for the "Migrate" action, i.e. the point where the original application called (A)MPI_Migrate.
+ *
+ * Only every cfg_migration_frequency-th call of an actor starts a load-balancing round. In such a round the load
+ * balancer is run once, and every actor then looks up its host in the resulting mapping and moves there if that host
+ * differs from the one it is currently running on.
+ */
+class MigrateAction : public replay::ReplayAction<smpi::plugin::MigrateParser> {
+public:
+  explicit MigrateAction() : ReplayAction("Migrate") {}
+
+  void kernel(xbt::ReplayAction&) override
+  {
+    static std::map<s4u::ActorPtr, int> migration_call_counter;
+    static s4u::Barrier smpilb_bar(smpi_get_universe_size());
+    s4u::Host* const origin = s4u::this_actor::get_host();
+
+    TRACE_migration_call(get_pid(), nullptr);
+
+    // Count this call; an actor only goes on when its call count is a multiple of "cfg_migration_frequency".
+    const int calls_so_far = ++migration_call_counter[s4u::Actor::self()];
+    const int frequency    = config::get_value<int>(cfg_migration_frequency.get_name());
+    if (calls_so_far % frequency != 0)
+      return;
+
+    // TODO cheinrich: Why do we need this barrier?
+    smpilb_bar.wait();
+
+    // The first actor that finds the flag unset raises it and runs the load balancer; the others skip the branch.
+    static bool was_executed = false;
+    if (!was_executed) {
+      was_executed = true;
+      XBT_DEBUG("Process %li runs the load balancer", get_pid());
+      smpi_bench_begin();
+      lb.run();
+      smpi_bench_end();
+    }
+
+    // Nobody may read the mapping before it has been computed, hence this barrier.
+    smpilb_bar.wait();
+    // The flag is reset only here, after the barrier, so that all processes have passed the test above.
+    was_executed = false;
+
+    s4u::Host* const destination = lb.get_mapping(simgrid::s4u::Actor::self());
+    if (origin != destination) // Origin and dest differ -> migrate
+      perform_migration(origin, destination);
+
+    smpilb_bar.wait();
+
+    smpi_bench_begin();
+  }
+
+private:
+  /* Moves the calling actor from origin to destination.
+   *
+   * A parallel execution involving both hosts is run first, with no computation and a communication amount derived
+   * from the parsed memory consumption. The time measured around that call is then passed to smpi_execute(), and
+   * finally the trace and the actor's host are updated.
+   */
+  void perform_migration(s4u::Host* origin, s4u::Host* destination)
+  {
+    std::vector<s4u::Host*> endpoints = {origin, destination};
+    std::vector<double> flops         = {0, 0};
+    // The second entry must not be 0, so the memory consumption is bounded below by 1.0.
+    std::vector<double> bytes = {0, std::max(get_args().memory_consumption, 1.0), 0, 0};
+
+    xbt_os_timer_t timer = smpi_process()->timer();
+    xbt_os_threadtimer_start(timer);
+    s4u::this_actor::parallel_execute(endpoints, flops, bytes);
+    xbt_os_threadtimer_stop(timer);
+    smpi_execute(xbt_os_timer_elapsed(timer));
+
+    // Update the process and host mapping in SimGrid.
+    XBT_DEBUG("Migrating process %li from %s to %s", get_pid(), origin->get_cname(), destination->get_cname());
+    TRACE_smpi_process_change_host(get_pid(), destination);
+    s4u::this_actor::set_host(destination);
+  }
+};
+
+/******************************************************************************
+ * Code to include the duration of iterations in the trace. *
+ ******************************************************************************/
+
+// FIXME Move declaration
+XBT_PRIVATE void action_iteration_in(xbt::ReplayAction& action);
+void action_iteration_in(xbt::ReplayAction& action)
+{
+ CHECK_ACTION_PARAMS(action, 0, 0)
+ TRACE_Iteration_in(s4u::this_actor::get_pid(), nullptr);
+ smpi::plugin::ampi::on_iteration_in(*MPI_COMM_WORLD->group()->actor(std::stol(action[0])));