Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add a (modern) wait_any for Io activities. Thx agier!
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 27 Apr 2021 13:39:39 +0000 (15:39 +0200)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Tue, 27 Apr 2021 13:39:39 +0000 (15:39 +0200)
examples/cpp/io-dependent/s4u-io-dependent.cpp
examples/cpp/io-dependent/s4u-io-dependent.tesh
include/simgrid/s4u/Io.hpp
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/kernel/actor/SimcallObserver.cpp
src/kernel/actor/SimcallObserver.hpp
src/s4u/s4u_Io.cpp

index 5b296f3..62fce80 100644 (file)
@@ -11,11 +11,15 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example")
 
 static void test()
 {
+  std::vector<simgrid::s4u::IoPtr> pending_ios;
+
   simgrid::s4u::ExecPtr bob_compute = simgrid::s4u::this_actor::exec_init(1e9);
   simgrid::s4u::IoPtr bob_write =
       simgrid::s4u::Host::current()->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::WRITE);
+  pending_ios.push_back(bob_write);
   simgrid::s4u::IoPtr carl_read =
       simgrid::s4u::Host::by_name("carl")->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::READ);
+  pending_ios.push_back(carl_read);
   simgrid::s4u::ExecPtr carl_compute = simgrid::s4u::Host::by_name("carl")->exec_init(1e9);
 
   // Name the activities (for logging purposes only)
@@ -38,10 +42,13 @@ static void test()
   carl_read->vetoable_start();
   carl_compute->vetoable_start();
 
-  // Wait for their completion (should be replaced by a wait_any_for at some point)
+  // wait for the completion of all activities
   bob_compute->wait();
-  bob_write->wait();
-  carl_read->wait();
+  while (not pending_ios.empty()) {
+    int changed_pos = simgrid::s4u::Io::wait_any(&pending_ios);
+    XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname());
+    pending_ios.erase(pending_ios.begin() + changed_pos);
+  }
   carl_compute->wait();
 }
 
index ba01e1d..c9b9ecc 100644 (file)
@@ -5,7 +5,9 @@ $ ${bindir:=.}/s4u-io-dependent ${platfdir}/hosts_with_disks.xml --log=s4u_activ
 > [  1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start
 > [  1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write'
 > [  1.100000] (1:bob@bob) 'carl read' is assigned to a resource and all dependencies are solved. Let's start
+> [  1.100000] (1:bob@bob) Io 'bob write' is complete
 > [  1.100000] (1:bob@bob) Remove a dependency from 'bob write' on 'carl read'
+> [  1.140000] (1:bob@bob) Io 'carl read' is complete
 > [  1.140000] (1:bob@bob) 'carl compute' is assigned to a resource and all dependencies are solved. Let's start
 > [  1.140000] (1:bob@bob) Remove a dependency from 'carl read' on 'carl compute'
 > [  2.140000] (0:maestro@) Simulation time 2.14
index 5752373..cb08c3d 100644 (file)
@@ -35,6 +35,11 @@ public:
 
   static IoPtr init();
   Io* start() override;
+  /*! take a vector of s4u::IoPtr and return when one of them is finished.
+   * The return value is the rank of the first finished IoPtr. */
+  static int wait_any(std::vector<IoPtr>* ios) { return wait_any_for(ios, -1); }
+  /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+  static int wait_any_for(std::vector<IoPtr>* ios, double timeout);
 
   double get_remaining() const override;
   sg_size_t get_performed_ioops() const;
index 527aa19..425ae10 100644 (file)
@@ -8,6 +8,7 @@
 #include "simgrid/kernel/resource/Action.hpp"
 #include "simgrid/s4u/Host.hpp"
 #include "simgrid/s4u/Io.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
 #include "src/kernel/resource/DiskImpl.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/simix/smx_private.hpp"
@@ -94,8 +95,34 @@ void IoImpl::finish()
 {
   XBT_DEBUG("IoImpl::finish() in state %s", to_c_str(state_));
   while (not simcalls_.empty()) {
-    const s_smx_simcall* simcall = simcalls_.front();
+    smx_simcall_t simcall = simcalls_.front();
     simcalls_.pop_front();
+
+    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+     * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+     * simcall */
+
+    if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
+      continue;                                 // if process handling comm is killed
+    if (auto* observer = dynamic_cast<kernel::actor::IoWaitanySimcall*>(simcall->observer_)) { // simcall is a wait_any?
+      const auto& ios = observer->get_ios();
+
+      for (auto* io : ios) {
+        io->unregister_simcall(simcall);
+
+        if (simcall->timeout_cb_) {
+          simcall->timeout_cb_->remove();
+          simcall->timeout_cb_ = nullptr;
+        }
+      }
+
+      if (not MC_is_active() && not MC_record_replay_is_active()) {
+        auto element = std::find(ios.begin(), ios.end(), this);
+        int rank     = element != ios.end() ? static_cast<int>(std::distance(ios.begin(), element)) : -1;
+        observer->set_result(rank);
+      }
+    }
+
     switch (state_) {
       case State::FAILED:
         simcall->issuer_->context_->set_wannadie();
@@ -118,6 +145,32 @@ void IoImpl::finish()
   }
 }
 
+void IoImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout)
+{
+  if (timeout < 0.0) {
+    issuer->simcall_.timeout_cb_ = nullptr;
+  } else {
+    issuer->simcall_.timeout_cb_ = simix::Timer::set(SIMIX_get_clock() + timeout, [issuer, &ios]() {
+      issuer->simcall_.timeout_cb_ = nullptr;
+      for (auto* io : ios)
+        io->unregister_simcall(&issuer->simcall_);
+      // default result (-1) is set in actor::IoWaitanySimcall
+      issuer->simcall_answer();
+    });
+  }
+
+  for (auto* io : ios) {
+    /* associate this simcall to the the synchro */
+    io->simcalls_.push_back(&issuer->simcall_);
+
+    /* see if the synchro is already finished */
+    if (io->state_ != State::WAITING && io->state_ != State::RUNNING) {
+      io->finish();
+      break;
+    }
+  }
+}
+
 } // namespace activity
 } // namespace kernel
 } // namespace simgrid
index fc4df57..758d9ec 100644 (file)
@@ -37,6 +37,7 @@ public:
   IoImpl* start();
   void post() override;
   void finish() override;
+  static void wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout);
 };
 } // namespace activity
 } // namespace kernel
index bb763ef..11e9e2c 100644 (file)
@@ -132,6 +132,19 @@ std::string ExecutionWaitanySimcall::dot_label() const
 {
   return SimcallObserver::dot_label() + "Execution WAITANY";
 }
+
+std::string IoWaitanySimcall::to_string(int times_considered) const
+{
+  std::string res = SimcallObserver::to_string(times_considered) + "I/O WAITANY";
+  res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+  return res;
+}
+
+std::string IoWaitanySimcall::dot_label() const
+{
+  return SimcallObserver::dot_label() + "I/O WAITANY";
+}
+
 } // namespace actor
 } // namespace kernel
 } // namespace simgrid
index d386a30..a1a5936 100644 (file)
@@ -153,6 +153,22 @@ public:
   const std::vector<activity::ExecImpl*>& get_execs() const { return execs_; }
   double get_timeout() const { return timeout_; }
 };
+
+class IoWaitanySimcall : public ResultingSimcall<int> {
+  const std::vector<activity::IoImpl*>& ios_;
+  const double timeout_;
+
+public:
+  IoWaitanySimcall(smx_actor_t actor, const std::vector<activity::IoImpl*>& ios, double timeout)
+      : ResultingSimcall(actor, -1), ios_(ios), timeout_(timeout)
+  {
+  }
+  bool is_visible() const override { return false; }
+  std::string to_string(int times_considered) const override;
+  std::string dot_label() const override;
+  const std::vector<activity::IoImpl*>& get_ios() const { return ios_; }
+  double get_timeout() const { return timeout_; }
+};
 } // namespace actor
 } // namespace kernel
 } // namespace simgrid
index 733b481..ef4846c 100644 (file)
@@ -8,6 +8,7 @@
 #include "simgrid/s4u/Io.hpp"
 #include "src/kernel/activity/IoImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
 #include "xbt/log.h"
 
 namespace simgrid {
@@ -45,6 +46,24 @@ Io* Io::start()
   return this;
 }
 
+int Io::wait_any_for(std::vector<IoPtr>* ios, double timeout)
+{
+  std::vector<kernel::activity::IoImpl*> rios(ios->size());
+  std::transform(begin(*ios), end(*ios), begin(rios),
+                 [](const IoPtr& io) { return static_cast<kernel::activity::IoImpl*>(io->pimpl_.get()); });
+
+  kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+  kernel::actor::IoWaitanySimcall observer{issuer, rios, timeout};
+  int changed_pos = kernel::actor::simcall_blocking(
+      [&observer] {
+        kernel::activity::IoImpl::wait_any_for(observer.get_issuer(), observer.get_ios(), observer.get_timeout());
+      },
+      &observer);
+  if (changed_pos != -1)
+    ios->at(changed_pos)->complete(State::FINISHED);
+  return changed_pos;
+}
+
 IoPtr Io::set_disk(const_sg_disk_t disk)
 {
   xbt_assert(state_ == State::INITED || state_ == State::STARTING, "Cannot set disk once the Io is started");