X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/644280e5101021d867bc00b5b9699416f7d5bc43..cbd8dae6d524fc62b0fb5fcf7b2604e98f953615:/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp diff --git a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp index 9aeb8690ae..4ccbb8aa7a 100644 --- a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp +++ b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp @@ -16,7 +16,7 @@ 1. Sleep until job's starting time is reached (if needed) 2. Launch the replay of the corresponding time-indepent trace. 3. Create inter-process noise, by spawning useless actors. - 4. Wait for completion (implicitly, via MSG_main's return) + 4. Wait for completion (via s4u::Engine's run method) */ #include @@ -27,7 +27,6 @@ #include -#include #include #include #include @@ -45,8 +44,8 @@ struct Job { }; // ugly globals to avoid creating structures for giving args to processes -std::vector hosts; -int noise_between_jobs; +static std::vector hosts; +static int noise_between_jobs; static bool job_comparator(const Job* j1, const Job* j2) { @@ -55,31 +54,14 @@ static bool job_comparator(const Job* j1, const Job* j2) return j1->starting_time < j2->starting_time; } -struct s_smpi_replay_process_args { - Job* job; - msg_sem_t semaphore; - int rank; -}; - -static int smpi_replay_process(int argc, char* argv[]) +static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank) { - s_smpi_replay_process_args* args = static_cast(MSG_process_get_data(MSG_process_self())); - - if (args->semaphore != nullptr) - MSG_sem_acquire(args->semaphore); - - XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); - - smpi_replay_run(&argc, &argv); - XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); + XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str()); + smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str()); + XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, + job->smpi_app_name.c_str()); - if (args->semaphore != nullptr) - MSG_sem_release(args->semaphore); - - delete args; - return 0; + barrier->wait(); } // Sleeps for a given amount of time @@ -105,33 +87,19 @@ static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host) static int job_executor_process(Job* job) { - msg_sem_t job_semaphore = MSG_sem_init(1); XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); + simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1); + for (int i = 0; i < job->app_size; ++i) { - char** argv = xbt_new(char*, 5); - argv[0] = xbt_strdup("1"); // log only? - argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance - argv[2] = bprintf("%d", i); // rank - argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank - argv[4] = xbt_strdup("0"); // ? - - s_smpi_replay_process_args* args = new s_smpi_replay_process_args; - args->job = job; - args->semaphore = nullptr; - args->rank = i; - - if (i == 0) - args->semaphore = job_semaphore; - - char* str_pname = bprintf("%d_%d", job->unique_job_number, i); - MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv); + char* str_pname = bprintf("rank_%d_%d", job->unique_job_number, i); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i); xbt_free(str_pname); } - MSG_sem_acquire(job_semaphore); - MSG_sem_destroy(job_semaphore); + barrier->wait(); + simgrid::s4u::this_actor::sleep_for(1); XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); return 0; @@ -158,9 +126,10 @@ static int workload_executor_process(std::vector* workload) } // Let's finally run the job executor - std::string job_process_name = "job_" + job->smpi_app_name; + char* str_pname = bprintf("job_%04d", job->unique_job_number); XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); - simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, job); + xbt_free(str_pname); } return 0; @@ -187,43 +156,42 @@ static std::vector all_jobs(const std::string& workload_file) std::istringstream is(line); if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) { try { - - Job* job = new Job; - job->smpi_app_name = app_name; - job->filename = dir + "/" + filename_unprefixed; - job->app_size = app_size; - job->starting_time = starting_time; + Job job; + job.smpi_app_name = app_name; + job.filename = dir + "/" + filename_unprefixed; + job.app_size = app_size; + job.starting_time = starting_time; std::vector subparts; boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on); - if ((int)subparts.size() != job->app_size) + if ((int)subparts.size() != job.app_size) throw std::invalid_argument("size/alloc inconsistency"); - job->allocation.resize(subparts.size()); + job.allocation.resize(subparts.size()); for (unsigned int i = 0; i < subparts.size(); ++i) - job->allocation[i] = stoi(subparts[i]); + job.allocation[i] = stoi(subparts[i]); // Let's read the filename - std::ifstream traces_file(job->filename); + std::ifstream traces_file(job.filename); if (!traces_file.is_open()) - throw std::invalid_argument("Cannot open file " + job->filename); + throw std::invalid_argument("Cannot open file " + job.filename); std::string traces_line; while (std::getline(traces_file, traces_line)) { boost::trim_right(traces_line); - job->traces_filenames.push_back(dir + "/" + traces_line); + job.traces_filenames.push_back(dir + "/" + traces_line); } - if (static_cast(job->traces_filenames.size()) < job->app_size) + if (static_cast(job.traces_filenames.size()) < job.app_size) throw std::invalid_argument("size/tracefiles inconsistency"); - job->traces_filenames.resize(job->app_size); + job.traces_filenames.resize(job.app_size); XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, " "alloc='%s'", - job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time, + job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time, alloc.c_str()); - jobs.push_back(job); + jobs.push_back(new Job(std::move(job))); } catch (const std::invalid_argument& e) { xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what()); } @@ -248,7 +216,6 @@ int main(int argc, char* argv[]) argv[0], argv[0]); // Simulation setting - MSG_init(&argc, argv); simgrid::s4u::Engine e(&argc, argv); e.load_platform(argv[1]); hosts = e.get_all_hosts(); @@ -259,7 +226,7 @@ int main(int argc, char* argv[]) // Let's register them for (const Job* job : jobs) - SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size); + SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size); SMPI_init(); @@ -276,7 +243,7 @@ int main(int argc, char* argv[]) } // Let's execute the workload - simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs); + simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, &jobs); e.run(); XBT_INFO("Simulation finished! Final time: %g", e.get_clock());