1. Sleep until job's starting time is reached (if needed)
2. Launch the replay of the corresponding time-independent trace.
3. Create inter-process noise, by spawning useless actors.
- 4. Wait for completion (implicitly, via MSG_main's return)
+ 4. Wait for completion (via s4u::Engine's run method)
*/
#include <algorithm>
#include <boost/algorithm/string.hpp>
-#include <simgrid/msg.h>
#include <simgrid/s4u.hpp>
#include <smpi/smpi.h>
#include <xbt/file.hpp>
};
// ugly globals to avoid creating structures for giving args to processes
-std::vector<simgrid::s4u::Host*> hosts;
-int noise_between_jobs;
+static std::vector<simgrid::s4u::Host*> hosts;
+static int noise_between_jobs;
static bool job_comparator(const Job* j1, const Job* j2)
{
return j1->starting_time < j2->starting_time;
}
-struct s_smpi_replay_process_args {
- Job* job;
- msg_sem_t semaphore;
- int rank;
-};
-
-static int smpi_replay_process(int argc, char* argv[])
+static void smpi_replay_process(Job* job, simgrid::s4u::BarrierPtr barrier, int rank)
{
- s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
-
- if (args->semaphore != nullptr)
- MSG_sem_acquire(args->semaphore);
-
- XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
- args->job->smpi_app_name.c_str());
-
+ // Prepare data for smpi_replay_run
+ int argc = 5;
+ char** argv = xbt_new(char*, argc);
+ argv[0] = xbt_strdup("1"); // log only?
+ argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
+ argv[2] = bprintf("%d", rank); // rank
+ argv[3] = xbt_strdup(job->traces_filenames[rank].c_str()); // smpi trace file for this rank
+ argv[4] = xbt_strdup("0"); // ?
+
+ XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number, job->smpi_app_name.c_str());
smpi_replay_run(&argc, &argv);
- XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
- args->job->smpi_app_name.c_str());
-
- if (args->semaphore != nullptr)
- MSG_sem_release(args->semaphore);
+ XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", rank, job->unique_job_number,
+ job->smpi_app_name.c_str());
- delete args;
- return 0;
+ barrier->wait();
}
// Sleeps for a given amount of time
static int job_executor_process(Job* job)
{
- msg_sem_t job_semaphore = MSG_sem_init(1);
XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
- for (int i = 0; i < job->app_size; ++i) {
- char** argv = xbt_new(char*, 5);
- argv[0] = xbt_strdup("1"); // log only?
- argv[1] = xbt_strdup(job->smpi_app_name.c_str()); // application instance
- argv[2] = bprintf("%d", i); // rank
- argv[3] = xbt_strdup(job->traces_filenames[i].c_str()); // smpi trace file for this rank
- argv[4] = xbt_strdup("0"); // ?
-
- s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
- args->job = job;
- args->semaphore = nullptr;
- args->rank = i;
-
- if (i == 0)
- args->semaphore = job_semaphore;
+ simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
+ for (int i = 0; i < job->app_size; ++i) {
char* str_pname = bprintf("%d_%d", job->unique_job_number, i);
- MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
+ simgrid::s4u::Actor::create(str_pname, hosts[job->allocation[i]], smpi_replay_process, job, barrier, i);
xbt_free(str_pname);
}
- MSG_sem_acquire(job_semaphore);
- MSG_sem_destroy(job_semaphore);
+ barrier->wait();
XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
argv[0], argv[0]);
// Simulation setting
- MSG_init(&argc, argv);
simgrid::s4u::Engine e(&argc, argv);
e.load_platform(argv[1]);
hosts = e.get_all_hosts();
// Let's register them
for (const Job* job : jobs)
- SMPI_app_instance_register(job->smpi_app_name.c_str(), smpi_replay_process, job->app_size);
+ SMPI_app_instance_register(job->smpi_app_name.c_str(), nullptr, job->app_size);
SMPI_init();