1. Sleep until job's starting time is reached (if needed)
- 2. Launch the replay of the corresponding time-indepent trace.
+ 2. Launch the replay of the corresponding time-independent trace.
3. Create inter-process noise, by spawning useless actors.
- 4. Wait for completion (implicitly, via MSG_main's return)
+ 4. Wait for completion (via s4u::Engine's run method)
*/
#include <algorithm>
#include <fstream>
-#include <iostream>
+#include <sstream>
#include <stdexcept>
#include <vector>
#include <boost/algorithm/string.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/regex.hpp>
-#include <simgrid/msg.h>
#include <simgrid/s4u.hpp>
#include <smpi/smpi.h>
+#include <xbt/file.hpp>
XBT_LOG_NEW_DEFAULT_CATEGORY(replay_multiple_manual, "Messages specific for this example");
};
// ugly globals to avoid creating structures for giving args to processes
-std::vector<simgrid::s4u::Host*> hosts;
-int noise_between_jobs;
+// Every host of the loaded platform (filled from Engine::get_all_hosts() in main);
+// the entries of a job's allocation are indices into this vector.
+static std::vector<simgrid::s4u::Host*> hosts;
+static int noise_between_jobs;
+
+// Ordering predicate on jobs: returns true when j1 starts strictly before j2.
static bool job_comparator(const Job* j1, const Job* j2)
{
return j1->starting_time < j2->starting_time;
}
-struct s_smpi_replay_process_args {
- Job* job;
- msg_sem_t semaphore;
- int rank;
-};
-
-static int smpi_replay_process(int argc, char* argv[])
+// Actor body replaying one rank of a job.
+// Builds the argc/argv vector expected by smpi_replay_run() for `rank` of `job`, replays the
+// trace file of that rank, then waits on `barrier`. job_executor_process() creates that barrier
+// with app_size + 1 participants (one per rank plus itself), so it resumes once all ranks are done.
+// The argv array and its strings are allocated here and are released here before returning.
+static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
{
- s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
-
- if (args->semaphore != nullptr)
- MSG_sem_acquire(args->semaphore);
-
- XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
- args->job->smpi_app_name.c_str());
-
+ // Prepare data for smpi_replay_run
+ int argc = 5;
+ char** argv = xbt_new(char*, argc);
+ argv[0] = xbt_strdup("1"); // log only?
+ argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
+ argv[2] = bprintf("%d", rank); // rank
+ argv[3] = xbt_strdup(job->traces_filenames[rank].c_str()); // smpi trace file for this rank
+ argv[4] = xbt_strdup("0"); // ?
+ // smpi_replay_run() receives argc and argv by address and may modify them, so remember
+ // exactly which blocks were allocated here in order to free those (and only those) later.
+ char** const allocated_argv = argv;
+ const std::vector<char*> allocated_args(argv, argv + argc);
+
+ XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
smpi_replay_run(&argc, &argv);
- XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
- args->job->smpi_app_name.c_str());
-
- if (args->semaphore != nullptr)
- MSG_sem_release(args->semaphore);
+ XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
+ job->smpi_app_name.c_str());
- delete args;
- return 0;
+ barrier->wait();
+
+ // Release the argument vector built above; it stays alive for the whole actor lifetime.
+ for (char* arg : allocated_args)
+ xbt_free(arg);
+ xbt_free(allocated_argv);
}
// Sleeps for a given amount of time
static int job_executor_process(Job* job)
{
- msg_sem_t job_semaphore = MSG_sem_init(1);
XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
- for (int i = 0; i < job->app_size; ++i) {
- char** argv = xbt_new(char*, 5);
- argv[0] = xbt_strdup("1"); // log only?
- argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
- argv[2] = bprintf("%d", i); // rank
- argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank
- argv[4] = xbt_strdup("0"); // ?
-
- s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
- args->job = job;
- args->semaphore = nullptr;
- args->rank = i;
-
- if (i == 0)
- args->semaphore = job_semaphore;
+ simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
+ for (int i = 0; i < job->app_size; ++i) {
char* str_pname = bprintf("%d_%d", job->unique_job_number, i);
- MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
+ simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i);
xbt_free(str_pname);
}
- MSG_sem_acquire(job_semaphore);
- MSG_sem_destroy(job_semaphore);
+ barrier->wait();
XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str());
std::vector<Job*> jobs;
- boost::filesystem::path path(workload_file);
- std::string dir = path.parent_path().native();
+ simgrid::xbt::Path path(workload_file);
+ std::string dir = path.get_dir_name();
- boost::regex r(R"(^\s*(\S+)\s+(\S+\.txt)\s+(\d+)\s+(\d+)\s+(\d+(?:,\d+)*).*$)");
std::string line;
while (std::getline(f, line)) {
- boost::smatch m;
-
- if (boost::regex_match(line, m, r)) {
+ std::string app_name;
+ std::string filename_unprefixed;
+ int app_size;
+ int starting_time;
+ std::string alloc;
+
+ std::istringstream is(line);
+ if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) {
try {
- Job* job = new Job;
- job->smpi_app_name = m[1];
- job->filename = dir + "/" + std::string(m[2]);
- job->app_size = stoi(m[3]);
- job->starting_time = stoi(m[4]);
- std::string alloc = m[5];
-
- std::string filename_unprefixed = m[2];
+ Job job;
+ job.smpi_app_name = app_name;
+ job.filename = dir + "/" + filename_unprefixed;
+ job.app_size = app_size;
+ job.starting_time = starting_time;
std::vector<std::string> subparts;
boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on);
- if ((int)subparts.size() != job->app_size)
- throw std::runtime_error("size/alloc inconsistency");
+ if ((int)subparts.size() != job.app_size)
+ throw std::invalid_argument("size/alloc inconsistency");
- job->allocation.resize(subparts.size());
+ job.allocation.resize(subparts.size());
for (unsigned int i = 0; i < subparts.size(); ++i)
- job->allocation[i] = stoi(subparts[i]);
+ job.allocation[i] = stoi(subparts[i]);
// Let's read the filename
- std::ifstream traces_file(job->filename);
+ std::ifstream traces_file(job.filename);
if (!traces_file.is_open())
- throw std::runtime_error("Cannot open file " + job->filename);
+ throw std::invalid_argument("Cannot open file " + job.filename);
std::string traces_line;
while (std::getline(traces_file, traces_line)) {
boost::trim_right(traces_line);
- job->traces_filenames.push_back(dir + "/" + traces_line);
+ job.traces_filenames.push_back(dir + "/" + traces_line);
}
- if (static_cast<int>(job->traces_filenames.size()) < job->app_size)
- throw std::runtime_error("size/tracefiles inconsistency");
- job->traces_filenames.resize(job->app_size);
+ if (static_cast<int>(job.traces_filenames.size()) < job.app_size)
+ throw std::invalid_argument("size/tracefiles inconsistency");
+ job.traces_filenames.resize(job.app_size);
XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, "
"alloc='%s'",
- job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time,
+ job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time,
alloc.c_str());
- jobs.push_back(job);
- } catch (const std::exception& e) {
- printf("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
+ jobs.push_back(new Job(std::move(job)));
+ } catch (const std::invalid_argument& e) {
+ xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what());
}
}
}
argv[0], argv[0]);
// Simulation setting
- MSG_init(&argc, argv);
simgrid::s4u::Engine e(&argc, argv);
e.load_platform(argv[1]);
hosts = e.get_all_hosts();
// Let's register them
for (const Job* job : jobs)
- SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size);
+ SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
SMPI_init();