static void test()
{
+ std::vector<simgrid::s4u::IoPtr> pending_ios;
+
simgrid::s4u::ExecPtr bob_compute = simgrid::s4u::this_actor::exec_init(1e9);
simgrid::s4u::IoPtr bob_write =
simgrid::s4u::Host::current()->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::WRITE);
+ pending_ios.push_back(bob_write);
simgrid::s4u::IoPtr carl_read =
simgrid::s4u::Host::by_name("carl")->get_disks().front()->io_init(4000000, simgrid::s4u::Io::OpType::READ);
+ pending_ios.push_back(carl_read);
simgrid::s4u::ExecPtr carl_compute = simgrid::s4u::Host::by_name("carl")->exec_init(1e9);
// Name the activities (for logging purposes only)
carl_read->vetoable_start();
carl_compute->vetoable_start();
- // Wait for their completion (should be replaced by a wait_any_for at some point)
+ // wait for the completion of all activities
bob_compute->wait();
- bob_write->wait();
- carl_read->wait();
+ while (not pending_ios.empty()) {
+ int changed_pos = simgrid::s4u::Io::wait_any(&pending_ios);
+ XBT_INFO("Io '%s' is complete", pending_ios[changed_pos]->get_cname());
+ pending_ios.erase(pending_ios.begin() + changed_pos);
+ }
carl_compute->wait();
}
> [ 1.000000] (1:bob@bob) 'bob write' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.000000] (1:bob@bob) Remove a dependency from 'bob compute' on 'bob write'
> [ 1.100000] (1:bob@bob) 'carl read' is assigned to a resource and all dependencies are solved. Let's start
+> [ 1.100000] (1:bob@bob) Io 'bob write' is complete
> [ 1.100000] (1:bob@bob) Remove a dependency from 'bob write' on 'carl read'
+> [ 1.140000] (1:bob@bob) Io 'carl read' is complete
> [ 1.140000] (1:bob@bob) 'carl compute' is assigned to a resource and all dependencies are solved. Let's start
> [ 1.140000] (1:bob@bob) Remove a dependency from 'carl read' on 'carl compute'
> [ 2.140000] (0:maestro@) Simulation time 2.14
static IoPtr init();
Io* start() override;
+ /*! take a vector of s4u::IoPtr and return when one of them is finished.
+ * The return value is the rank of the first finished IoPtr. */
+ static int wait_any(std::vector<IoPtr>* ios) { return wait_any_for(ios, -1); }
+ /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+ static int wait_any_for(std::vector<IoPtr>* ios, double timeout);
double get_remaining() const override;
sg_size_t get_performed_ioops() const;
#include "simgrid/kernel/resource/Action.hpp"
#include "simgrid/s4u/Host.hpp"
#include "simgrid/s4u/Io.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
#include "src/kernel/resource/DiskImpl.hpp"
#include "src/mc/mc_replay.hpp"
#include "src/simix/smx_private.hpp"
{
XBT_DEBUG("IoImpl::finish() in state %s", to_c_str(state_));
while (not simcalls_.empty()) {
- const s_smx_simcall* simcall = simcalls_.front();
+ smx_simcall_t simcall = simcalls_.front();
simcalls_.pop_front();
+
+ /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+ * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+ * simcall */
+
+ if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
+ if (auto* observer = dynamic_cast<kernel::actor::IoWaitanySimcall*>(simcall->observer_)) { // simcall is a wait_any?
+ const auto& ios = observer->get_ios();
+
+ for (auto* io : ios) {
+ io->unregister_simcall(simcall);
+
+ if (simcall->timeout_cb_) {
+ simcall->timeout_cb_->remove();
+ simcall->timeout_cb_ = nullptr;
+ }
+ }
+
+ if (not MC_is_active() && not MC_record_replay_is_active()) {
+ auto element = std::find(ios.begin(), ios.end(), this);
+ int rank = element != ios.end() ? static_cast<int>(std::distance(ios.begin(), element)) : -1;
+ observer->set_result(rank);
+ }
+ }
+
switch (state_) {
case State::FAILED:
simcall->issuer_->context_->set_wannadie();
}
}
+void IoImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout)
+{
+ if (timeout < 0.0) {
+ issuer->simcall_.timeout_cb_ = nullptr;
+ } else {
+ issuer->simcall_.timeout_cb_ = simix::Timer::set(SIMIX_get_clock() + timeout, [issuer, &ios]() {
+ issuer->simcall_.timeout_cb_ = nullptr;
+ for (auto* io : ios)
+ io->unregister_simcall(&issuer->simcall_);
+ // default result (-1) is set in actor::IoWaitanySimcall
+ issuer->simcall_answer();
+ });
+ }
+
+ for (auto* io : ios) {
+ /* associate this simcall to the the synchro */
+ io->simcalls_.push_back(&issuer->simcall_);
+
+ /* see if the synchro is already finished */
+ if (io->state_ != State::WAITING && io->state_ != State::RUNNING) {
+ io->finish();
+ break;
+ }
+ }
+}
+
} // namespace activity
} // namespace kernel
} // namespace simgrid
IoImpl* start();
void post() override;
void finish() override;
+ static void wait_any_for(actor::ActorImpl* issuer, const std::vector<IoImpl*>& ios, double timeout);
};
} // namespace activity
} // namespace kernel
{
return SimcallObserver::dot_label() + "Execution WAITANY";
}
+
+std::string IoWaitanySimcall::to_string(int times_considered) const
+{
+ std::string res = SimcallObserver::to_string(times_considered) + "I/O WAITANY";
+ res += "(" + (timeout_ == -1.0 ? "" : std::to_string(timeout_)) + ")";
+ return res;
+}
+
+std::string IoWaitanySimcall::dot_label() const
+{
+ return SimcallObserver::dot_label() + "I/O WAITANY";
+}
+
} // namespace actor
} // namespace kernel
} // namespace simgrid
const std::vector<activity::ExecImpl*>& get_execs() const { return execs_; }
double get_timeout() const { return timeout_; }
};
+
+class IoWaitanySimcall : public ResultingSimcall<int> {
+ const std::vector<activity::IoImpl*>& ios_;
+ const double timeout_;
+
+public:
+ IoWaitanySimcall(smx_actor_t actor, const std::vector<activity::IoImpl*>& ios, double timeout)
+ : ResultingSimcall(actor, -1), ios_(ios), timeout_(timeout)
+ {
+ }
+ bool is_visible() const override { return false; }
+ std::string to_string(int times_considered) const override;
+ std::string dot_label() const override;
+ const std::vector<activity::IoImpl*>& get_ios() const { return ios_; }
+ double get_timeout() const { return timeout_; }
+};
} // namespace actor
} // namespace kernel
} // namespace simgrid
#include "simgrid/s4u/Io.hpp"
#include "src/kernel/activity/IoImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
#include "xbt/log.h"
namespace simgrid {
return this;
}
+int Io::wait_any_for(std::vector<IoPtr>* ios, double timeout)
+{
+ std::vector<kernel::activity::IoImpl*> rios(ios->size());
+ std::transform(begin(*ios), end(*ios), begin(rios),
+ [](const IoPtr& io) { return static_cast<kernel::activity::IoImpl*>(io->pimpl_.get()); });
+
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::IoWaitanySimcall observer{issuer, rios, timeout};
+ int changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::IoImpl::wait_any_for(observer.get_issuer(), observer.get_ios(), observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos != -1)
+ ios->at(changed_pos)->complete(State::FINISHED);
+ return changed_pos;
+}
+
IoPtr Io::set_disk(const_sg_disk_t disk)
{
xbt_assert(state_ == State::INITED || state_ == State::STARTING, "Cannot set disk once the Io is started");