Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[examples] smpi-replay-mmd: use s4u::Barrier
[simgrid.git] / examples / smpi / replay_multiple_manual_deploy / replay_multiple_manual.cpp
index c218cbe..9506b99 100644 (file)
@@ -16,7 +16,7 @@
         1. Sleep until job's starting time is reached (if needed)
         2. Launch the replay of the corresponding time-indepent trace.
         3. Create inter-process noise, by spawning useless actors.
-   4. Wait for completion (implicitly, via MSG_main's return)
+   4. Wait for completion (via s4u::Engine's run method)
 */
 
 #include <algorithm>
@@ -57,7 +57,7 @@ static bool job_comparator(const Job* j1, const Job* j2)
 
 struct s_smpi_replay_process_args {
   Job* job;
-  msg_sem_t semaphore;
+  simgrid::s4u::BarrierPtr barrier;
   int rank;
 };
 
@@ -65,9 +65,6 @@ static int smpi_replay_process(int argc, char* argv[])
 {
   s_smpi_replay_process_args* args = static_cast<s_smpi_replay_process_args*>(MSG_process_get_data(MSG_process_self()));
 
-  if (args->semaphore != nullptr)
-    MSG_sem_acquire(args->semaphore);
-
   XBT_INFO("Replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
            args->job->smpi_app_name.c_str());
 
@@ -75,8 +72,7 @@ static int smpi_replay_process(int argc, char* argv[])
   XBT_INFO("Finished replaying rank %d of job %d (smpi_app '%s')", args->rank, args->job->unique_job_number,
            args->job->smpi_app_name.c_str());
 
-  if (args->semaphore != nullptr)
-    MSG_sem_release(args->semaphore);
+  args->barrier->wait();
 
   delete args;
   return 0;
@@ -105,9 +101,10 @@ static void pop_some_processes(int nb_processes, simgrid::s4u::Host* host)
 
 static int job_executor_process(Job* job)
 {
-  msg_sem_t job_semaphore = MSG_sem_init(1);
   XBT_INFO("Executing job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());
 
+  simgrid::s4u::BarrierPtr barrier = simgrid::s4u::Barrier::create(job->app_size + 1);
+
   for (int i = 0; i < job->app_size; ++i) {
     char** argv = xbt_new(char*, 5);
     argv[0]     = xbt_strdup("1");                              // log only?
@@ -118,19 +115,15 @@ static int job_executor_process(Job* job)
 
     s_smpi_replay_process_args* args = new s_smpi_replay_process_args;
     args->job                        = job;
-    args->semaphore                  = nullptr;
+    args->barrier                    = barrier;
     args->rank                       = i;
 
-    if (i == 0)
-      args->semaphore = job_semaphore;
-
     char* str_pname = bprintf("%d_%d", job->unique_job_number, i);
     MSG_process_create_with_arguments(str_pname, smpi_replay_process, (void*)args, hosts[job->allocation[i]], 5, argv);
     xbt_free(str_pname);
   }
 
-  MSG_sem_acquire(job_semaphore);
-  MSG_sem_destroy(job_semaphore);
+  barrier->wait();
 
   XBT_INFO("Finished job %d (smpi_app '%s')", job->unique_job_number, job->smpi_app_name.c_str());