Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix make dist.
[simgrid.git] / examples / smpi / replay_multiple_manual_deploy / replay_multiple_manual.cpp
1 /* Copyright (c) 2009-2018. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* This example shows how to replay SMPI time-independent traces in a dynamic
8    fashion. It is inspired from Batsim (https://github.com/oar-team/batsim).
9
10    The program workflow can be summarized as:
11    1. Read an input workload (set of jobs).
12       Each job is a time-independent trace and a starting time.
13    2. Create initial noise, by spawning useless actors.
14       This is done to avoid SMPI actors to start at actor_id=0.
15    3. For each job:
16         1. Sleep until job's starting time is reached (if needed)
17         2. Launch the replay of the corresponding time-indepent trace.
18         3. Create inter-process noise, by spawning useless actors.
19    4. Wait for completion (implicitly, via MSG_main's return)
20 */
21
22 #include <algorithm>
23 #include <fstream>
24 #include <iostream>
25 #include <stdexcept>
26 #include <vector>
27
28 #include <boost/algorithm/string.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/regex.hpp>
31
32 #include <simgrid/msg.h>
33 #include <simgrid/s4u.hpp>
34 #include <smpi/smpi.h>
35
36 XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
37
38 struct Job {
39   std::string smpi_app_name;   //!< The unique name of the SMPI application
40   std::string filename;        //!<  The filename of the main trace file (which contains other filenames for each rank)
41   int app_size;                //!< The number of processes (actors) of the job
42   int starting_time;           //!< When the job should start
43   std::vector<int> allocation; //!< Where the job should be executed. Values are hosts indexes.
44   std::vector<std::string> traces_filenames; //!< The filenames of the different action files. Read from filename.
45   int unique_job_number;                     //!< The job unique number in [0, n[.
46 };
47
48 // ugly globals to avoid creating structures for giving args to processes
49 std::vector<simgrid::s4u::Host*> hosts;
50 int noise_between_jobs;
51
52 static bool job_comparator(const Job* j1, const Job* j2)
53 {
54   if (j1->starting_time == j2->starting_time)
55     return j1->smpi_app_name < j2->smpi_app_name;
56   return j1->starting_time < j2->starting_time;
57 }
58
59 struct s_smpi_replay_process_args {
60   Job* job;
61   msg_sem_t semaphore;
62   int rank;
63 };
64
65 static int smpi_replay_process(int argc, char* argv[])
66 {
67   s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
68
69   if (args->semaphore != nullptr)
70     MSG_sem_acquire(args->semaphore);
71
72   XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
73            args->job->smpi_app_name.c_str());
74
75   smpi_replay_run(&argc, &argv);
76   XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
77            args->job->smpi_app_name.c_str());
78
79   if (args->semaphore != nullptr)
80     MSG_sem_release(args->semaphore);
81
82   delete args;
83   return 0;
84 }
85
86 // Sleeps for a given amount of time
87 static int sleeper_process(int* param)
88 {
89   XBT_DEBUG("Sleeping for %d seconds", *param);
90   simgrid::s4u::this_actor::sleep_for(*param);
91
92   delete param;
93
94   return 0;
95 }
96
97 // Launches some sleeper processes
98 static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
99 {
100   for (int i = 0; i < nb_processes; ++i) {
101     int* param = new int;
102     *param     = i + 1;
103     simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
104   }
105 }
106
107 static int job_executor_process(Job* job)
108 {
109   msg_sem_t job_semaphore = MSG_sem_init(1);
110   XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
111
112   for (int i = 0; i < job->app_size; ++i) {
113     char *str_instance_name, *str_rank, *str_pname, *str_tfname;
114     int err = asprintf(&str_instance_name, "%s", job->smpi_app_name.c_str());
115     xbt_assert(err != -1, "asprintf error");
116     err = asprintf(&str_rank, "%d", i);
117     xbt_assert(err != -1, "asprintf error");
118     err = asprintf(&str_pname, "%d_%d", job->unique_job_number, i);
119     xbt_assert(err != -1, "asprintf error");
120     err = asprintf(&str_tfname, "%s", job->traces_filenames[i].c_str());
121     xbt_assert(err != -1, "asprintf error");
122
123     char** argv = xbt_new(char*, 5);
124     argv[0]     = xbt_strdup("1");   // log only?
125     argv[1]     = str_instance_name; // application instance
126     argv[2]     = str_rank;          // rank
127     argv[3]     = str_tfname;        // smpi trace file for this rank
128     argv[4]     = xbt_strdup("0");   // ?
129
130     s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
131     args->job                        = job;
132     args->semaphore                  = nullptr;
133     args->rank                       = i;
134
135     if (i == 0)
136       args->semaphore = job_semaphore;
137
138     MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
139     free(str_pname);
140   }
141
142   MSG_sem_acquire(job_semaphore);
143   MSG_sem_destroy(job_semaphore);
144
145   XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
146
147   return 0;
148 }
149
150 // Executes a workload of SMPI processes
151 static int workload_executor_process(std::vector<Job*>* workload)
152 {
153   for (Job* job : *workload) {
154     // Let's wait until the job's waiting time if needed
155     double curr_time = simgrid::s4u::Engine::get_clock();
156     if (job->starting_time > curr_time) {
157       double time_to_sleep = (double)job->starting_time - curr_time;
158       XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
159                job->smpi_app_name.c_str());
160       simgrid::s4u::this_actor::sleep_for(time_to_sleep);
161     }
162
163     if (noise_between_jobs > 0) {
164       // Let's add some process noise
165       XBT_DEBUG("Popping %d noise processes before running job %d (app '%s')", noise_between_jobs,
166                 job->unique_job_number, job->smpi_app_name.c_str());
167       pop_some_processes(noise_between_jobs, hosts[job->allocation[0]]);
168     }
169
170     // Let's finally run the job executor
171     std::string job_process_name = "job_" + job->smpi_app_name;
172     XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
173     simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job);
174   }
175
176   return 0;
177 }
178
179 // Reads jobs from a workload file and returns them
180 static std::vector<Job*> all_jobs(const std::string& workload_file)
181 {
182   std::ifstream f(workload_file);
183   xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
184   std::vector<Job*> jobs;
185
186   boost::filesystem::path path(workload_file);
187   std::string dir = path.parent_path().native();
188
189   boost::regex r(R"(^\s*(\S+)\s+(\S+\.txt)\s+(\d+)\s+(\d+)\s+(\d+(?:,\d+)*).*$)");
190   std::string line;
191   while (std::getline(f, line)) {
192     boost::smatch m;
193
194     if (boost::regex_match(line, m, r)) {
195       try {
196         Job* job           = new Job;
197         job->smpi_app_name = m[1];
198         job->filename      = dir + "/" + std::string(m[2]);
199         job->app_size      = stoi(m[3]);
200         job->starting_time = stoi(m[4]);
201         std::string alloc  = m[5];
202
203         std::string filename_unprefixed = m[2];
204
205         std::vector<std::string> subparts;
206         boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
207
208         if ((int)subparts.size() != job->app_size)
209           throw std::runtime_error("size/alloc inconsistency");
210
211         job->allocation.resize(subparts.size());
212         for (unsigned int i = 0; i < subparts.size(); ++i)
213           job->allocation[i] = stoi(subparts[i]);
214
215         // Let's read the filename
216         std::ifstream traces_file(job->filename);
217         if (!traces_file.is_open())
218           throw std::runtime_error("Cannot open file " + job->filename);
219
220         std::string traces_line;
221         while (std::getline(traces_file, traces_line)) {
222           boost::trim_right(traces_line);
223           job->traces_filenames.push_back(dir + "/" + traces_line);
224         }
225
226         if (static_cast<int>(job->traces_filenames.size()) < job->app_size)
227           throw std::runtime_error("size/tracefiles inconsistency");
228         job->traces_filenames.resize(job->app_size);
229
230         XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
231                  "alloc='%s'",
232                  job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time,
233                  alloc.c_str());
234         jobs.push_back(job);
235       } catch (const std::exception& e) {
236         printf("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
237       }
238     }
239   }
240
241   // Jobs are sorted by ascending date, then by lexicographical order of their
242   // application names
243   sort(jobs.begin(), jobs.end(), job_comparator);
244
245   for (unsigned int i = 0; i < jobs.size(); ++i)
246     jobs[i]->unique_job_number = i;
247
248   return jobs;
249 }
250
251 int main(int argc, char* argv[])
252 {
253   xbt_assert(argc > 4,
254              "Usage: %s platform_file workload_file initial_noise noise_between_jobs\n"
255              "\tExample: %s platform.xml workload_compute\n",
256              argv[0], argv[0]);
257
258   //  Simulation setting
259   MSG_init(&argc, argv);
260   simgrid::s4u::Engine e(&argc, argv);
261   e.load_platform(argv[1]);
262   hosts = e.get_all_hosts();
263   xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
264
265   // Let's retrieve all SMPI jobs
266   std::vector<Job*> jobs = all_jobs(argv[2]);
267
268   // Let's register them
269   for (const Job* job : jobs)
270     SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size);
271
272   SMPI_init();
273
274   // Read noise arguments
275   int initial_noise = std::stoi(argv[3]);
276   xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
277
278   noise_between_jobs = std::stoi(argv[4]);
279   xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
280
281   if (initial_noise > 0) {
282     XBT_DEBUG("Popping %d noise processes", initial_noise);
283     pop_some_processes(initial_noise, hosts[0]);
284   }
285
286   // Let's execute the workload
287   simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs);
288
289   e.run();
290   XBT_INFO("Simulation finished! Final time: %g", e.get_clock());
291
292   SMPI_finalize();
293
294   for (const Job* job : jobs)
295     delete job;
296
297   return 0;
298 }