Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
SMPI: redesign the end of actors/ranks' lifetime
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 1 Aug 2019 07:34:53 +0000 (09:34 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 1 Aug 2019 07:54:44 +0000 (09:54 +0200)
The problem is that we don't use enough of refcounting in SMPI, so we
should not let any rank finish before the others, because it may be
involved in a communication or something.

Previously, there were a barrier at the end of the user code, so that
every ranks finishes exactly at the same time.

Now, the MPI instance keeps a reference on every actor it contains,
and the actor terminates with no delay after its code. The terminating
actors unregister from their MPI instance, but they are still
referenced until the last actor unregisters from the MPI instance.
Once the MPI instance is empty, it unregisters all the actors,
allowing their collection by the refcounting.

This commit changes the ending time of many ranks in many examples, as
expected. The ranks now terminate as soon as they are done, they are
not waiting the others anymore.

It introduces a segfault in ampi that I fail to understand. It seems
that a container is used after being collected in this example, but I
fail to see the reason so far.

12 files changed:
examples/smpi/ampi_test/ampi_test.tesh
examples/smpi/replay/replay-override-replayer.tesh
examples/smpi/replay/replay.tesh
examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual_coll1.tesh
examples/smpi/replay_multiple_manual_deploy/replay_multiple_manual_coll2_st_sr_noise.tesh
examples/smpi/smpi_msg_masterslave/msg_smpi.tesh
examples/smpi/trace/trace.tesh
src/smpi/include/private.hpp
src/smpi/include/smpi_actor.hpp
src/smpi/internals/smpi_actor.cpp
src/smpi/internals/smpi_deployment.cpp
teshsuite/smpi/fort_args/fort_args.f90

index 8e4a51a..5ed3165 100644 (file)
@@ -1,13 +1,8 @@
-# use the tested library, not the installed one
-# (since we want to pass it to the child, it has to be redefined before each command)
-# Go for the first test
-
 p Test if the load balancing code gets traced correctly
-! timeout 60
 
 $ rm -rf ${bindir:=.}/smpi_trace.trace ${bindir:=.}/smpi_trace.trace_files
 
-$ ../../smpi_script/bin/smpirun -trace-ti --cfg=tracing/filename:${bindir:=.}/smpi_trace.trace --cfg=tracing/smpi/format/ti-one-file:yes -no-privatize -replay ${srcdir:=.}/replay/actions_bcast.txt --log=replay.thresh:critical --log=smpi_replay.thresh:verbose --log=no_loc --cfg=smpi/simulate-computation:no -np 3 -platform ${srcdir:=.}/../platforms/small_platform.xml -hostfile ${srcdir:=.}/hostfile ./ampi_test/smpi_ampi_test --log=smpi_kernel.thres:warning --log=xbt_cfg.thres:warning --cfg=smpi/wtime:0
+$ ../../smpi_script/bin/smpirun -trace-ti --cfg=tracing/filename:${bindir:=.}/smpi_trace.trace --cfg=tracing/smpi/format/ti-one-file:yes -replay ${srcdir:=.}/replay/actions_bcast.txt --log=replay.thresh:critical --log=smpi_replay.thresh:verbose --log=no_loc --cfg=smpi/simulate-computation:no -np 3 -platform ${srcdir:=.}/../platforms/small_platform.xml -hostfile ${srcdir:=.}/hostfile ./ampi_test/smpi_ampi_test --log=smpi_kernel.thres:warning --log=xbt_cfg.thres:warning --cfg=smpi/wtime:0
 
 $ bash -c "cat ${bindir:=.}/smpi_trace.trace_files/*"
 > 0 init
index 7d1feac..2707a55 100644 (file)
@@ -195,16 +195,16 @@ $ tail -n +3 ./simgrid_override.trace
 > 12 13.138198 2 3 9
 > 13 14.286929 2 2
 > 12 14.286929 2 2 10
+> 13 14.286929 2 2
+> 7 14.286929 1 2
 > 13 18.250974 2 1
 > 12 18.250974 2 1 10
+> 13 18.250974 2 1
+> 7 18.250974 1 1
 > 13 19.691622 2 3
 > 12 19.691622 2 3 10
 > 13 19.691622 2 3
 > 7 19.691622 1 3
-> 13 19.691622 2 2
-> 7 19.691622 1 2
-> 13 19.691622 2 1
-> 7 19.691622 1 1
 
 $ rm -f ./simgrid_override.trace
 
index 38496fd..8768e56 100644 (file)
@@ -195,16 +195,16 @@ $ tail -n +3 ./simgrid.trace
 > 12 13.138198 2 3 9
 > 13 14.286929 2 2
 > 12 14.286929 2 2 10
+> 13 14.286929 2 2
+> 7 14.286929 1 2
 > 13 18.250974 2 1
 > 12 18.250974 2 1 10
+> 13 18.250974 2 1
+> 7 18.250974 1 1
 > 13 19.691622 2 3
 > 12 19.691622 2 3 10
 > 13 19.691622 2 3
 > 7 19.691622 1 3
-> 13 19.691622 2 2
-> 7 19.691622 1 2
-> 13 19.691622 2 1
-> 7 19.691622 1 1
 
 $ rm -f ./simgrid.trace
 
index 13f842d..1a865a0 100644 (file)
@@ -13,9 +13,9 @@ $ ./replay_multiple_manual ${platfdir}/small_platform_with_routers.xml  ${srcdir
 > [   0.000000] (job_0000@Bourassa) Executing job 0 (smpi_app 'alone')
 > [   0.000000] (rank_0_0@Bourassa) Replaying rank 0 of job 0 (smpi_app 'alone')
 > [   0.000000] (rank_0_1@Fafard) Replaying rank 1 of job 0 (smpi_app 'alone')
+> [  77.638391] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'alone')
 > [  77.645196] (rank_0_0@Bourassa) Simulation time 77.645196
 > [  77.645196] (rank_0_0@Bourassa) Finished replaying rank 0 of job 0 (smpi_app 'alone')
-> [  77.645196] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'alone')
 > [  78.645196] (job_0000@Bourassa) Finished job 0 (smpi_app 'alone')
 > [  78.645196] (maestro@) Simulation finished! Final time: 78.6452
 
@@ -29,9 +29,9 @@ $ ./replay_multiple_manual ${platfdir}/small_platform_with_routers.xml  ${srcdir
 > [   0.000000] (job_0000@Bourassa) Executing job 0 (smpi_app 'alone')
 > [   0.000000] (rank_0_0@Bourassa) Replaying rank 0 of job 0 (smpi_app 'alone')
 > [   0.000000] (rank_0_1@Fafard) Replaying rank 1 of job 0 (smpi_app 'alone')
+> [  77.638391] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'alone')
 > [  77.645196] (rank_0_0@Bourassa) Simulation time 77.645196
 > [  77.645196] (rank_0_0@Bourassa) Finished replaying rank 0 of job 0 (smpi_app 'alone')
-> [  77.645196] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'alone')
 > [  78.645196] (job_0000@Bourassa) Finished job 0 (smpi_app 'alone')
 > [  78.645196] (maestro@) Simulation finished! Final time: 78.6452
 
index 6ae0a9f..8d539ec 100644 (file)
@@ -19,11 +19,11 @@ $ ./replay_multiple_manual ${srcdir:=.}/../../platforms/small_platform_with_rout
 > [   0.000000] (job_0001@Bourassa) Executing job 1 (smpi_app 'job1')
 > [   0.000000] (rank_1_0@Bourassa) Replaying rank 0 of job 1 (smpi_app 'job1')
 > [   0.000000] (rank_1_1@Fafard) Replaying rank 1 of job 1 (smpi_app 'job1')
+> [ 155.249699] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'job0')
+> [ 155.249699] (rank_1_1@Fafard) Finished replaying rank 1 of job 1 (smpi_app 'job1')
 > [ 155.256538] (rank_0_0@Bourassa) Simulation time 155.256538
 > [ 155.256538] (rank_0_0@Bourassa) Finished replaying rank 0 of job 0 (smpi_app 'job0')
-> [ 155.256538] (rank_0_1@Fafard) Finished replaying rank 1 of job 0 (smpi_app 'job0')
 > [ 155.256538] (rank_1_0@Bourassa) Finished replaying rank 0 of job 1 (smpi_app 'job1')
-> [ 155.256538] (rank_1_1@Fafard) Finished replaying rank 1 of job 1 (smpi_app 'job1')
 > [ 156.256538] (job_0000@Bourassa) Finished job 0 (smpi_app 'job0')
 > [ 156.256538] (job_0001@Bourassa) Finished job 1 (smpi_app 'job1')
 > [ 156.256538] (maestro@) Simulation finished! Final time: 156.257
index a603083..1fa33fa 100644 (file)
@@ -11,10 +11,10 @@ $ ./masterslave_mailbox_smpi ${srcdir:=.}/../../platforms/small_platform_with_ro
 > [Jupiter:alltoall_mpi:(7) 0.000000] [msg_test/INFO] alltoall for rank 2
 > [Fafard:alltoall_mpi:(8) 0.000000] [msg_test/INFO] alltoall for rank 3
 > [Ginette:master_mpi:(3) 0.000000] [msg_test/INFO] After comm 0
+> [Ginette:master_mpi:(3) 0.000000] [msg_test/INFO] After finalize 0 0
 > [Tremblay:master:(1) 0.002265] [msg_test/INFO] Sending "Task_1" (of 20) to mailbox "slave-0"
 > [Bourassa:master_mpi:(4) 0.016868] [msg_test/INFO] After comm 1
 > [Bourassa:master_mpi:(4) 0.016868] [msg_test/INFO] After finalize 1 0
-> [Ginette:master_mpi:(3) 0.016868] [msg_test/INFO] After finalize 0 0
 > [Ginette:alltoall_mpi:(5) 0.098642] [msg_test/INFO] after alltoall 0
 > [Fafard:alltoall_mpi:(8) 0.099069] [msg_test/INFO] after alltoall 3
 > [Bourassa:alltoall_mpi:(6) 0.127526] [msg_test/INFO] after alltoall 1
index 58dea5b..9bb7c5e 100644 (file)
@@ -1332,16 +1332,16 @@ $ tail -n +3 ${bindir:=.}/smpi_trace.trace
 > 12 11.902080 2 2 12
 > 13 11.904056 2 1
 > 12 11.904056 2 1 18
+> 13 11.904056 2 1
+> 7 11.904056 1 1
+> 13 11.905518 2 2
 > 13 11.905518 2 2
 > 12 11.905518 2 2 18
+> 7 11.905518 1 2
 > 13 11.906032 2 3
 > 12 11.906032 2 3 18
 > 13 11.906032 2 3
 > 7 11.906032 1 3
-> 13 11.906032 2 1
-> 7 11.906032 1 1
-> 13 11.906032 2 2
-> 7 11.906032 1 2
 $ rm -f ${bindir:=.}/smpi_trace.trace
 
 $ ${bindir:=.}/../../../smpi_script/bin/smpirun -trace -trace-resource -trace-file ${bindir:=.}/smpi_trace.trace -hostfile ${srcdir:=.}/../hostfile -platform ${platfdir:=.}/small_platform.xml --cfg=path:${srcdir:=.}/../msg --cfg=smpi/host-speed:1 -np 3 ${bindir:=.}/smpi_trace --log=smpi_kernel.thres:warning --log=xbt_cfg.thres:warning
index 5fcbe2f..4b0db69 100644 (file)
@@ -75,8 +75,9 @@ XBT_PRIVATE int smpi_get_universe_size();
 
 XBT_PRIVATE void smpi_deployment_register_process(const std::string& instance_id, int rank,
                                                   simgrid::s4u::ActorPtr actor);
+XBT_PRIVATE void smpi_deployment_unregister_process(const std::string& instance_id);
+
 XBT_PRIVATE MPI_Comm* smpi_deployment_comm_world(const std::string& instance_id);
-XBT_PRIVATE simgrid::s4u::Barrier* smpi_deployment_finalization_barrier(const std::string& instance_id);
 XBT_PRIVATE void smpi_deployment_cleanup_instances();
 
 XBT_PRIVATE void smpi_comm_copy_buffer_callback(simgrid::kernel::activity::CommImpl* comm, void* buff,
index 5d22d03..6528fc9 100644 (file)
@@ -27,7 +27,6 @@ class ActorExt {
   int sampling_ = 0; /* inside an SMPI_SAMPLE_ block? */
   std::string instance_id_;
   bool replaying_ = false; /* is the process replaying a trace */
-  s4u::Barrier* finalization_barrier_ = nullptr;
   smpi_trace_call_location_t trace_call_loc_;
   s4u::ActorPtr actor_                           = nullptr;
   smpi_privatization_region_t privatized_region_ = nullptr;
index 0b8ff88..90e2db2 100644 (file)
@@ -63,11 +63,7 @@ void ActorExt::finalize()
   state_ = SmpiProcessState::FINALIZED;
   XBT_DEBUG("<%ld> Process left the game", actor_->get_pid());
 
-  // This leads to an explosion of the search graph which cannot be reduced:
-  if (MC_is_active() || MC_record_replay_is_active())
-    return;
-  // wait for all pending asynchronous comms to finish
-  finalization_barrier_->wait();
+  smpi_deployment_unregister_process(instance_id_);
 }
 
 /** @brief Check if a process is finalized */
@@ -225,17 +221,13 @@ void ActorExt::init()
     SMPI_switch_data_segment(self);
   }
 
-  std::string instance_id = self->get_property("instance_id");
-  const int rank          = xbt_str_parse_int(self->get_property("rank"), "Cannot parse rank");
+  ext->instance_id_ = self->get_property("instance_id");
+  const int rank    = xbt_str_parse_int(self->get_property("rank"), "Cannot parse rank");
 
   ext->state_ = SmpiProcessState::INITIALIZING;
-  smpi_deployment_register_process(instance_id, rank, self);
+  smpi_deployment_register_process(ext->instance_id_, rank, self);
 
-  ext->instance_id_              = instance_id;
-  ext->comm_world_               = smpi_deployment_comm_world(instance_id);
-  simgrid::s4u::Barrier* barrier = smpi_deployment_finalization_barrier(instance_id);
-  if (barrier != nullptr) // don't overwrite the current one if the instance has none
-    ext->finalization_barrier_ = barrier;
+  ext->comm_world_ = smpi_deployment_comm_world(ext->instance_id_);
 
   // set the process attached to the mailbox
   ext->mailbox_small_->set_receiver(ext->actor_);
index c58c9cb..d2dd6b4 100644 (file)
@@ -10,6 +10,8 @@
 #include "smpi_comm.hpp"
 #include <map>
 
+XBT_LOG_EXTERNAL_CATEGORY(smpi);
+
 namespace simgrid {
 namespace smpi {
 namespace app {
@@ -18,12 +20,8 @@ static int universe_size = 0;
 
 class Instance {
 public:
-  Instance(const std::string& name, int max_no_processes, MPI_Comm comm, simgrid::s4u::Barrier* finalization_barrier)
-      : name_(name)
-      , size_(max_no_processes)
-      , present_processes_(0)
-      , comm_world_(comm)
-      , finalization_barrier_(finalization_barrier)
+  Instance(const std::string& name, int max_no_processes, MPI_Comm comm)
+      : name_(name), size_(max_no_processes), comm_world_(comm)
   {
     MPI_Group group = new simgrid::smpi::Group(size_);
     comm_world_     = new simgrid::smpi::Comm(group, nullptr, 0, -1);
@@ -36,9 +34,9 @@ public:
 
   const std::string name_;
   int size_;
-  int present_processes_;
+  std::vector<simgrid::s4u::ActorPtr> present_processes_;
+  unsigned int finalized_ranks_ = 0;
   MPI_Comm comm_world_;
-  simgrid::s4u::Barrier* finalization_barrier_;
 };
 }
 }
@@ -70,7 +68,7 @@ void SMPI_app_instance_register(const char *name, xbt_main_func_t code, int num_
       host->extension_set(new simgrid::smpi::Host(host));
   }
 
-  Instance instance(std::string(name), num_processes, MPI_COMM_NULL, new simgrid::s4u::Barrier(num_processes));
+  Instance instance(std::string(name), num_processes, MPI_COMM_NULL);
 
   smpi_instances.insert(std::pair<std::string, Instance>(name, instance));
 }
@@ -79,32 +77,36 @@ void smpi_deployment_register_process(const std::string& instance_id, int rank,
 {
   Instance& instance = smpi_instances.at(instance_id);
 
-  instance.present_processes_++;
+  instance.present_processes_.push_back(actor);
   instance.comm_world_->group()->set_mapping(actor, rank);
 }
 
-MPI_Comm* smpi_deployment_comm_world(const std::string& instance_id)
+void smpi_deployment_unregister_process(const std::string& instance_id)
 {
-  if (smpi_instances.empty()) { // no instance registered, we probably used smpirun.
-    return nullptr;
-  }
   Instance& instance = smpi_instances.at(instance_id);
-  return &instance.comm_world_;
+
+  instance.finalized_ranks_++;
+  if (instance.finalized_ranks_ == instance.present_processes_.size()) {
+    instance.present_processes_.clear();
+    simgrid::smpi::Comm::destroy(instance.comm_world_);
+    smpi_instances.erase(instance_id);
+  }
 }
 
-simgrid::s4u::Barrier* smpi_deployment_finalization_barrier(const std::string& instance_id)
+MPI_Comm* smpi_deployment_comm_world(const std::string& instance_id)
 {
   if (smpi_instances.empty()) { // no instance registered, we probably used smpirun.
     return nullptr;
   }
   Instance& instance = smpi_instances.at(instance_id);
-  return instance.finalization_barrier_;
+  return &instance.comm_world_;
 }
 
 void smpi_deployment_cleanup_instances(){
   for (auto const& item : smpi_instances) {
+    XBT_CINFO(smpi, "Stalling SMPI instance: %s. Do all your MPI ranks call MPI_Finalize()?", item.first.c_str());
     Instance instance = item.second;
-    delete instance.finalization_barrier_;
+    instance.present_processes_.clear();
     simgrid::smpi::Comm::destroy(instance.comm_world_);
   }
   smpi_instances.clear();
index d1f4aea..0e634ed 100644 (file)
@@ -1,9 +1,12 @@
 ! Check that getarg does somethig sensible.
 program getarg_1
+  use mpi
   CHARACTER*10 ARGS, ARGS2
   INTEGER*4 I
   INTEGER*2 I2
+  INTEGER ierr
   I = 0
+  call MPI_Init(ierr)
   CALL GETARG(I,ARGS)
   ! This should return the invoking command.  The actual value depends 
   ! on the OS, but a blank string is wrong no matter what.
@@ -22,4 +25,5 @@ program getarg_1
   I = 1000
   CALL GETARG(I,ARGS)
   if (args.ne.'') STOP 6
+  call MPI_Finalize(ierr)
 end