Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Use :: to call get_clock() which is a static member of s4u::Engine.
[simgrid.git] / examples / smpi / replay_multiple_manual_deploy / replay_multiple_manual.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* This example shows how to replay SMPI time-independent traces in a dynamic
8    fashion. It is inspired from Batsim (https://github.com/oar-team/batsim).
9
10    The program workflow can be summarized as:
11    1. Read an input workload (set of jobs).
12       Each job is a time-independent trace and a starting time.
13    2. Create initial noise, by spawning useless actors.
14       This is done to avoid SMPI actors to start at actor_id=0.
15    3. For each job:
16         1. Sleep until job's starting time is reached (if needed)
17         2. Launch the replay of the corresponding time-independent trace.
18         3. Create inter-process noise, by spawning useless actors.
19    4. Wait for completion (via s4u::Engine's run method)
20 */
21
22 #include <algorithm>
23 #include <fstream>
24 #include <memory>
25 #include <sstream>
26 #include <stdexcept>
27 #include <vector>
28
29 #include <boost/algorithm/string.hpp>
30
31 #include <simgrid/s4u.hpp>
32 #include <smpi/smpi.h>
33 #include <xbt/file.hpp>
34
35 XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
36
37 struct Job {
38   std::string smpi_app_name;   //!< The unique name of the SMPI application
39   std::string filename;        //!<  The filename of the main trace file (which contains other filenames for each rank)
40   int app_size;                //!< The number of processes (actors) of the job
41   int starting_time;           //!< When the job should start
42   std::vector<int> allocation; //!< Where the job should be executed. Values are hosts indexes.
43   std::vector<std::string> traces_filenames; //!< The filenames of the different action files. Read from filename.
44   int unique_job_number;                     //!< The job unique number in [0, n[.
45 };
46
47 // ugly globals to avoid creating structures for giving args to processes
48 static std::vector<simgrid::s4u::Host*> hosts;
49 static int noise_between_jobs;
50
51 static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
52 {
53   XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
54   smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str());
55   XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
56            job->smpi_app_name.c_str());
57
58   barrier->wait();
59 }
60
61 // Sleeps for a given amount of time
62 static int sleeper_process(int param)
63 {
64   XBT_DEBUG("Sleeping for %d seconds", param);
65   simgrid::s4u::this_actor::sleep_for(param);
66   return 0;
67 }
68
69 // Launches some sleeper processes
70 static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
71 {
72   for (int i = 0; i < nb_processes; ++i) {
73     int param = i + 1;
74     simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
75   }
76 }
77
78 static int job_executor_process(Job* job)
79 {
80   XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
81
82   simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
83
84   for (int i = 0; i < job->app_size; ++i) {
85     char* str_pname = bprintf("rank_%d_%d", job->unique_job_number, i);
86     simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i);
87     xbt_free(str_pname);
88   }
89
90   barrier->wait();
91
92   simgrid::s4u::this_actor::sleep_for(1);
93   XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
94
95   return 0;
96 }
97
98 // Executes a workload of SMPI processes
99 static int workload_executor_process(const std::vector<std::unique_ptr<Job>>* workload)
100 {
101   for (auto const& job : *workload) {
102     // Let's wait until the job's waiting time if needed
103     double curr_time = simgrid::s4u::Engine::get_clock();
104     if (job->starting_time > curr_time) {
105       double time_to_sleep = (double)job->starting_time - curr_time;
106       XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
107                job->smpi_app_name.c_str());
108       simgrid::s4u::this_actor::sleep_for(time_to_sleep);
109     }
110
111     if (noise_between_jobs > 0) {
112       // Let's add some process noise
113       XBT_DEBUG("Popping %d noise processes before running job %d (app '%s')", noise_between_jobs,
114                 job->unique_job_number, job->smpi_app_name.c_str());
115       pop_some_processes(noise_between_jobs, hosts[job->allocation[0]]);
116     }
117
118     // Let's finally run the job executor
119     char* str_pname = bprintf("job_%04d", job->unique_job_number);
120     XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
121     simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, job.get());
122     xbt_free(str_pname);
123   }
124
125   return 0;
126 }
127
128 // Reads jobs from a workload file and returns them
129 static std::vector<std::unique_ptr<Job>> all_jobs(const std::string& workload_file)
130 {
131   std::ifstream f(workload_file);
132   xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
133   std::vector<std::unique_ptr<Job>> jobs;
134
135   simgrid::xbt::Path path(workload_file);
136   std::string dir = path.get_dir_name();
137
138   std::string line;
139   while (std::getline(f, line)) {
140     std::string app_name;
141     std::string filename_unprefixed;
142     int app_size;
143     int starting_time;
144     std::string alloc;
145
146     std::istringstream is(line);
147     if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
148       try {
149         auto job           = std::make_unique<Job>();
150         job->smpi_app_name = app_name;
151         job->filename      = dir + "/" + filename_unprefixed;
152         job->app_size      = app_size;
153         job->starting_time = starting_time;
154
155         std::vector<std::string> subparts;
156         boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
157
158         if ((int)subparts.size() != job->app_size)
159           throw std::invalid_argument("size/alloc inconsistency");
160
161         job->allocation.resize(subparts.size());
162         for (unsigned int i = 0; i < subparts.size(); ++i)
163           job->allocation[i] = stoi(subparts[i]);
164
165         // Let's read the filename
166         std::ifstream traces_file(job->filename);
167         if (!traces_file.is_open())
168           throw std::invalid_argument("Cannot open file " + job->filename);
169
170         std::string traces_line;
171         while (std::getline(traces_file, traces_line)) {
172           boost::trim_right(traces_line);
173           job->traces_filenames.push_back(dir + "/" + traces_line);
174         }
175
176         if (static_cast<int>(job->traces_filenames.size()) < job->app_size)
177           throw std::invalid_argument("size/tracefiles inconsistency");
178         job->traces_filenames.resize(job->app_size);
179
180         XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
181                  "alloc='%s'",
182                  job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time,
183                  alloc.c_str());
184         jobs.emplace_back(std::move(job));
185       } catch (const std::invalid_argument& e) {
186         xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
187       }
188     }
189   }
190
191   // Jobs are sorted by ascending date, then by lexicographical order of their
192   // application names
193   sort(jobs.begin(), jobs.end(), [](auto const& j1, auto const& j2) {
194     if (j1->starting_time == j2->starting_time)
195       return j1->smpi_app_name < j2->smpi_app_name;
196     return j1->starting_time < j2->starting_time;
197   });
198   for (unsigned int i = 0; i < jobs.size(); ++i)
199     jobs[i]->unique_job_number = i;
200
201   return jobs;
202 }
203
204 int main(int argc, char* argv[])
205 {
206   xbt_assert(argc > 4,
207              "Usage: %s platform_file workload_file initial_noise noise_between_jobs\n"
208              "\tExample: %s platform.xml workload_compute\n",
209              argv[0], argv[0]);
210
211   //  Simulation setting
212   simgrid::s4u::Engine e(&argc, argv);
213   e.load_platform(argv[1]);
214   hosts = e.get_all_hosts();
215   xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
216
217   // Let's retrieve all SMPI jobs
218   std::vector<std::unique_ptr<Job>> jobs = all_jobs(argv[2]);
219
220   // Let's register them
221   for (auto const& job : jobs)
222     SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
223
224   SMPI_init();
225
226   // Read noise arguments
227   int initial_noise = std::stoi(argv[3]);
228   xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
229
230   noise_between_jobs = std::stoi(argv[4]);
231   xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
232
233   if (initial_noise > 0) {
234     XBT_DEBUG("Popping %d noise processes", initial_noise);
235     pop_some_processes(initial_noise, hosts[0]);
236   }
237
238   // Let's execute the workload
239   simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, &jobs);
240
241   e.run();
242   XBT_INFO("Simulation finished! Final time: %g", simgrid::s4u::Engine::get_clock());
243
244   SMPI_finalize();
245
246   return 0;
247 }