1 /* Copyright (c) 2009-2018. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
/* This example shows how to replay SMPI time-independent traces in a dynamic
   fashion. It is inspired by Batsim (https://github.com/oar-team/batsim).

   The program workflow can be summarized as:
   1. Read an input workload (set of jobs).
      Each job is a time-independent trace and a starting time.
   2. Create initial noise, by spawning useless actors.
      This is done to prevent SMPI actors from starting at actor_id=0.
   3. For each job:
        1. Sleep until the job's starting time is reached (if needed).
        2. Launch the replay of the corresponding time-independent trace.
        3. Create inter-process noise, by spawning useless actors.
   4. Wait for completion (via s4u::Engine's run method).
*/
#include <algorithm>
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>

#include <simgrid/msg.h>
#include <simgrid/s4u.hpp>
#include <smpi/smpi.h>
#include <xbt/file.hpp>
35 XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
/** @brief Description of one SMPI job of the workload (one line of the workload file). */
struct Job {
  std::string smpi_app_name;                 //!< The unique name of the SMPI application
  std::string filename;                      //!< The filename of the main trace file (which contains other filenames for each rank)
  int app_size      = 0;                     //!< The number of processes (actors) of the job
  int starting_time = 0;                     //!< When the job should start
  std::vector<int> allocation;               //!< Where the job should be executed. Values are hosts indexes.
  std::vector<std::string> traces_filenames; //!< The filenames of the different action files. Read from filename.
  int unique_job_number = -1;                //!< The job unique number in [0, n[ (-1 until a number has been assigned).
};
47 // ugly globals to avoid creating structures for giving args to processes
48 std::vector<simgrid::s4u::Host*> hosts;
49 int noise_between_jobs;
51 static bool job_comparator(const Job* j1, const Job* j2)
53 if (j1->starting_time == j2->starting_time)
54 return j1->smpi_app_name < j2->smpi_app_name;
55 return j1->starting_time < j2->starting_time;
58 struct s_smpi_replay_process_args {
60 simgrid::s4u::BarrierPtr barrier;
64 static int smpi_replay_process(int argc, char* argv[])
66 s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
68 XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
69 args->job->smpi_app_name.c_str());
71 smpi_replay_run(&argc, &argv);
72 XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
73 args->job->smpi_app_name.c_str());
75 args->barrier->wait();
81 // Sleeps for a given amount of time
82 static int sleeper_process(int* param)
84 XBT_DEBUG("Sleeping for %d seconds", *param);
85 simgrid::s4u::this_actor::sleep_for(*param);
92 // Launches some sleeper processes
93 static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
95 for (int i = 0; i < nb_processes; ++i) {
98 simgrid::s4u::Actor::create("meh", host, sleeper_process, param);
102 static int job_executor_process(Job* job)
104 XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
106 simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
108 for (int i = 0; i < job->app_size; ++i) {
109 char** argv = xbt_new(char*, 5);
110 argv[0] = xbt_strdup("1"); // log only?
111 argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
112 argv[2] = bprintf("%d", i); // rank
113 argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank
114 argv[4] = xbt_strdup("0"); // ?
116 s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
118 args->barrier = barrier;
121 char* str_pname = bprintf("%d_%d", job->unique_job_number, i);
122 MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
128 XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
133 // Executes a workload of SMPI processes
134 static int workload_executor_process(std::vector<Job*>* workload)
136 for (Job* job : *workload) {
137 // Let's wait until the job's waiting time if needed
138 double curr_time = simgrid::s4u::Engine::get_clock();
139 if (job->starting_time > curr_time) {
140 double time_to_sleep = (double)job->starting_time - curr_time;
141 XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time,
142 job->smpi_app_name.c_str());
143 simgrid::s4u::this_actor::sleep_for(time_to_sleep);
146 if (noise_between_jobs > 0) {
147 // Let's add some process noise
148 XBT_DEBUG("Popping %d noise processes before running job %d (app '%s')", noise_between_jobs,
149 job->unique_job_number, job->smpi_app_name.c_str());
150 pop_some_processes(noise_between_jobs, hosts[job->allocation[0]]);
153 // Let's finally run the job executor
154 std::string job_process_name = "job_" + job->smpi_app_name;
155 XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
156 simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job);
162 // Reads jobs from a workload file and returns them
163 static std::vector<Job*> all_jobs(const std::string& workload_file)
165 std::ifstream f(workload_file);
166 xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
167 std::vector<Job*> jobs;
169 simgrid::xbt::Path path(workload_file);
170 std::string dir = path.get_dir_name();
173 while (std::getline(f, line)) {
174 std::string app_name;
175 std::string filename_unprefixed;
180 std::istringstream is(line);
181 if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
184 job.smpi_app_name = app_name;
185 job.filename = dir + "/" + filename_unprefixed;
186 job.app_size = app_size;
187 job.starting_time = starting_time;
189 std::vector<std::string> subparts;
190 boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
192 if ((int)subparts.size() != job.app_size)
193 throw std::invalid_argument("size/alloc inconsistency");
195 job.allocation.resize(subparts.size());
196 for (unsigned int i = 0; i < subparts.size(); ++i)
197 job.allocation[i] = stoi(subparts[i]);
199 // Let's read the filename
200 std::ifstream traces_file(job.filename);
201 if (!traces_file.is_open())
202 throw std::invalid_argument("Cannot open file " + job.filename);
204 std::string traces_line;
205 while (std::getline(traces_file, traces_line)) {
206 boost::trim_right(traces_line);
207 job.traces_filenames.push_back(dir + "/" + traces_line);
210 if (static_cast<int>(job.traces_filenames.size()) < job.app_size)
211 throw std::invalid_argument("size/tracefiles inconsistency");
212 job.traces_filenames.resize(job.app_size);
214 XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
216 job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time,
218 jobs.push_back(new Job(std::move(job)));
219 } catch (const std::invalid_argument& e) {
220 xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
225 // Jobs are sorted by ascending date, then by lexicographical order of their
227 sort(jobs.begin(), jobs.end(), job_comparator);
229 for (unsigned int i = 0; i < jobs.size(); ++i)
230 jobs[i]->unique_job_number = i;
235 int main(int argc, char* argv[])
238 "Usage: %s platform_file workload_file initial_noise noise_between_jobs\n"
239 "\tExample: %s platform.xml workload_compute\n",
242 // Simulation setting
243 MSG_init(&argc, argv);
244 simgrid::s4u::Engine e(&argc, argv);
245 e.load_platform(argv[1]);
246 hosts = e.get_all_hosts();
247 xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size());
249 // Let's retrieve all SMPI jobs
250 std::vector<Job*> jobs = all_jobs(argv[2]);
252 // Let's register them
253 for (const Job* job : jobs)
254 SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size);
258 // Read noise arguments
259 int initial_noise = std::stoi(argv[3]);
260 xbt_assert(initial_noise >= 0, "Invalid initial_noise argument");
262 noise_between_jobs = std::stoi(argv[4]);
263 xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument");
265 if (initial_noise > 0) {
266 XBT_DEBUG("Popping %d noise processes", initial_noise);
267 pop_some_processes(initial_noise, hosts[0]);
270 // Let's execute the workload
271 simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs);
274 XBT_INFO("Simulation finished! Final time: %g", e.get_clock());
278 for (const Job* job : jobs)