Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[examples] smpi-replay-mmd: hack--, memleak++
[simgrid.git] / examples / smpi / replay_multiple_manual_deploy / replay_multiple_manual.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* This example shows how to replay SMPI time-independent traces in a dynamic
8    fashion. It is inspired from Batsim (https://github.com/oar-team/batsim).
9
10    The program workflow can be summarized as:
11    1. Read an input workload (set of jobs).
12       Each job is a time-independent trace and a starting time.
13    2. Create initial noise, by spawning useless actors.
14       This is done to avoid SMPI actors to start at actor_id=0.
15    3. For each job:
16         1. Sleep until job's starting time is reached (if needed)
17         2. Launch the replay of the corresponding time-indepent trace.
18         3. Create inter-process noise, by spawning useless actors.
19    4. Wait for completion (via s4u::Engine's run method)
20 */
21
22 #include <algorithm>
23 #include <fstream>
24 #include <sstream>
25 #include <stdexcept>
26 #include <vector>
27
28 #include <boost/algorithm/string.hpp>
29
30 #include <simgrid/s4u.hpp>
31 #include <smpi/smpi.h>
32 #include <xbt/file.hpp>
33
34 XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
35
36 struct Job {
37   std::string smpi_app_name;   //!< The unique name of the SMPI application
38   std::string filename;        //!<  The filename of the main trace file (which contains other filenames for each rank)
39   int app_size;                //!< The number of processes (actors) of the job
40   int starting_time;           //!< When the job should start
41   std::vector<int> allocation; //!< Where the job should be executed. Values are hosts indexes.
42   std::vector<std::string> traces_filenames; //!< The filenames of the different action files. Read from filename.
43   int unique_job_number;                     //!< The job unique number in [0, n[.
44 };
45
46 // ugly globals to avoid creating structures for giving args to processes
47 static std::vector<simgrid::s4u::Host*> hosts;
48 static int noise_between_jobs;
49
50 static bool job_comparator(const Job* j1, const Job* j2)
51 {
52   if (j1->starting_time == j2->starting_time)
53     return j1->smpi_app_name < j2->smpi_app_name;
54   return j1->starting_time < j2->starting_time;
55 }
56
57 static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
58 {
59   // Prepare data for smpi_replay_run
60   int argc    = 5;
61   char** argv = xbt_new(char*, argc);
62   argv[0]     = xbt_strdup("1");                                 // log only?
63   argv[1]     = xbt_strdup(job->smpi_app_name.c_str());          // application instance
64   argv[2]     = bprintf("%d", rank);                             // rank
65   argv[3]     = xbt_strdup(job->traces_filenames[rank].c_str()); // smpi trace file for this rank
66   argv[4]     = xbt_strdup("0");                                 // ?
67
68   XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
69   smpi_replay_run(&argc, &argv);
70   XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
71            job->smpi_app_name.c_str());
72
73   barrier->wait();
74
75   // Memory clean-up — leaks can come from argc/argv modifications from smpi_replay_run
76   for (int i = 0; i < argc; ++i)
77     xbt_free(argv[i]);
78   xbt_free(argv);
79 }
80
81 // Sleeps for a given amount of time
82 static int sleeper_process(int* param)
83 {
84   XBT_DEBUG("Sleeping for %d seconds", *param);
85   simgrid::s4u::this_actor::sleep_for(*param);
86
87   delete param;
88
89   return 0;
90 }
91
92 // Launches some sleeper processes
93 static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
94 {
95   for (int i = 0; i < nb_processes; ++i) {
96     int* param = new int;
97     *param     = i + 1;
98     simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
99   }
100 }
101
102 static int job_executor_process(Job* job)
103 {
104   XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
105
106   simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
107
108   for (int i = 0; i < job->app_size; ++i) {
109     char* str_pname = bprintf("%d_%d", job->unique_job_number, i);
110     simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i);
111     xbt_free(str_pname);
112   }
113
114   barrier->wait();
115
116   XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
117
118   return 0;
119 }
120
121 // Executes a workload of SMPI processes
122 static int workload_executor_process(std::vector<Job*>* workload)
123 {
124   for (Job* job : *workload) {
125     // Let's wait until the job's waiting time if needed
126     double curr_time = simgrid::s4u::Engine::get_clock();
127     if (job->starting_time > curr_time) {
128       double time_to_sleep = (double)job->starting_time - curr_time;
129       XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
130                job->smpi_app_name.c_str());
131       simgrid::s4u::this_actor::sleep_for(time_to_sleep);
132     }
133
134     if (noise_between_jobs > 0) {
135       // Let's add some process noise
136       XBT_DEBUG("Popping %d noise processes before running job %d (app '%s')", noise_between_jobs,
137                 job->unique_job_number, job->smpi_app_name.c_str());
138       pop_some_processes(noise_between_jobs, hosts[job->allocation[0]]);
139     }
140
141     // Let's finally run the job executor
142     std::string job_process_name = "job_" + job->smpi_app_name;
143     XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
144     simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job);
145   }
146
147   return 0;
148 }
149
150 // Reads jobs from a workload file and returns them
151 static std::vector<Job*> all_jobs(const std::string& workload_file)
152 {
153   std::ifstream f(workload_file);
154   xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
155   std::vector<Job*> jobs;
156
157   simgrid::xbt::Path path(workload_file);
158   std::string dir = path.get_dir_name();
159
160   std::string line;
161   while (std::getline(f, line)) {
162     std::string app_name;
163     std::string filename_unprefixed;
164     int app_size;
165     int starting_time;
166     std::string alloc;
167
168     std::istringstream is(line);
169     if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
170       try {
171         Job job;
172         job.smpi_app_name = app_name;
173         job.filename      = dir + "/" + filename_unprefixed;
174         job.app_size      = app_size;
175         job.starting_time = starting_time;
176
177         std::vector<std::string> subparts;
178         boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
179
180         if ((int)subparts.size() != job.app_size)
181           throw std::invalid_argument("size/alloc inconsistency");
182
183         job.allocation.resize(subparts.size());
184         for (unsigned int i = 0; i < subparts.size(); ++i)
185           job.allocation[i] = stoi(subparts[i]);
186
187         // Let's read the filename
188         std::ifstream traces_file(job.filename);
189         if (!traces_file.is_open())
190           throw std::invalid_argument("Cannot open file " + job.filename);
191
192         std::string traces_line;
193         while (std::getline(traces_file, traces_line)) {
194           boost::trim_right(traces_line);
195           job.traces_filenames.push_back(dir + "/" + traces_line);
196         }
197
198         if (static_cast<int>(job.traces_filenames.size()) < job.app_size)
199           throw std::invalid_argument("size/tracefiles inconsistency");
200         job.traces_filenames.resize(job.app_size);
201
202         XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
203                  "alloc='%s'",
204                  job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time,
205                  alloc.c_str());
206         jobs.push_back(new Job(std::move(job)));
207       } catch (const std::invalid_argument& e) {
208         xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
209       }
210     }
211   }
212
213   // Jobs are sorted by ascending date, then by lexicographical order of their
214   // application names
215   sort(jobs.begin(), jobs.end(), job_comparator);
216
217   for (unsigned int i = 0; i < jobs.size(); ++i)
218     jobs[i]->unique_job_number = i;
219
220   return jobs;
221 }
222
223 int main(int argc, char* argv[])
224 {
225   xbt_assert(argc > 4,
226              "Usage: %s platform_file workload_file initial_noise noise_between_jobs\n"
227              "\tExample: %s platform.xml workload_compute\n",
228              argv[0], argv[0]);
229
230   //  Simulation setting
231   simgrid::s4u::Engine e(&argc, argv);
232   e.load_platform(argv[1]);
233   hosts = e.get_all_hosts();
234   xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
235
236   // Let's retrieve all SMPI jobs
237   std::vector<Job*> jobs = all_jobs(argv[2]);
238
239   // Let's register them
240   for (const Job* job : jobs)
241     SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
242
243   SMPI_init();
244
245   // Read noise arguments
246   int initial_noise = std::stoi(argv[3]);
247   xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
248
249   noise_between_jobs = std::stoi(argv[4]);
250   xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
251
252   if (initial_noise > 0) {
253     XBT_DEBUG("Popping %d noise processes", initial_noise);
254     pop_some_processes(initial_noise, hosts[0]);
255   }
256
257   // Let's execute the workload
258   simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs);
259
260   e.run();
261   XBT_INFO("Simulation finished! Final time: %g", e.get_clock());
262
263   SMPI_finalize();
264
265   for (const Job* job : jobs)
266     delete job;
267
268   return 0;
269 }