Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge ActivityImpl::post() and ::finish()
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 23 Feb 2023 22:59:27 +0000 (23:59 +0100)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Thu, 23 Feb 2023 22:59:27 +0000 (23:59 +0100)
23 files changed:
src/kernel/EngineImpl.cpp
src/kernel/activity/ActivityImpl.cpp
src/kernel/activity/ActivityImpl.hpp
src/kernel/activity/BarrierImpl.cpp
src/kernel/activity/BarrierImpl.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/CommImpl.hpp
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/kernel/activity/MailboxImpl.cpp
src/kernel/activity/MailboxImpl.hpp
src/kernel/activity/MutexImpl.cpp
src/kernel/activity/MutexImpl.hpp
src/kernel/activity/SemaphoreImpl.cpp
src/kernel/activity/SemaphoreImpl.hpp
src/kernel/activity/SleepImpl.cpp
src/kernel/activity/SleepImpl.hpp
src/kernel/activity/Synchro.cpp
src/kernel/activity/Synchro.hpp
src/kernel/actor/ActorImpl.cpp
src/s4u/s4u_Exec.cpp

index d493956..ff519e4 100644 (file)
@@ -363,7 +363,7 @@ void EngineImpl::handle_ended_actions() const
         if (action->get_activity()->get_actor() == maestro_)
           action->get_activity()->get_iface()->complete(s4u::Activity::State::FAILED);
 
-        activity::ActivityImplPtr(action->get_activity())->post();
+        activity::ActivityImplPtr(action->get_activity())->finish();
       }
     }
     XBT_DEBUG("Handling the terminated actions (if any)");
@@ -376,7 +376,7 @@ void EngineImpl::handle_ended_actions() const
         if (action->get_activity()->get_actor() == maestro_)
           action->get_activity()->get_iface()->complete(s4u::Activity::State::FINISHED);
 
-        activity::ActivityImplPtr(action->get_activity())->post();
+        activity::ActivityImplPtr(action->get_activity())->finish();
       }
     }
   }
index 39f6d47..f425dc8 100644 (file)
@@ -63,7 +63,7 @@ const char* ActivityImpl::get_state_str() const
 bool ActivityImpl::test(actor::ActorImpl* issuer)
 {
   if (state_ != State::WAITING && state_ != State::RUNNING) {
-    post();
+    finish();
     issuer->exception_ = nullptr; // Do not propagate exception in that case
     return true;
   }
@@ -106,7 +106,7 @@ void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout)
 
   /* If the synchro is already finished then perform the error handling */
   if (state_ != State::WAITING && state_ != State::RUNNING) {
-    post();
+    finish();
   } else {
     /* we need a sleep action (even when the timeout is infinite) to be notified of host failures */
     /* Comms handle that a bit differently of the other activities */
@@ -145,7 +145,7 @@ void ActivityImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<Acti
       act->simcalls_.push_back(&issuer->simcall_);
       observer->set_result(idx);
       act->set_state(State::DONE);
-      act->post();
+      act->finish();
     }
     return;
   }
@@ -167,7 +167,7 @@ void ActivityImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<Acti
     act->simcalls_.push_back(&issuer->simcall_);
     /* see if the synchro is already finished */
     if (act->get_state() != State::WAITING && act->get_state() != State::RUNNING) {
-      act->post();
+      act->finish();
       break;
     }
   }
index d974e29..cbabdcd 100644 (file)
@@ -79,11 +79,8 @@ public:
   virtual void resume();
   virtual void cancel();
 
-  virtual void post() = 0; // Called by the main loop when the activity is marked as terminated or failed by its model.
-                           // Setups the status, clean things up, and call finish()
   virtual void set_exception(actor::ActorImpl* issuer) = 0; // Raising exceptions and stuff
-  virtual void finish() = 0; // Unlock all simcalls blocked on that activity, either because it was marked as done by
-                             // the model or because it terminated without waiting for the model
+  virtual void finish() = 0; // Setups the status, clean things up, unlock all simcalls blocked on that activity.
 
   s4u::Host* get_host() const { return hosts_.front(); }
   const std::vector<s4u::Host*>& get_hosts() const { return hosts_; };
index fda107f..7b6436e 100644 (file)
@@ -28,10 +28,6 @@ void BarrierAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout)
     // Already in the queue
   }
 }
-void BarrierAcquisitionImpl::post()
-{
-  finish();
-}
 
 void BarrierAcquisitionImpl::finish()
 {
@@ -60,7 +56,7 @@ BarrierAcquisitionImplPtr BarrierImpl::acquire_async(actor::ActorImpl* issuer)
     for (auto const& acqui : ongoing_acquisitions_) {
       acqui->granted_ = true;
       if (acqui == acqui->get_issuer()->waiting_synchro_)
-        acqui->post();
+        acqui->finish();
       // else, the issuer is not blocked on this acquisition so no need to release it
     }
     ongoing_acquisitions_.clear(); // Rearm the barier for subsequent uses
index 7455c9d..c4dd9e0 100644 (file)
@@ -32,7 +32,6 @@ public:
 
   bool test(actor::ActorImpl* issuer = nullptr) override;
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
-  void post() override;
   void finish() override;
   void set_exception(actor::ActorImpl* issuer) override
   { /* nothing to do */
index 0d81366..3a0b63c 100644 (file)
@@ -139,7 +139,7 @@ CommImpl* CommImpl::start()
       XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(),
                 to_->get_cname());
       set_state(State::LINK_FAILURE);
-      post();
+      finish();
 
     } else if ((src_actor_ != nullptr && src_actor_->is_suspended()) ||
                (dst_actor_ != nullptr && dst_actor_->is_suspended())) {
@@ -355,7 +355,7 @@ void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   if (MC_is_active() || MC_record_replay_is_active()) {
     // FIXME: what about timeouts?
     set_state(State::DONE);
-    post();
+    finish();
     return;
   }
   ActivityImpl::wait_for(issuer, timeout);
@@ -397,36 +397,6 @@ void CommImpl::cancel()
   }
 }
 
-void CommImpl::post()
-{
-  on_completion(*this);
-
-  /* Update synchro state */
-  if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
-    set_state(State::SRC_TIMEOUT);
-  else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
-    set_state(State::DST_TIMEOUT);
-  else if ((from_ && not from_->is_on()) || (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
-    set_state(State::SRC_HOST_FAILURE);
-  else if ((to_ && not to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
-    set_state(State::DST_HOST_FAILURE);
-  else if (model_action_ && model_action_->get_state() == resource::Action::State::FAILED) {
-    set_state(State::LINK_FAILURE);
-  } else if (get_state() == State::RUNNING) {
-    xbt_assert(from_ && from_->is_on());
-    xbt_assert(to_ && to_->is_on());
-    set_state(State::DONE);
-  }
-
-  XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
-            src_actor_.get(), dst_actor_.get(), detached_);
-
-  /* destroy the model actions associated with the communication activity */
-  clean_action();
-
-  /* Answer all simcalls associated with the synchro */
-  finish();
-}
 void CommImpl::set_exception(actor::ActorImpl* issuer)
 {
   switch (get_state()) {
@@ -494,7 +464,32 @@ void CommImpl::set_exception(actor::ActorImpl* issuer)
 
 void CommImpl::finish()
 {
-  XBT_DEBUG("CommImpl::finish() in state %s", get_state_str());
+  XBT_DEBUG("CommImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
+            src_actor_.get(), dst_actor_.get(), detached_);
+
+  on_completion(*this);
+
+  /* Update synchro state */
+  if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
+    set_state(State::SRC_TIMEOUT);
+  else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
+    set_state(State::DST_TIMEOUT);
+  else if ((from_ && not from_->is_on()) ||
+           (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
+    set_state(State::SRC_HOST_FAILURE);
+  else if ((to_ && not to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
+    set_state(State::DST_HOST_FAILURE);
+  else if (model_action_ && model_action_->get_state() == resource::Action::State::FAILED) {
+    set_state(State::LINK_FAILURE);
+  } else if (get_state() == State::RUNNING) {
+    xbt_assert(from_ && from_->is_on());
+    xbt_assert(to_ && to_->is_on());
+    set_state(State::DONE);
+  }
+
+  /* destroy the model actions associated with the communication activity */
+  clean_action();
+
   /* If the synchro is still in a rendez-vous point then remove from it */
   if (mbox_)
     mbox_->remove(this);
index 7540c4a..32fef45 100644 (file)
@@ -70,7 +70,6 @@ public:
   void suspend() override;
   void resume() override;
   void cancel() override;
-  void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
 
index a6789b1..6d66a12 100644 (file)
@@ -133,33 +133,6 @@ ExecImpl& ExecImpl::update_sharing_penalty(double sharing_penalty)
   return *this;
 }
 
-void ExecImpl::post()
-{
-  if (model_action_ != nullptr) {
-    if (auto const& hosts = get_hosts();
-        std::any_of(hosts.begin(), hosts.end(), [](const s4u::Host* host) { return not host->is_on(); })) {
-      /* If one of the hosts running the synchro failed, notice it. This way, the asking
-       * process can be killed if it runs on that host itself */
-      set_state(State::FAILED);
-    } else if (model_action_->get_state() == resource::Action::State::FAILED) {
-      /* If all the hosts are running the synchro didn't fail, then the synchro was canceled */
-      set_state(State::CANCELED);
-    } else {
-      set_state(State::DONE);
-    }
-
-    clean_action();
-  }
-
-  if (get_actor() != nullptr)
-    get_actor()->activities_.erase(this);
-
-  if (get_state() != State::FAILED && cb_id_ >= 0)
-    s4u::Host::on_state_change.disconnect(cb_id_);
-  /* Answer all simcalls associated with the synchro */
-  finish();
-}
-
 void ExecImpl::set_exception(actor::ActorImpl* issuer)
 {
   switch (get_state()) {
@@ -187,6 +160,28 @@ void ExecImpl::set_exception(actor::ActorImpl* issuer)
 void ExecImpl::finish()
 {
   XBT_DEBUG("ExecImpl::finish() in state %s", get_state_str());
+  if (model_action_ != nullptr) {
+    if (auto const& hosts = get_hosts();
+        std::any_of(hosts.begin(), hosts.end(), [](const s4u::Host* host) { return not host->is_on(); })) {
+      /* If one of the hosts running the synchro failed, notice it. This way, the asking
+       * process can be killed if it runs on that host itself */
+      set_state(State::FAILED);
+    } else if (model_action_->get_state() == resource::Action::State::FAILED) {
+      /* If all the hosts are running the synchro didn't fail, then the synchro was canceled */
+      set_state(State::CANCELED);
+    } else {
+      set_state(State::DONE);
+    }
+
+    clean_action();
+  }
+
+  if (get_actor() != nullptr)
+    get_actor()->activities_.erase(this);
+
+  if (get_state() != State::FAILED && cb_id_ >= 0)
+    s4u::Host::on_state_change.disconnect(cb_id_);
+
   while (not simcalls_.empty()) {
     actor::Simcall* simcall = simcalls_.front();
     simcalls_.pop_front();
index d15280c..286bf33 100644 (file)
@@ -46,7 +46,6 @@ public:
   virtual ActivityImpl* migrate(s4u::Host* to);
 
   ExecImpl* start();
-  void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
 
index c7ef218..1a7e46f 100644 (file)
@@ -95,30 +95,6 @@ IoImpl* IoImpl::start()
   return this;
 }
 
-void IoImpl::post()
-{
-  if (model_action_ != nullptr) {
-    performed_ioops_ = model_action_->get_cost();
-    if (model_action_->get_state() == resource::Action::State::FAILED) {
-      if (host_ && dst_host_) { // this is an I/O stream
-        if (not host_->is_on())
-          set_state(State::SRC_HOST_FAILURE);
-        else if (not dst_host_->is_on())
-          set_state(State::DST_HOST_FAILURE);
-      } else if ((disk_ && not disk_->is_on()) || (dst_disk_ && not dst_disk_->is_on()))
-        set_state(State::FAILED);
-      else
-        set_state(State::CANCELED);
-    } else {
-      set_state(State::DONE);
-    }
-
-    clean_action();
-  }
-
-  /* Answer all simcalls associated with the synchro */
-  finish();
-}
 void IoImpl::set_exception(actor::ActorImpl* issuer)
 {
   switch (get_state()) {
@@ -148,6 +124,25 @@ void IoImpl::set_exception(actor::ActorImpl* issuer)
 void IoImpl::finish()
 {
   XBT_DEBUG("IoImpl::finish() in state %s", get_state_str());
+  if (model_action_ != nullptr) {
+    performed_ioops_ = model_action_->get_cost();
+    if (model_action_->get_state() == resource::Action::State::FAILED) {
+      if (host_ && dst_host_) { // this is an I/O stream
+        if (not host_->is_on())
+          set_state(State::SRC_HOST_FAILURE);
+        else if (not dst_host_->is_on())
+          set_state(State::DST_HOST_FAILURE);
+      } else if ((disk_ && not disk_->is_on()) || (dst_disk_ && not dst_disk_->is_on()))
+        set_state(State::FAILED);
+      else
+        set_state(State::CANCELED);
+    } else {
+      set_state(State::DONE);
+    }
+
+    clean_action();
+  }
+
   while (not simcalls_.empty()) {
     actor::Simcall* simcall = simcalls_.front();
     simcalls_.pop_front();
index 22e46a3..6562f2e 100644 (file)
@@ -40,7 +40,6 @@ public:
   resource::DiskImpl* get_disk() const { return disk_; }
 
   IoImpl* start();
-  void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
 };
index 7bbb815..cb49f3b 100644 (file)
@@ -71,14 +71,14 @@ void MailboxImpl::remove(const CommImplPtr& comm)
 
 /** @brief Removes all communication activities from a mailbox
  */
-void MailboxImpl::clear( bool do_post )
+void MailboxImpl::clear(bool do_finish)
 {
   // CommImpl::cancel() will remove the comm from the mailbox..
   for (auto comm : done_comm_queue_) {
     comm->cancel();
     comm->set_state(State::FAILED);
-    if(do_post)
-      comm->post();
+    if (do_finish)
+      comm->finish();
   }
   done_comm_queue_.clear();
 
@@ -87,8 +87,8 @@ void MailboxImpl::clear( bool do_post )
     if (comm->get_state() == State::WAITING && not comm->is_detached()) {
       comm->cancel();
       comm->set_state(State::FAILED);
-      if(do_post)
-        comm->post();
+      if (do_finish)
+        comm->finish();
     } else
       comm_queue_.pop_back();
   }
index fb84db7..199fcd1 100644 (file)
@@ -53,7 +53,7 @@ public:
   void push(CommImplPtr comm);
   void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
   void remove(const CommImplPtr& comm);
-  void clear(bool do_post );
+  void clear(bool do_finish);
   CommImplPtr iprobe(int type, const std::function<bool(void*, void*, CommImpl*)>& match_fun, void* data);
   CommImplPtr find_matching_comm(CommImplType type, const std::function<bool(void*, void*, CommImpl*)>& match_fun,
                                  void* this_user_data, const CommImplPtr& my_synchro, bool done, bool remove_matching);
index 1c9235b..43a511b 100644 (file)
@@ -25,15 +25,11 @@ void MutexAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   this->register_simcall(&issuer_->simcall_); // Block on that acquisition
 
   if (mutex_->get_owner() == issuer_) { // I'm the owner
-    post();
+    finish();
   } else {
     // Already in the queue
   }
 }
-void MutexAcquisitionImpl::post()
-{
-  finish();
-}
 
 void MutexAcquisitionImpl::finish()
 {
@@ -99,7 +95,7 @@ void MutexImpl::unlock(actor::ActorImpl* issuer)
 
     owner_ = acq->get_issuer();
     if (acq == owner_->waiting_synchro_)
-      acq->post();
+      acq->finish();
     // else, the issuer is not blocked on this acquisition so no need to release it
 
   } else {
index 66851a6..3eebec2 100644 (file)
@@ -50,7 +50,6 @@ public:
 
   bool test(actor::ActorImpl* issuer = nullptr) override;
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
-  void post() override;
   void finish() override;
   void set_exception(actor::ActorImpl* issuer) override
   { /* nothing to do */
index 2bb6396..8cba364 100644 (file)
@@ -29,7 +29,7 @@ void SemAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout)
   this->register_simcall(&issuer_->simcall_); // Block on that acquisition
 
   if (granted_) {
-    post();
+    finish();
   } else if (timeout > 0) {
     model_action_ = get_issuer()->get_host()->get_cpu()->sleep(timeout);
     model_action_->set_activity(this);
@@ -38,10 +38,6 @@ void SemAcquisitionImpl::wait_for(actor::ActorImpl* issuer, double timeout)
     // Already in the queue
   }
 }
-void SemAcquisitionImpl::post()
-{
-  finish();
-}
 void SemAcquisitionImpl::finish()
 {
   xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size());
@@ -108,7 +104,7 @@ void SemaphoreImpl::release()
 
     acqui->granted_ = true;
     if (acqui == acqui->get_issuer()->waiting_synchro_)
-      acqui->post();
+      acqui->finish();
     // else, the issuer is not blocked on this acquisition so no need to release it
 
   } else {
index fb69aad..ebb439b 100644 (file)
@@ -35,7 +35,6 @@ public:
 
   bool test(actor::ActorImpl* issuer = nullptr) override { return granted_; }
   void wait_for(actor::ActorImpl* issuer, double timeout) override;
-  void post() override;
   void finish() override;
   void cancel() override;
   void set_exception(actor::ActorImpl* issuer) override
index 5c5b88b..6a529e8 100644 (file)
@@ -33,7 +33,11 @@ SleepImpl* SleepImpl::start()
   return this;
 }
 
-void SleepImpl::post()
+void SleepImpl::set_exception(actor::ActorImpl* issuer)
+{
+  /* FIXME: Really, nothing bad can happen while we sleep? */
+}
+void SleepImpl::finish()
 {
   if (model_action_->get_state() == resource::Action::State::FAILED) {
     if (host_ && not host_->is_on())
@@ -45,15 +49,6 @@ void SleepImpl::post()
   }
 
   clean_action();
-  /* Answer all simcalls associated with the synchro */
-  finish();
-}
-void SleepImpl::set_exception(actor::ActorImpl* issuer)
-{
-  /* FIXME: Really, nothing bad can happen while we sleep? */
-}
-void SleepImpl::finish()
-{
   XBT_DEBUG("SleepImpl::finish() in state %s", get_state_str());
   while (not simcalls_.empty()) {
     const actor::Simcall* simcall = simcalls_.front();
index eca98d2..42aed85 100644 (file)
@@ -17,7 +17,6 @@ class XBT_PUBLIC SleepImpl : public ActivityImpl_T<SleepImpl> {
 public:
   SleepImpl& set_host(s4u::Host* host);
   SleepImpl& set_duration(double duration);
-  void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
   SleepImpl* start();
index 2067f07..aa57662 100644 (file)
@@ -49,17 +49,6 @@ void SynchroImpl::cancel()
   /* I cannot cancel raw synchros directly. */
 }
 
-void SynchroImpl::post()
-{
-  if (model_action_->get_state() == resource::Action::State::FAILED)
-    set_state(State::FAILED);
-  else if (model_action_->get_state() == resource::Action::State::FINISHED)
-    set_state(State::SRC_TIMEOUT);
-
-  clean_action();
-  /* Answer all simcalls associated with the synchro */
-  finish();
-}
 void SynchroImpl::set_exception(actor::ActorImpl* issuer)
 {
   if (get_state() == State::FAILED) {
@@ -74,6 +63,13 @@ void SynchroImpl::set_exception(actor::ActorImpl* issuer)
 void SynchroImpl::finish()
 {
   XBT_DEBUG("SynchroImpl::finish() in state %s", get_state_str());
+  if (model_action_->get_state() == resource::Action::State::FAILED)
+    set_state(State::FAILED);
+  else if (model_action_->get_state() == resource::Action::State::FINISHED)
+    set_state(State::SRC_TIMEOUT);
+
+  clean_action();
+
   xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size());
   actor::Simcall* simcall = simcalls_.front();
   simcalls_.pop_front();
index de958cc..a2f8daa 100644 (file)
@@ -27,7 +27,6 @@ public:
   void suspend() override;
   void resume() override;
   void cancel() override;
-  void post() override;
   void set_exception(actor::ActorImpl* issuer) override;
   void finish() override;
 };
index 05de3b4..c2e2b48 100644 (file)
@@ -183,7 +183,7 @@ void ActorImpl::exit()
     activity::ActivityImplPtr activity = waiting_synchro_;
     activity->cancel();
     activity->set_state(activity::State::FAILED);
-    activity->post();
+    activity->finish();
 
     activities_.erase(waiting_synchro_);
     waiting_synchro_ = nullptr;
index 9ea3b10..9e76966 100644 (file)
@@ -35,7 +35,7 @@ ExecPtr Exec::init()
     if (not h.is_on() && pimpl->get_state() == kernel::activity::State::RUNNING &&
         std::find(pimpl->get_hosts().begin(), pimpl->get_hosts().end(), &h) != pimpl->get_hosts().end()) {
       pimpl->set_state(kernel::activity::State::FAILED);
-      pimpl->post();
+      pimpl->finish();
     }
   });
   pimpl->set_cb_id(cb_id);