Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill a few globals.
[simgrid.git] / examples / smpi / replay_multiple_manual_deploy / replay_multiple_manual.cpp
index 45764a5..7707303 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009-2018. The SimGrid Team.
+/* Copyright (c) 2009-2023. The SimGrid Team.
  * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
       This is done to avoid SMPI actors to start at actor_id=0.
    3. For each job:
         1. Sleep until job's starting time is reached (if needed)
-        2. Launch the replay of the corresponding time-indepent trace.
+        2. Launch the replay of the corresponding time-independent trace.
         3. Create inter-process noise, by spawning useless actors.
-   4. Wait for completion (implicitly, via MSG_main's return)
+   4. Wait for completion (via s4u::Engine's run method)
 */
 
 #include <algorithm>
 #include <fstream>
-#include <iostream>
+#include <memory>
+#include <sstream>
 #include <stdexcept>
 #include <vector>
 
 #include <boost/algorithm/string.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/regex.hpp>
 
-#include <simgrid/msg.h>
 #include <simgrid/s4u.hpp>
 #include <smpi/smpi.h>
+#include <xbt/file.hpp>
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
 
@@ -45,52 +44,21 @@ struct Job {
   int unique_job_number;                     //!< The job unique number in [0, n[.
 };
 
-// ugly globals to avoid creating structures for giving args to processes
-std::vector<simgrid::s4u::Host*> hosts;
-int noise_between_jobs;
-
-static bool job_comparator(const Job* j1, const Job* j2)
+static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
 {
-  if (j1->starting_time == j2->starting_time)
-    return j1->smpi_app_name < j2->smpi_app_name;
-  return j1->starting_time < j2->starting_time;
-}
-
-struct s_smpi_replay_process_args {
-  Job* job;
-  msg_sem_t semaphore;
-  int rank;
-};
-
-static int smpi_replay_process(int argc, char* argv[])
-{
-  s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
-
-  if (args->semaphore != nullptr)
-    MSG_sem_acquire(args->semaphore);
-
-  XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
-           args->job->smpi_app_name.c_str());
-
-  smpi_replay_run(&argc, &argv);
-  XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
-           args->job->smpi_app_name.c_str());
+  XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
+  smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str());
+  XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
+           job->smpi_app_name.c_str());
 
-  if (args->semaphore != nullptr)
-    MSG_sem_release(args->semaphore);
-
-  delete args;
-  return 0;
+  barrier->wait();
 }
 
 // Sleeps for a given amount of time
-static int sleeper_process(int* param)
+static int sleeper_process(int param)
 {
-  XBT_DEBUG("Sleeping for %d seconds", *param);
-  simgrid::s4u::this_actor::sleep_for(*param);
-
-  delete param;
-
+  XBT_DEBUG("Sleeping for %d seconds", param);
+  simgrid::s4u::this_actor::sleep_for(param);
   return 0;
 }
 
@@ -98,62 +66,38 @@ static int sleeper_process(int* param)
 static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
 {
   for (int i = 0; i < nb_processes; ++i) {
-    int* param = new int;
-    *param     = i + 1;
+    int param = i + 1;
     simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
   }
 }
 
-static int job_executor_process(Job* job)
+static int job_executor_process(const std::vector<simgrid::s4u::Host*>& hosts, Job* job)
 {
-  msg_sem_t job_semaphore = MSG_sem_init(1);
   XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
 
+  simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
+
   for (int i = 0; i < job->app_size; ++i) {
-    char *str_instance_name, *str_rank, *str_pname, *str_tfname;
-    int err = asprintf(&str_instance_name, "%s", job->smpi_app_name.c_str());
-    xbt_assert(err != -1, "asprintf error");
-    err = asprintf(&str_rank, "%d", i);
-    xbt_assert(err != -1, "asprintf error");
-    err = asprintf(&str_pname, "%d_%d", job->unique_job_number, i);
-    xbt_assert(err != -1, "asprintf error");
-    err = asprintf(&str_tfname, "%s", job->traces_filenames[i].c_str());
-    xbt_assert(err != -1, "asprintf error");
-
-    char** argv = xbt_new(char*, 5);
-    argv[0]     = xbt_strdup("1");   // log only?
-    argv[1]     = str_instance_name; // application instance
-    argv[2]     = str_rank;          // rank
-    argv[3]     = str_tfname;        // smpi trace file for this rank
-    argv[4]     = xbt_strdup("0");   // ?
-
-    s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
-    args->job                        = job;
-    args->semaphore                  = nullptr;
-    args->rank                       = i;
-
-    if (i == 0)
-      args->semaphore = job_semaphore;
-
-    MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
-    free(str_pname);
+    char* str_pname = bprintf("rank_%d_%d", job->unique_job_number, i);
+    simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i);
+    xbt_free(str_pname);
   }
 
-  MSG_sem_acquire(job_semaphore);
-  MSG_sem_destroy(job_semaphore);
+  barrier->wait();
 
+  simgrid::s4u::this_actor::sleep_for(1);
   XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
 
   return 0;
 }
 
 // Executes a workload of SMPI processes
-static int workload_executor_process(std::vector<Job*>* workload)
+static int workload_executor_process(const std::vector<simgrid::s4u::Host*>& hosts,
+                                     const std::vector<std::unique_ptr<Job>>& workload, int noise_between_jobs)
 {
-  for (Job* job : *workload) {
+  for (auto const& job : workload) {
     // Let's wait until the job's waiting time if needed
-    double curr_time = simgrid::s4u::Engine::get_clock();
-    if (job->starting_time > curr_time) {
+    if (double curr_time = simgrid::s4u::Engine::get_clock(); job->starting_time > curr_time) {
       double time_to_sleep = (double)job->starting_time - curr_time;
       XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
                job->smpi_app_name.c_str());
@@ -168,45 +112,48 @@ static int workload_executor_process(std::vector<Job*>* workload)
     }
 
     // Let's finally run the job executor
-    std::string job_process_name = "job_" + job->smpi_app_name;
+    char* str_pname = bprintf("job_%04d", job->unique_job_number);
     XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
-    simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job);
+    simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, std::cref(hosts),
+                                job.get());
+    xbt_free(str_pname);
   }
 
   return 0;
 }
 
 // Reads jobs from a workload file and returns them
-static std::vector<Job*> all_jobs(const std::string& workload_file)
+static std::vector<std::unique_ptr<Job>> all_jobs(const std::string& workload_file)
 {
   std::ifstream f(workload_file);
   xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
-  std::vector<Job*> jobs;
+  std::vector<std::unique_ptr<Job>> jobs;
 
-  boost::filesystem::path path(workload_file);
-  std::string dir = path.parent_path().native();
+  simgrid::xbt::Path path(workload_file);
+  std::string dir = path.get_dir_name();
 
-  boost::regex r(R"(^\s*(\S+)\s+(\S+\.txt)\s+(\d+)\s+(\d+)\s+(\d+(?:,\d+)*).*$)");
   std::string line;
   while (std::getline(f, line)) {
-    boost::smatch m;
-
-    if (boost::regex_match(line, m, r)) {
+    std::string app_name;
+    std::string filename_unprefixed;
+    int app_size;
+    int starting_time;
+    std::string alloc;
+
+    std::istringstream is(line);
+    if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
       try {
-        Job* job           = new Job;
-        job->smpi_app_name = m[1];
-        job->filename      = dir + "/" + std::string(m[2]);
-        job->app_size      = stoi(m[3]);
-        job->starting_time = stoi(m[4]);
-        std::string alloc  = m[5];
-
-        std::string filename_unprefixed = m[2];
+        auto job           = std::make_unique<Job>();
+        job->smpi_app_name = app_name;
+        job->filename      = dir + "/" + filename_unprefixed;
+        job->app_size      = app_size;
+        job->starting_time = starting_time;
 
         std::vector<std::string> subparts;
         boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
 
         if ((int)subparts.size() != job->app_size)
-          throw std::runtime_error("size/alloc inconsistency");
+          throw std::invalid_argument("size/alloc inconsistency");
 
         job->allocation.resize(subparts.size());
         for (unsigned int i = 0; i < subparts.size(); ++i)
@@ -214,8 +161,8 @@ static std::vector<Job*> all_jobs(const std::string& workload_file)
 
         // Let's read the filename
         std::ifstream traces_file(job->filename);
-        if (!traces_file.is_open())
-          throw std::runtime_error("Cannot open file " + job->filename);
+        if (not traces_file.is_open())
+          throw std::invalid_argument("Cannot open file " + job->filename);
 
         std::string traces_line;
         while (std::getline(traces_file, traces_line)) {
@@ -224,24 +171,27 @@ static std::vector<Job*> all_jobs(const std::string& workload_file)
         }
 
         if (static_cast<int>(job->traces_filenames.size()) < job->app_size)
-          throw std::runtime_error("size/tracefiles inconsistency");
+          throw std::invalid_argument("size/tracefiles inconsistency");
         job->traces_filenames.resize(job->app_size);
 
         XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
                  "alloc='%s'",
                  job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time,
                  alloc.c_str());
-        jobs.push_back(job);
-      } catch (const std::exception& e) {
-        printf("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
+        jobs.emplace_back(std::move(job));
+      } catch (const std::invalid_argument& e) {
+        xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
       }
     }
   }
 
   // Jobs are sorted by ascending date, then by lexicographical order of their
   // application names
-  sort(jobs.begin(), jobs.end(), job_comparator);
-
+  sort(jobs.begin(), jobs.end(), [](auto const& j1, auto const& j2) {
+    if (j1->starting_time == j2->starting_time)
+      return j1->smpi_app_name < j2->smpi_app_name;
+    return j1->starting_time < j2->starting_time;
+  });
   for (unsigned int i = 0; i < jobs.size(); ++i)
     jobs[i]->unique_job_number = i;
 
@@ -256,18 +206,17 @@ int main(int argc, char* argv[])
              argv[0], argv[0]);
 
   //  Simulation setting
-  MSG_init(&argc, argv);
   simgrid::s4u::Engine e(&argc, argv);
   e.load_platform(argv[1]);
-  hosts = e.get_all_hosts();
+  const auto hosts = e.get_all_hosts();
   xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
 
   // Let's retrieve all SMPI jobs
-  std::vector<Job*> jobs = all_jobs(argv[2]);
+  std::vector<std::unique_ptr<Job>> jobs = all_jobs(argv[2]);
 
   // Let's register them
-  for (const Job* job : jobs)
-    SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size);
+  for (auto const& job : jobs)
+    SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
 
   SMPI_init();
 
@@ -275,7 +224,7 @@ int main(int argc, char* argv[])
   int initial_noise = std::stoi(argv[3]);
   xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
 
-  noise_between_jobs = std::stoi(argv[4]);
+  int noise_between_jobs = std::stoi(argv[4]);
   xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
 
   if (initial_noise > 0) {
@@ -284,15 +233,13 @@ int main(int argc, char* argv[])
   }
 
   // Let's execute the workload
-  simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs);
+  simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, std::cref(hosts), std::cref(jobs),
+                              noise_between_jobs);
 
   e.run();
-  XBT_INFO("Simulation finished! Final time: %g", e.get_clock());
+  XBT_INFO("Simulation finished! Final time: %g", simgrid::s4u::Engine::get_clock());
 
   SMPI_finalize();
 
-  for (const Job* job : jobs)
-    delete job;
-
   return 0;
 }