AND (Algorithmique Numérique Distribuée)

Public GIT Repository
Minimal change to support host on-off and permanent mailboxes
author: Fabien Chaix <chaix@ics.forth.gr>
Mon, 28 Feb 2022 21:36:34 +0000 (23:36 +0200)
committer: Fabien Chaix <chaix@ics.forth.gr>
Mon, 28 Feb 2022 21:36:34 +0000 (23:36 +0200)
src/bindings/python/simgrid_python.cpp
src/kernel/EngineImpl.cpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/MailboxImpl.cpp
src/kernel/activity/MailboxImpl.hpp
src/kernel/actor/ActorImpl.cpp
src/kernel/actor/ActorImpl.hpp
teshsuite/s4u/host-on-off-actors/host-on-off-actors.tesh

index 1930f15..e6a3e8e 100644 (file)
@@ -84,12 +84,13 @@ PYBIND11_MODULE(simgrid, m)
   // Swapped contexts are broken, starting from pybind11 v2.8.0.  Use thread contexts by default.
   simgrid::s4u::Engine::set_config("contexts/factory:thread");
 
-  // Internal exception used to kill actors and sweep the RAII chimney (free objects living on the stack)
-  static py::object pyForcefulKillEx(py::register_exception<simgrid::ForcefulKillException>(m, "ActorKilled"));
-
   py::register_exception<simgrid::NetworkFailureException>(m, "NetworkFailureException");
   py::register_exception<simgrid::TimeoutException>(m, "TimeoutException");
-
+  py::register_exception<simgrid::HostFailureException>(m, "HostFailureException");
+  py::register_exception<simgrid::StorageFailureException>(m, "StorageFailureException");
+  py::register_exception<simgrid::VmFailureException>(m, "VmFailureException");
+  py::register_exception<simgrid::CancelException>(m, "CancelException");
+  
   /* this_actor namespace */
   m.def_submodule("this_actor", "Bindings of the s4u::this_actor namespace. See the C++ documentation for details.")
       .def(
@@ -217,10 +218,8 @@ PYBIND11_MODULE(simgrid, m)
                 if (py::isinstance<py::function>(res))
                   res();
               } catch (const py::error_already_set& ex) {
-                if (ex.matches(pyForcefulKillEx)) {
-                  XBT_VERB("Actor killed");
-                  simgrid::ForcefulKillException::do_throw(); // Forward that ForcefulKill exception
-                }
+                XBT_VERB("Actor killed");
+                simgrid::ForcefulKillException::do_throw(); // Forward that ForcefulKill exception
                 throw;
               }
             });
@@ -743,10 +742,8 @@ PYBIND11_MODULE(simgrid, m)
                 py::gil_scoped_acquire py_context;
                 fun(*args);
               } catch (const py::error_already_set& ex) {
-                if (ex.matches(pyForcefulKillEx)) {
-                  XBT_VERB("Actor killed");
-                  simgrid::ForcefulKillException::do_throw(); // Forward that ForcefulKill exception
-                }
+                XBT_VERB("Actor killed");
+                simgrid::ForcefulKillException::do_throw(); // Forward that ForcefulKill exception
                 throw;
               }
             });
index 1e669cb..b35092d 100644 (file)
@@ -297,7 +297,7 @@ void EngineImpl::shutdown()
   XBT_DEBUG("EngineImpl::shutdown() called. Simulation's over.");
 #if HAVE_SMPI
   if (not instance_->actor_list_.empty()) {
-    if (smpi_process()->initialized()) {
+    if (smpi_process() && smpi_process()->initialized()) {
       xbt_die("Process exited without calling MPI_Finalize - Killing simulation");
     } else {
       XBT_WARN("Process called exit when leaving - Skipping cleanups");
index efdec76..c34c4fe 100644 (file)
@@ -120,7 +120,10 @@ CommImpl* CommImpl::start()
   /* If both the sender and the receiver are already there, start the communication */
   if (get_state() == State::READY) {
     from_ = from_ != nullptr ? from_ : src_actor_->get_host();
+    xbt_assert(from_->is_on());
     to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
+    xbt_assert(to_->is_on());
+
     /* Getting the network_model from the origin host
      * Valid while we have a single network model, otherwise we would need to change this function to first get the
      * routes and later create the respective surf actions */
@@ -420,14 +423,17 @@ void CommImpl::post()
     set_state(State::SRC_TIMEOUT);
   else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
     set_state(State::DST_TIMEOUT);
-  else if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED)
+  else if ((from_ && !from_->is_on()) || (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
     set_state(State::SRC_HOST_FAILURE);
-  else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED)
+  else if ((to_ && !to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
     set_state(State::DST_HOST_FAILURE);
   else if (surf_action_ && surf_action_->get_state() == resource::Action::State::FAILED) {
     set_state(State::LINK_FAILURE);
-  } else
+  } else if(get_state() == State::RUNNING ) {
+    xbt_assert(from_ && from_->is_on());
+    xbt_assert(to_ && to_->is_on());
     set_state(State::DONE);
+  }
 
   XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
             src_actor_.get(), dst_actor_.get(), detached_);
index febedbe..20c12e3 100644 (file)
@@ -20,15 +20,28 @@ namespace activity {
 
 unsigned MailboxImpl::next_id_ = 0;
 
+MailboxImpl::~MailboxImpl() {
+  clear();
+  set_receiver(nullptr);
+}
+
+
 /** @brief set the receiver of the mailbox to allow eager sends
  *  @param actor The receiving dude
  */
 void MailboxImpl::set_receiver(s4u::ActorPtr actor)
 {
+  if(this->permanent_receiver_) {
+    std::vector<MailboxImpl*>& mboxes=this->permanent_receiver_->mailboxes;
+    mboxes.erase(std::remove(mboxes.begin(), mboxes.end(),this), mboxes.end());
+  }
+
   if (actor != nullptr)
     this->permanent_receiver_ = actor->get_impl();
   else
     this->permanent_receiver_ = nullptr;
+
 }
 /** @brief Pushes a communication activity into a mailbox
  *  @param comm What to add
@@ -55,6 +68,28 @@ void MailboxImpl::remove(const CommImplPtr& comm)
     }
   xbt_die("Comm %p not found in mailbox %s", comm.get(), this->get_cname());
 }
+/** @brief Removes all communication activities from a mailbox
+*/
+void MailboxImpl::clear()
+{
+  for( auto comm : done_comm_queue_ ) {
+    comm->cancel();
+    comm->set_state(State::DST_HOST_FAILURE);
+  }
+  done_comm_queue_.clear();
+
+  //CommImpl::cancel() will remove the comm from the mailbox..
+  while( !comm_queue_.empty() ) {
+    auto comm=comm_queue_.back();
+    if(comm->get_state() == State::WAITING && !comm->detached()) {
+      comm->cancel();
+      comm->set_state(State::DST_HOST_FAILURE);
+    } else
+      comm_queue_.pop_back();
+  }
+
+}
 
 CommImplPtr MailboxImpl::iprobe(int type, bool (*match_fun)(void*, void*, CommImpl*), void* data)
 {
index 3247b05..21a8c45 100644 (file)
@@ -43,6 +43,8 @@ public:
   /** @brief Public interface */
   unsigned get_id() const { return id_; }
 
+  ~MailboxImpl();
+
   const s4u::Mailbox* get_iface() const { return &piface_; }
   s4u::Mailbox* get_iface() { return &piface_; }
 
@@ -52,6 +54,7 @@ public:
   void push(CommImplPtr comm);
   void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
   void remove(const CommImplPtr& comm);
+  void clear();
   CommImplPtr iprobe(int type, bool (*match_fun)(void*, void*, CommImpl*), void* data);
   CommImplPtr find_matching_comm(CommImpl::Type type, bool (*match_fun)(void*, void*, CommImpl*), void* this_user_data,
                                  const CommImplPtr& my_synchro, bool done, bool remove_matching);
index efe1fa4..a81c74c 100644 (file)
@@ -169,6 +169,9 @@ void ActorImpl::cleanup()
     activity->cancel();
   activities_.clear();
 
+  while(!mailboxes.empty())
+    mailboxes.back()->set_receiver(nullptr);
+
   XBT_DEBUG("%s@%s(%ld) should not run anymore", get_cname(), get_host()->get_cname(), get_pid());
 
   if (EngineImpl::get_instance()->is_maestro(this)) /* Do not cleanup maestro */
@@ -212,6 +215,9 @@ void ActorImpl::exit()
     activity->cancel();
   activities_.clear();
 
+  while(!mailboxes.empty())
+    mailboxes.back()->set_receiver(nullptr);
+
   // Forcefully kill the actor if its host is turned off. Not a HostFailureException because you should not survive that
   this->throw_exception(std::make_exception_ptr(ForcefulKillException(host_->is_on() ? "exited" : "host failed")));
 }
index a1672be..7385792 100644 (file)
@@ -28,6 +28,9 @@ class XBT_PUBLIC ActorImpl : public xbt::PropertyHolder {
   bool auto_restart_ = false;
   unsigned stacksize_; // set to default value in constructor
 
+  std::vector<activity::MailboxImpl*> mailboxes;
+  friend activity::MailboxImpl;
+
 public:
   xbt::string name_;
   ActorImpl(xbt::string name, s4u::Host* host);
index 8dee4ed..3eb77ab 100644 (file)
@@ -27,7 +27,7 @@ $ ./host-on-off-actors ${platfdir}/small_platform.xml 4 --log=no_loc
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO]   Turn Jupiter off
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO] Test 4 is ok.  (number of actors : 2, it should be 1 or 2 if RX has not been satisfied). An exception is raised when we turn off a node that has an actor sleeping
 > [Tremblay:test_launcher:(1) 20.000000] [s4u_test/INFO]   Test done. See you!
-> [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   Receive message: COMM
+> [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   Receive message: TRANSFER_FAILURE
 > [Tremblay:commRX:(2) 25.033047] [s4u_test/INFO]   RX Done
 > [25.033047] [s4u_test/INFO] Simulation time 25.033