Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Define overridable Activity::complete() to be called on activity completion.
authorArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Wed, 21 Apr 2021 07:57:56 +0000 (09:57 +0200)
committerArnaud Giersch <arnaud.giersch@univ-fcomte.fr>
Thu, 22 Apr 2021 08:07:32 +0000 (10:07 +0200)
This adds some assignments of State::FINISHED, but it looks like they were
previously forgotten.  No test is broken.

include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Io.hpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp
src/s4u/s4u_Io.cpp

index 4608f49..74ca77e 100644 (file)
@@ -32,12 +32,23 @@ class XBT_PUBLIC Activity {
   friend Exec;
   friend Io;
 
+public:
+  // enum class State { ... }
+  XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, CANCELED, FINISHED);
+
 protected:
   Activity()  = default;
   virtual ~Activity() = default;
 
   virtual bool is_assigned() const = 0;
 
+  virtual void complete(Activity::State state)
+  {
+    state_ = state;
+    if (state == State::FINISHED)
+      release_dependencies();
+  }
+
   void release_dependencies()
   {
     while (not successors_.empty()) {
@@ -91,9 +102,6 @@ public:
   Activity& operator=(Activity const&) = delete;
 #endif
 
-  // enum class State { ... }
-  XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, CANCELED, FINISHED);
-
   /** Starts a previously created activity.
    *
    * This function is optional: you can call wait() even if you didn't call start()
index 6e33e01..47668bf 100644 (file)
@@ -37,6 +37,9 @@ class XBT_PUBLIC Comm : public Activity_T<Comm> {
 
   Comm() = default;
 
+protected:
+  void complete(Activity::State state) override;
+
 public:
 #ifndef DOXYGEN
   friend Mailbox; // Factory of comms
index ae3e534..9a51dd0 100644 (file)
@@ -38,6 +38,8 @@ class XBT_PUBLIC Exec : public Activity_T<Exec> {
 protected:
   explicit Exec(kernel::activity::ExecImplPtr pimpl);
 
+  void complete(Activity::State state) override;
+
 public:
 #ifndef DOXYGEN
   Exec(Exec const&) = delete;
index 6137056..685f555 100644 (file)
@@ -25,6 +25,8 @@ class XBT_PUBLIC Io : public Activity_T<Io> {
 protected:
   explicit Io(kernel::activity::IoImplPtr pimpl);
 
+  void complete(Activity::State state) override;
+
 public:
   enum class OpType { READ, WRITE };
 
index 55bba1e..002035c 100644 (file)
@@ -34,8 +34,7 @@ bool Activity::test()
     this->vetoable_start();
 
   if (kernel::actor::simcall([this] { return this->get_impl()->test(); })) {
-    state_ = State::FINISHED;
-    this->release_dependencies();
+    complete(State::FINISHED);
     return true;
   }
 
index 23eaf0d..039dbc8 100644 (file)
@@ -19,6 +19,12 @@ namespace s4u {
 xbt::signal<void(Comm const&, bool is_sender)> Comm::on_start;
 xbt::signal<void(Comm const&)> Comm::on_completion;
 
+void Comm::complete(Activity::State state)
+{
+  Activity::complete(state);
+  on_completion(*this);
+}
+
 Comm::~Comm()
 {
   if (state_ == State::STARTED && not detached_ &&
@@ -38,10 +44,8 @@ int Comm::wait_any_for(const std::vector<CommPtr>* comms, double timeout)
   std::transform(begin(*comms), end(*comms), begin(rcomms),
                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
   int changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
-  if (changed_pos != -1) {
-    on_completion(*(comms->at(changed_pos)));
-    comms->at(changed_pos)->release_dependencies();
-  }
+  if (changed_pos != -1)
+    comms->at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
 }
 
@@ -210,14 +214,10 @@ Comm* Comm::wait_for(double timeout)
         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
                           get_user_data(), timeout, rate_);
       }
-      state_ = State::FINISHED;
-      this->release_dependencies();
       break;
 
     case State::STARTED:
       simcall_comm_wait(get_impl(), timeout);
-      state_ = State::FINISHED;
-      this->release_dependencies();
       break;
 
     case State::CANCELED:
@@ -226,7 +226,7 @@ Comm* Comm::wait_for(double timeout)
     default:
       THROW_IMPOSSIBLE;
   }
-  on_completion(*this);
+  complete(State::FINISHED);
   return this;
 }
 
@@ -237,7 +237,7 @@ int Comm::test_any(const std::vector<CommPtr>* comms)
                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
   int changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
   if (changed_pos != -1)
-    comms->at(changed_pos)->release_dependencies();
+    comms->at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
 }
 
@@ -257,7 +257,7 @@ Comm* Comm::cancel()
     if (pimpl_)
       boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
   });
-  state_ = State::CANCELED;
+  complete(State::CANCELED);
   return this;
 }
 
@@ -273,8 +273,7 @@ bool Comm::test()
     this->vetoable_start();
 
   if (simcall_comm_test(get_impl())) {
-    state_ = State::FINISHED;
-    this->release_dependencies();
+    complete(State::FINISHED);
     return true;
   }
   return false;
index 192b8d2..324e2ab 100644 (file)
@@ -24,6 +24,12 @@ Exec::Exec(kernel::activity::ExecImplPtr pimpl)
   pimpl_ = pimpl;
 }
 
+void Exec::complete(Activity::State state)
+{
+  Activity::complete(state);
+  on_completion(*this);
+}
+
 ExecPtr Exec::init()
 {
   auto pimpl = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl());
@@ -60,9 +66,7 @@ Exec* Exec::wait_for(double timeout)
 
   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
   kernel::actor::simcall_blocking([this, issuer, timeout] { this->get_impl()->wait_for(issuer, timeout); });
-  state_ = State::FINISHED;
-  on_completion(*this);
-  this->release_dependencies();
+  complete(State::FINISHED);
   return this;
 }
 
@@ -79,18 +83,15 @@ int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
         kernel::activity::ExecImpl::wait_any_for(observer.get_issuer(), observer.get_execs(), observer.get_timeout());
       },
       &observer);
-  if (changed_pos != -1) {
-    on_completion(*(execs->at(changed_pos)));
-    execs->at(changed_pos)->release_dependencies();
-  }
+  if (changed_pos != -1)
+    execs->at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
 }
 
 Exec* Exec::cancel()
 {
   kernel::actor::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
-  state_ = State::CANCELED;
-  on_completion(*this);
+  complete(State::CANCELED);
   return this;
 }
 
index 8af99e8..570070d 100644 (file)
@@ -20,6 +20,12 @@ Io::Io(kernel::activity::IoImplPtr pimpl)
   pimpl_ = pimpl;
 }
 
+void Io::complete(Activity::State state)
+{
+  Activity::complete(state);
+  on_completion(*this);
+}
+
 IoPtr Io::init()
 {
   auto pimpl = kernel::activity::IoImplPtr(new kernel::activity::IoImpl());
@@ -42,8 +48,7 @@ Io* Io::start()
 Io* Io::cancel()
 {
   kernel::actor::simcall([this] { boost::static_pointer_cast<kernel::activity::IoImpl>(pimpl_)->cancel(); });
-  state_ = State::CANCELED;
-  on_completion(*this);
+  complete(State::CANCELED);
   return this;
 }
 
@@ -59,10 +64,7 @@ Io* Io::wait_for(double timeout)
 
   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
   kernel::actor::simcall_blocking([this, issuer, timeout] { this->get_impl()->wait_for(issuer, timeout); });
-  state_ = State::FINISHED;
-  this->release_dependencies();
-
-  on_completion(*this);
+  complete(state_ = State::FINISHED);
   return this;
 }