X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/5e508dc082191ebc594cf43bf8f7a18f8ecf2923..1fa3f778c3669d0658cf13b6da1a5c35b452a678:/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp diff --git a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp index 4d426f26f0..4d4d00ee96 100644 --- a/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp +++ b/examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual.cpp @@ -1,4 +1,4 @@ -/* Copyright (c) 2009-2018. The SimGrid Team. +/* Copyright (c) 2009-2019. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it @@ -16,19 +16,17 @@ 1. Sleep until job's starting time is reached (if needed) 2. Launch the replay of the corresponding time-indepent trace. 3. Create inter-process noise, by spawning useless actors. - 4. Wait for completion (implicitly, via MSG_main's return) + 4. Wait for completion (via s4u::Engine's run method) */ #include #include -#include #include #include #include #include -#include #include #include #include @@ -46,8 +44,8 @@ struct Job { }; // ugly globals to avoid creating structures for giving args to processes -std::vector hosts; -int noise_between_jobs; +static std::vector hosts; +static int noise_between_jobs; static bool job_comparator(const Job* j1, const Job* j2) { @@ -56,31 +54,14 @@ static bool job_comparator(const Job* j1, const Job* j2) return j1->starting_time < j2->starting_time; } -struct s_smpi_replay_process_args { - Job* job; - msg_sem_t semaphore; - int rank; -}; - -static int smpi_replay_process(int argc, char* argv[]) +static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank) { - s_smpi_replay_process_args* args = static_cast(MSG_process_get_data(MSG_process_self())); - - if (args->semaphore != nullptr) - MSG_sem_acquire(args->semaphore); - - XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); - - smpi_replay_run(&argc, &argv); - XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number, - args->job->smpi_app_name.c_str()); + XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str()); + smpi_replay_run(job->smpi_app_name.c_str(), rank, 0, job->traces_filenames[rank].c_str()); + XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, + job->smpi_app_name.c_str()); - if (args->semaphore != nullptr) - MSG_sem_release(args->semaphore); - - delete args; - return 0; + barrier->wait(); } // Sleeps for a given amount of time @@ -106,33 +87,19 @@ static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host) static int job_executor_process(Job* job) { - msg_sem_t job_semaphore = MSG_sem_init(1); XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); + simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1); + for (int i = 0; i < job->app_size; ++i) { - char** argv = xbt_new(char*, 5); - argv[0] = xbt_strdup("1"); // log only? - argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance - argv[2] = bprintf("%d", i); // rank - argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank - argv[4] = xbt_strdup("0"); // ? - - s_smpi_replay_process_args* args = new s_smpi_replay_process_args; - args->job = job; - args->semaphore = nullptr; - args->rank = i; - - if (i == 0) - args->semaphore = job_semaphore; - - char* str_pname = bprintf("%d_%d", job->unique_job_number, i); - MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv); + char* str_pname = bprintf("rank_%d_%d", job->unique_job_number, i); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i); xbt_free(str_pname); } - MSG_sem_acquire(job_semaphore); - MSG_sem_destroy(job_semaphore); + barrier->wait(); + simgrid::s4u::this_actor::sleep_for(1); XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); return 0; @@ -159,9 +126,10 @@ static int workload_executor_process(std::vector* workload) } // Let's finally run the job executor - std::string job_process_name = "job_" + job->smpi_app_name; + char* str_pname = bprintf("job_%04d", job->unique_job_number); XBT_INFO("Launching the job executor of job %d (app '%s')", job->unique_job_number, job->smpi_app_name.c_str()); - simgrid::s4u::Actor::create(job_process_name.c_str(), hosts[job->allocation[0]], job_executor_process, job); + simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[0]], job_executor_process, job); + xbt_free(str_pname); } return 0; @@ -188,45 +156,44 @@ static std::vector all_jobs(const std::string& workload_file) std::istringstream is(line); if (is >> app_name >> filename_unprefixed >> app_size >> starting_time >> alloc) { try { - - Job* job = new Job; - job->smpi_app_name = app_name; - job->filename = dir + "/" + filename_unprefixed; - job->app_size = app_size; - job->starting_time = starting_time; + Job job; + job.smpi_app_name = app_name; + job.filename = dir + "/" + filename_unprefixed; + job.app_size = app_size; + job.starting_time = starting_time; std::vector subparts; boost::split(subparts, alloc, boost::is_any_of(","), boost::token_compress_on); - if ((int)subparts.size() != job->app_size) - throw std::runtime_error("size/alloc inconsistency"); + if ((int)subparts.size() != job.app_size) + throw std::invalid_argument("size/alloc inconsistency"); - job->allocation.resize(subparts.size()); + job.allocation.resize(subparts.size()); for (unsigned int i = 0; i < subparts.size(); ++i) - job->allocation[i] = stoi(subparts[i]); + job.allocation[i] = stoi(subparts[i]); // Let's read the filename - std::ifstream traces_file(job->filename); + std::ifstream traces_file(job.filename); if (!traces_file.is_open()) - throw std::runtime_error("Cannot open file " + job->filename); + throw std::invalid_argument("Cannot open file " + job.filename); std::string traces_line; while (std::getline(traces_file, traces_line)) { boost::trim_right(traces_line); - job->traces_filenames.push_back(dir + "/" + traces_line); + job.traces_filenames.push_back(dir + "/" + traces_line); } - if (static_cast(job->traces_filenames.size()) < job->app_size) - throw std::runtime_error("size/tracefiles inconsistency"); - job->traces_filenames.resize(job->app_size); + if (static_cast(job.traces_filenames.size()) < job.app_size) + throw std::invalid_argument("size/tracefiles inconsistency"); + job.traces_filenames.resize(job.app_size); XBT_INFO("Job read: app='%s', file='%s', size=%d, start=%d, " "alloc='%s'", - job->smpi_app_name.c_str(), filename_unprefixed.c_str(), job->app_size, job->starting_time, + job.smpi_app_name.c_str(), filename_unprefixed.c_str(), job.app_size, job.starting_time, alloc.c_str()); - jobs.push_back(job); - } catch (const std::exception& e) { - printf("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what()); + jobs.push_back(new Job(std::move(job))); + } catch (const std::invalid_argument& e) { + xbt_die("Bad line '%s' of file '%s': %s.\n", line.c_str(), workload_file.c_str(), e.what()); } } } @@ -249,7 +216,6 @@ int main(int argc, char* argv[]) argv[0], argv[0]); // Simulation setting - MSG_init(&argc, argv); simgrid::s4u::Engine e(&argc, argv); e.load_platform(argv[1]); hosts = e.get_all_hosts(); @@ -260,7 +226,7 @@ int main(int argc, char* argv[]) // Let's register them for (const Job* job : jobs) - SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size); + SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size); SMPI_init(); @@ -277,7 +243,7 @@ int main(int argc, char* argv[]) } // Let's execute the workload - simgrid::s4u::Actor::create("workload_executor", hosts[0], workload_executor_process, &jobs); + simgrid::s4u::Actor::create("workload", hosts[0], workload_executor_process, &jobs); e.run(); XBT_INFO("Simulation finished! Final time: %g", e.get_clock());