-/* Copyright (c) 2004-2018. The SimGrid Team.
+/* Copyright (c) 2004-2022. The SimGrid Team.
* All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
-#include "SmpiHost.hpp"
+#include "smpi_host.hpp"
#include "private.hpp"
-#include "simgrid/msg.h" /* barrier */
#include "simgrid/s4u/Engine.hpp"
+#include "simgrid/s4u/Barrier.hpp"
#include "smpi_comm.hpp"
#include <map>
-namespace simgrid {
-namespace smpi {
-namespace app {
+XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(smpi);
+
+namespace simgrid::smpi::app {
+
+static int universe_size = 0; // sum of the sizes of all Instance objects constructed so far
class Instance { // per-application record: world communicator, barrier and finalization counter
public:
- Instance(const std::string name, int max_no_processes, int process_count, MPI_Comm comm,
- msg_bar_t finalization_barrier)
- : name(name)
- , size(max_no_processes)
- , present_processes(0)
- , comm_world(comm)
- , finalization_barrier(finalization_barrier)
- { }
-
- const std::string name;
- int size;
- int present_processes;
- MPI_Comm comm_world;
- msg_bar_t finalization_barrier;
+ explicit Instance(int max_no_processes) : size_(max_no_processes)
+ { // builds the group, communicator and barrier for an instance of max_no_processes ranks
+ auto* group = new simgrid::smpi::Group(size_);
+ comm_world_ = new simgrid::smpi::Comm(group, nullptr, false, -1); // released through Comm::destroy() by the unregister/cleanup functions of this file
+ universe_size += max_no_processes; // every constructed instance enlarges the value returned by smpi_get_universe_size()
+ bar_ = s4u::Barrier::create(size_); // barrier created for size_ participants
+ }
+ s4u::BarrierPtr bar_; // waited on by smpi_deployment_startup_barrier()
+ unsigned int size_; // number of processes given at construction
+ unsigned int finalized_ranks_ = 0; // incremented by smpi_deployment_unregister_process()
+ MPI_Comm comm_world_; // communicator created in the constructor
};
-}
-}
-}
+} // namespace simgrid::smpi::app
using simgrid::smpi::app::Instance;
-static std::map<std::string, Instance> smpi_instances;
-extern int process_count; // How many processes have been allocated over all instances?
+static std::map<std::string, Instance, std::less<>> smpi_instances; // registered instances, keyed by the name passed to SMPI_app_instance_register()
-/** \ingroup smpi_simulation
- * \brief Registers a running instance of a MPI program.
+/** @ingroup smpi_simulation
+ * @brief Registers a running instance of an MPI program.
*
- * FIXME : remove MSG from the loop at some point.
- * \param name the reference name of the function.
- * \param code the main mpi function (must have a int ..(int argc, char *argv[])) prototype
- * \param num_processes the size of the instance we want to deploy
+ * @param name the reference name of the function.
+ * @param code either the main mpi function
+ * (must have a int ..(int argc, char *argv[]) prototype) or nullptr
+ * (if the function deployment is managed somewhere else —
+ * e.g., when deploying manually or using smpirun)
+ * @param num_processes the size of the instance we want to deploy
+ *
+ * The instance is stored under @p name. When an instance with that name is already
+ * registered, the existing one is kept and no new instance is constructed.
*/
void SMPI_app_instance_register(const char *name, xbt_main_func_t code, int num_processes)
{
- if (code != nullptr) { // When started with smpirun, we will not execute a function
- SIMIX_function_register(name, code);
- }
-
- static int already_called = 0;
- if (not already_called) {
- already_called = 1;
- std::vector<simgrid::s4u::Host*> list = simgrid::s4u::Engine::get_instance()->get_all_hosts();
- for (auto const& host : list) {
- host->extension_set(new simgrid::smpi::SmpiHost(host));
- }
- }
-
- Instance instance(std::string(name), num_processes, process_count, MPI_COMM_NULL, MSG_barrier_init(num_processes));
- MPI_Group group = new simgrid::smpi::Group(instance.size);
- instance.comm_world = new simgrid::smpi::Comm(group, nullptr);
- MPI_Attr_put(instance.comm_world, MPI_UNIVERSE_SIZE, reinterpret_cast<void*>(instance.size));
+ if (code != nullptr) // When started with smpirun, we will not execute a function
+ simgrid::s4u::Engine::get_instance()->register_function(name, code);
- process_count+=num_processes;
+ smpi_instances.try_emplace(name, num_processes); // constructs Instance(num_processes) only if name is not registered yet
+}
- smpi_instances.insert(std::pair<std::string, Instance>(name, instance));
+void smpi_deployment_register_process(const std::string& instance_id, int rank, const simgrid::s4u::Actor* actor)
+{ // Records, in the group of the instance's world communicator, that the actor's PID holds the given rank.
+ const Instance& instance = smpi_instances.at(instance_id); // at() throws std::out_of_range for an unregistered instance_id
+ instance.comm_world_->group()->set_mapping(actor->get_pid(), rank);
}
-void smpi_deployment_register_process(const std::string instance_id, int rank, simgrid::s4u::ActorPtr actor)
+void smpi_deployment_startup_barrier(const std::string& instance_id) // Waits on the barrier of the given instance.
{
- if (smpi_instances.empty()) // no instance registered, we probably used smpirun.
- return;
+ const Instance& instance = smpi_instances.at(instance_id); // at() throws std::out_of_range if instance_id was never registered
+ instance.bar_->wait(); // the barrier was created for size_ participants in the Instance constructor
+}
+void smpi_deployment_unregister_process(const std::string& instance_id)
+{ // Counts one finalized rank; once size_ ranks are counted, destroys the communicator and drops the instance.
Instance& instance = smpi_instances.at(instance_id);
+ instance.finalized_ranks_++;
- instance.present_processes++;
- instance.comm_world->group()->set_mapping(actor, rank);
+ if (instance.finalized_ranks_ == instance.size_) { // this was the last rank of the instance
+ simgrid::smpi::Comm::destroy(instance.comm_world_);
+ smpi_instances.erase(instance_id); // `instance` dangles from here on and must not be used
+ }
}
-MPI_Comm* smpi_deployment_comm_world(const std::string instance_id)
+MPI_Comm* smpi_deployment_comm_world(const std::string& instance_id) // Returns the address of the instance's world communicator, or nullptr when nothing is registered.
{
- if (smpi_instances.empty()) { // no instance registered, we probably used smpirun.
+ if (smpi_instances
+ .empty()) { // no instance registered, we probably used smpirun. (FIXME: I guess this never happens for real)
return nullptr;
}
Instance& instance = smpi_instances.at(instance_id); // throws std::out_of_range when other instances exist but not this one
- return &instance.comm_world;
+ return &instance.comm_world_; // points into the map entry: valid only while the instance stays registered
}
-msg_bar_t smpi_deployment_finalization_barrier(const std::string instance_id)
+void smpi_deployment_cleanup_instances(){ // Destroys the communicator of every instance still registered, then empties the registry.
+ for (auto const& [name, instance] : smpi_instances) { // smpi_deployment_unregister_process() erases an instance once all its ranks are counted, so these are leftovers
+ XBT_INFO("Stalling SMPI instance: %s. Do all your MPI ranks call MPI_Finalize()?", name.c_str());
+ simgrid::smpi::Comm::destroy(instance.comm_world_);
+ }
+ smpi_instances.clear();
+}
+
+int smpi_get_universe_size() // Returns the accumulated size of all instances constructed so far.
{
- if (smpi_instances.empty()) { // no instance registered, we probably used smpirun.
- return nullptr;
+ return simgrid::smpi::app::universe_size; // increased by max_no_processes in each Instance constructor
+}
+
+/** @brief Auxiliary method to get list of hosts to deploy app
+ *
+ * @param e engine the hosts are taken from
+ * @param hostfile path of a file with one host name per line; if empty, all hosts of @p e are returned
+ * @return the hosts, in file order when a hostfile is given (empty lines are skipped)
+ *
+ * Fails an assertion if the hostfile cannot be opened or yields no host.
+ */
+static std::vector<simgrid::s4u::Host*> smpi_get_hosts(const simgrid::s4u::Engine* e, const std::string& hostfile)
+{
+ if (hostfile == "") { // no hostfile: deploy on every host known to the engine
+ return e->get_all_hosts();
}
- Instance& instance = smpi_instances.at(instance_id);
- return instance.finalization_barrier;
+ std::vector<simgrid::s4u::Host*> hosts;
+ std::ifstream in(hostfile.c_str());
+ xbt_assert(in, "smpirun: Cannot open the host file: %s", hostfile.c_str());
+ std::string str;
+ while (std::getline(in, str)) {
+ if (not str.empty())
+ hosts.emplace_back(e->host_by_name(str)); // the whole line is used as the host name
+ }
+ xbt_assert(not hosts.empty(), "smpirun: the hostfile '%s' is empty", hostfile.c_str());
+ return hosts;
}
-void smpi_deployment_cleanup_instances(){
- for (auto const& item : smpi_instances) {
- Instance instance = item.second;
- MSG_barrier_destroy(instance.finalization_barrier);
- simgrid::smpi::Comm::destroy(instance.comm_world);
+/** @brief Read replay configuration from file
+ *
+ * @param replayfile path of the replay file; an empty path means no replay
+ * @return the non-empty lines of the file, in order (an empty vector when @p replayfile is empty)
+ *
+ * Fails an assertion if the file cannot be opened.
+ */
+static std::vector<std::string> smpi_read_replay(const std::string& replayfile)
+{
+ std::vector<std::string> replay;
+ if (replayfile == "")
+ return replay;
+
+ std::ifstream in(replayfile.c_str());
+ xbt_assert(in, "smpirun: Cannot open the replay file: %s", replayfile.c_str());
+ std::string str;
+ while (std::getline(in, str)) {
+ if (not str.empty()) // empty lines are ignored
+ replay.emplace_back(str);
}
- smpi_instances.clear();
+
+ return replay;
+}
+
+/** @brief Build argument vector to pass to process
+ *
+ * @param rank_id rank of the process; its decimal representation is always the first argument
+ * @param replay lines read from the replay file (empty when this is not a replay execution)
+ * @param run_args command-line arguments, forwarded only when @p replay is empty
+ * @return the arguments for this rank; when @p replay holds several entries, the entry at index
+ *         @p rank_id is appended
+ *
+ * Fails an assertion when @p replay holds several entries but none for @p rank_id.
+ */
+static std::vector<std::string> smpi_deployment_get_args(int rank_id, const std::vector<std::string>& replay,
+                                                         const std::vector<const char*>& run_args)
+{
+  std::vector<std::string> args{std::to_string(rank_id)};
+  // pass arguments to process only if not a replay execution
+  if (replay.empty())
+    args.insert(args.end(), begin(run_args), end(run_args));
+  /* one trace per process */
+  if (replay.size() > 1) {
+    // With more ranks than replay entries, replay[rank_id] would read past the end of the vector.
+    xbt_assert(rank_id >= 0 && static_cast<size_t>(rank_id) < replay.size(),
+               "smpirun: the replay file provides %zu traces, but rank %d needs its own one", replay.size(),
+               rank_id);
+    args.emplace_back(replay[rank_id]);
+  }
+  return args;
+}
+
+/**
+ * @brief Deploy an SMPI application from a smpirun call
+ *
+ * This used to be done at smpirun script, parsing either the hostfile or the platform XML.
+ * If hostfile isn't provided, get the list of hosts from engine.
+ *
+ * @param e engine used to look up the hosts
+ * @param hostfile path of the hostfile, or an empty string to use all hosts of @p e
+ * @param np number of ranks to start; 0 means one rank per host
+ * @param replayfile path of the replay file, or an empty string when not replaying
+ * @param map when non-zero, the host chosen for each rank is logged
+ * @param run_args arguments forwarded to the ranks when not replaying
+ * @return the number of ranks that were created
+ */
+int smpi_deployment_smpirun(const simgrid::s4u::Engine* e, const std::string& hostfile, int np,
+ const std::string& replayfile, int map, const std::vector<const char*>& run_args)
+{
+ auto hosts = smpi_get_hosts(e, hostfile);
+ auto replay = smpi_read_replay(replayfile);
+ int hosts_size = static_cast<int>(hosts.size());
+ if (np == 0) // no explicit rank count: one rank per host
+ np = hosts_size;
+
+ xbt_assert(np > 0, "Invalid number of process (np must be > 0). Check your np parameter, platform or hostfile");
+
+ if (np > hosts_size) { // not an error: hosts are reused in the loop below
+ XBT_INFO("You requested to use %d ranks, but there is only %d processes in your hostfile...", np, hosts_size);
+ }
+
+ for (int i = 0; i < np; i++) {
+ simgrid::s4u::Host* host = hosts[i % hosts_size]; // ranks wrap around the host list
+ std::string rank_id = std::to_string(i);
+ auto args = smpi_deployment_get_args(i, replay, run_args);
+ auto actor = simgrid::s4u::Actor::create(rank_id, host, rank_id, args); // NOTE(review): the rank string is passed twice, presumably as actor name and as function name — confirm
+ /* keeping the same behavior as done in smpirun script, print mapping rank/process */
+ if (map != 0) {
+ XBT_INFO("[rank %d] -> %s", i, host->get_cname());
+ }
+ actor->set_property("instance_id", "smpirun");
+ actor->set_property("rank", rank_id);
+ if (not replay.empty())
+ actor->set_property("smpi_replay", "true");
+ /* shared trace file, set it to rank 0 */
+ if (i == 0 && replay.size() == 1)
+ actor->set_property("tracefile", replay[0]);
+ }
+ return np;
}