X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/0eded6d7feab3322a1734444599475a24707367b..359e5754327037ea72945a6df353124b25562266:/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp diff --git a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp index c218cbe565..7707303610 100644 --- a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp +++ b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2009-2018. The SimGrid Team. +/* Copyright (c) 2009-2023. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it @@ -14,20 +14,20 @@ This is done to avoid SMPI actors to start at actor_id=0. 3. For each job: 1. Sleep until job's starting time is reached (if needed) - 2. Launch the replay of the corresponding time-indepent trace. + 2. Launch the replay of the corresponding time-independent trace. 3. Create inter-process noise, by spawning useless actors. - 4. Wait for completion (implicitly, via MSG_main's return) + 4. Wait for completion (via s4u::Engine's run method) */ #include #include +#include #include #include #include #include -#include #include #include #include @@ -44,52 +44,21 @@ struct Job { int unique_job_number; //!< The job unique number in [0, n[. }; -// ugly globals to avoid creating structures for giving args to processes -std::vector hosts; -int noise_between_jobs; - -static bool job_comparator(const Job* j1, const Job* j2) +static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank) { - if (j1->starting_time == j2->starting_time) - return j1->smpi_app_name < j2->smpi_app_name; - return j1->starting_time < j2->starting_time; -} - -struct s_smpi_replay_process_args { - Job* job; - msg_sem_t semaphore; - int rank; -}; - -static int smpi_replay_process(int argc, char* argv[]) -{ - s_smpi_replay_process_args* args = static_cast(MSG_process_get_data(MSG_process_self())); - - if (args->semaphore != nullptr) - MSG_sem_acquire(args->semaphore); - - XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); - - smpi_replay_run(&argc, &argv); - XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); + XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str()); + smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str()); + XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, + job->smpi_app_name.c_str()); - if (args->semaphore != nullptr) - MSG_sem_release(args->semaphore); - - delete args; - return 0; + barrier->wait(); } // Sleeps for a given amount of time -static int sleeper_process(int* param) +static int sleeper_process(int param) { - XBT_DEBUG("Sleeping for %d seconds", *param); - simgrid::s4u::this_actor::sleep_for(*param); - - delete param; - + XBT_DEBUG("Sleeping for %d seconds", param); + simgrid::s4u::this_actor::sleep_for(param); return 0; } @@ -97,53 +66,38 @@ static int sleeper_process(int* param) static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host) { for (int i = 0; i < nb_processes; ++i) { - int* param = new int; - *param = i + 1; + int param = i + 1; simgrid::s4u::Actor::create("meh", host, sleeper_process, param); } } -static int job_executor_process(Job* job) +static int job_executor_process(const std::vector& hosts, Job* job) { - msg_sem_t job_semaphore = MSG_sem_init(1); XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); + simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1); + for (int i = 0; i < job->app_size; ++i) { - char** argv = xbt_new(char*, 5); - argv[0] = xbt_strdup("1"); // log only? - argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance - argv[2] = bprintf("%d", i); // rank - argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank - argv[4] = xbt_strdup("0"); // ? - - s_smpi_replay_process_args* args = new s_smpi_replay_process_args; - args->job = job; - args->semaphore = nullptr; - args->rank = i; - - if (i == 0) - args->semaphore = job_semaphore; - - char* str_pname = bprintf("%d_%d", job->unique_job_number, i); - MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv); + char* str_pname = bprintf("rank_%d_%d", job->unique_job_number, i); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i); xbt_free(str_pname); } - MSG_sem_acquire(job_semaphore); - MSG_sem_destroy(job_semaphore); + barrier->wait(); + simgrid::s4u::this_actor::sleep_for(1); XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); return 0; } // Executes a workload of SMPI processes -static int workload_executor_process(std::vector* workload) +static int workload_executor_process(const std::vector& hosts, + const std::vector>& workload, int noise_between_jobs) { - for (Job* job : *workload) { + for (auto const& job : workload) { // Let's wait until the job's waiting time if needed - double curr_time = simgrid::s4u::Engine::get_clock(); - if (job->starting_time > curr_time) { + if (double curr_time = simgrid::s4u::Engine::get_clock(); job->starting_time > curr_time) { double time_to_sleep = (double)job->starting_time - curr_time; XBT_INFO("Sleeping %g seconds (waiting for job %d, app '%s')", time_to_sleep, job->starting_time, job->smpi_app_name.c_str()); @@ -158,20 +112,22 @@ static int workload_executor_process(std::vector* workload) } // Let's finally run the job executor - std::string job_process_name = "job_" + job->smpi_app_name; + char* str_pname = bprintf("job_%04d", job->unique_job_number); XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); - simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, std::cref(hosts), + job.get()); + xbt_free(str_pname); } return 0; } // Reads jobs from a workload file and returns them -static std::vector all_jobs(const std::string& workload_file) +static std::vector> all_jobs(const std::string& workload_file) { std::ifstream f(workload_file); xbt_assert(f.is_open(), "Cannot open file '%s'.", workload_file.c_str()); - std::vector jobs; + std::vector> jobs; simgrid::xbt::Path path(workload_file); std::string dir = path.get_dir_name(); @@ -187,42 +143,42 @@ static std::vector all_jobs(const std::string& workload_file) std::istringstream is(line); if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) { try { - Job job; - job.smpi_app_name = app_name; - job.filename = dir + "/" + filename_unprefixed; - job.app_size = app_size; - job.starting_time = starting_time; + auto job = std::make_unique(); + job->smpi_app_name = app_name; + job->filename = dir + "/" + filename_unprefixed; + job->app_size = app_size; + job->starting_time = starting_time; std::vector subparts; boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on); - if ((int)subparts.size() != job.app_size) + if ((int)subparts.size() != job->app_size) throw std::invalid_argument("size/alloc inconsistency"); - job.allocation.resize(subparts.size()); + job->allocation.resize(subparts.size()); for (unsigned int i = 0; i < subparts.size(); ++i) - job.allocation[i] = stoi(subparts[i]); + job->allocation[i] = stoi(subparts[i]); // Let's read the filename - std::ifstream traces_file(job.filename); - if (!traces_file.is_open()) - throw std::invalid_argument("Cannot open file " + job.filename); + std::ifstream traces_file(job->filename); + if (not traces_file.is_open()) + throw std::invalid_argument("Cannot open file " + job->filename); std::string traces_line; while (std::getline(traces_file, traces_line)) { boost::trim_right(traces_line); - job.traces_filenames.push_back(dir + "/" + traces_line); + job->traces_filenames.push_back(dir + "/" + traces_line); } - if (static_cast(job.traces_filenames.size()) < job.app_size) + if (static_cast(job->traces_filenames.size()) < job->app_size) throw std::invalid_argument("size/tracefiles inconsistency"); - job.traces_filenames.resize(job.app_size); + job->traces_filenames.resize(job->app_size); XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, " "alloc='%s'", - job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time, + job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time, alloc.c_str()); - jobs.push_back(new Job(std::move(job))); + jobs.emplace_back(std::move(job)); } catch (const std::invalid_argument& e) { xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what()); } @@ -231,8 +187,11 @@ static std::vector all_jobs(const std::string& workload_file) // Jobs are sorted by ascending date, then by lexicographical order of their // application names - sort(jobs.begin(), jobs.end(), job_comparator); - + sort(jobs.begin(), jobs.end(), [](auto const& j1, auto const& j2) { + if (j1->starting_time == j2->starting_time) + return j1->smpi_app_name < j2->smpi_app_name; + return j1->starting_time < j2->starting_time; + }); for (unsigned int i = 0; i < jobs.size(); ++i) jobs[i]->unique_job_number = i; @@ -247,18 +206,17 @@ int main(int argc, char* argv[]) argv[0], argv[0]); // Simulation setting - MSG_init(&argc, argv); simgrid::s4u::Engine e(&argc, argv); e.load_platform(argv[1]); - hosts = e.get_all_hosts(); + const auto hosts = e.get_all_hosts(); xbt_assert(hosts.size() >= 4, "The given platform should contain at least 4 hosts (found %zu).", hosts.size()); // Let's retrieve all SMPI jobs - std::vector jobs = all_jobs(argv[2]); + std::vector> jobs = all_jobs(argv[2]); // Let's register them - for (const Job* job : jobs) - SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size); + for (auto const& job : jobs) + SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size); SMPI_init(); @@ -266,7 +224,7 @@ int main(int argc, char* argv[]) int initial_noise = std::stoi(argv[3]); xbt_assert(initial_noise >= 0, "Invalid initial_noise argument"); - noise_between_jobs = std::stoi(argv[4]); + int noise_between_jobs = std::stoi(argv[4]); xbt_assert(noise_between_jobs >= 0, "Invalid noise_between_jobs argument"); if (initial_noise > 0) { @@ -275,15 +233,13 @@ int main(int argc, char* argv[]) } // Let's execute the workload - simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs); + simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, std::cref(hosts), std::cref(jobs), + noise_between_jobs); e.run(); - XBT_INFO("Simulation finished! Final time: %g", e.get_clock()); + XBT_INFO("Simulation finished! Final time: %g", simgrid::s4u::Engine::get_clock()); SMPI_finalize(); - for (const Job* job : jobs) - delete job; - return 0; }