From 6c98d311c94b09a185a36f90f7a4a56a6a70298a Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Fri, 13 Dec 2019 10:46:54 +0100 Subject: [PATCH] add waitfor of Io too --- ChangeLog | 1 + examples/s4u/io-async/s4u-io-async.cpp | 17 +++++++++++ examples/s4u/io-async/s4u-io-async.tesh | 11 ++++--- include/simgrid/simix.h | 2 +- src/kernel/activity/IoImpl.cpp | 39 +++++++++++++++++++++++-- src/kernel/activity/IoImpl.hpp | 3 ++ src/s4u/s4u_Io.cpp | 10 +++---- src/simix/libsmx.cpp | 4 +-- src/simix/popping_accessors.hpp | 14 ++++++++- src/simix/popping_bodies.cpp | 6 ++-- src/simix/popping_generated.cpp | 2 +- src/simix/simcalls.in | 2 +- 12 files changed, 90 insertions(+), 21 deletions(-) diff --git a/ChangeLog b/ChangeLog index 89cc700996..b8acaf6e51 100644 --- a/ChangeLog +++ b/ChangeLog @@ -7,6 +7,7 @@ S4U: - Actor: Rename migrate() into set_host() - Disk: Allow users to get the read and write nominal bandwidth values - Exec: Implement wait_for(timeout) +- Io: Implement wait_for(timeout) XML: - Parse errors now raise a simgrid::ParseError that you may want to catch. diff --git a/examples/s4u/io-async/s4u-io-async.cpp b/examples/s4u/io-async/s4u-io-async.cpp index 597284fcc9..8819f759aa 100644 --- a/examples/s4u/io-async/s4u-io-async.cpp +++ b/examples/s4u/io-async/s4u-io-async.cpp @@ -19,9 +19,25 @@ static void test(sg_size_t size) XBT_INFO("Goodbye now!"); } +static void test_waitfor(sg_size_t size) +{ + simgrid::s4u::Disk* disk = simgrid::s4u::Host::current()->get_disks().front(); + XBT_INFO("Hello! write %llu bytes from %s", size, disk->get_cname()); + + simgrid::s4u::IoPtr activity = disk->write_async(size); + try { + activity->wait_for(0.5); + } catch (simgrid::TimeoutException&) { + XBT_INFO("Asynchronous write: Timeout!"); + } + + XBT_INFO("Goodbye now!"); +} + static void test_cancel(sg_size_t size) { simgrid::s4u::Disk* disk = simgrid::s4u::Host::current()->get_disks().front(); + simgrid::s4u::this_actor::sleep_for(0.5); XBT_INFO("Hello! write %llu bytes from %s", size, disk->get_cname()); simgrid::s4u::IoPtr activity = disk->write_async(size); @@ -37,6 +53,7 @@ int main(int argc, char* argv[]) simgrid::s4u::Engine e(&argc, argv); e.load_platform(argv[1]); simgrid::s4u::Actor::create("test", simgrid::s4u::Host::by_name("bob"), test, 2e7); + simgrid::s4u::Actor::create("test_waitfor", simgrid::s4u::Host::by_name("alice"), test_waitfor, 5e7); simgrid::s4u::Actor::create("test_cancel", simgrid::s4u::Host::by_name("alice"), test_cancel, 5e7); e.run(); diff --git a/examples/s4u/io-async/s4u-io-async.tesh b/examples/s4u/io-async/s4u-io-async.tesh index 0ff50eb845..3b6c006543 100644 --- a/examples/s4u/io-async/s4u-io-async.tesh +++ b/examples/s4u/io-async/s4u-io-async.tesh @@ -2,8 +2,11 @@ $ ${bindir:=.}/s4u-io-async ${platfdir}/hosts_with_disks.xml "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" > [ 0.000000] (1:test@bob) Hello! read 20000000 bytes from Disk1 -> [ 0.000000] (2:test_cancel@alice) Hello! write 50000000 bytes from Disk1 +> [ 0.000000] (2:test_waitfor@alice) Hello! write 50000000 bytes from Disk1 > [ 0.200000] (1:test@bob) Goodbye now! -> [ 0.500000] (2:test_cancel@alice) I changed my mind, cancel! -> [ 0.500000] (2:test_cancel@alice) Goodbye now! -> [ 0.500000] (0:maestro@) Simulation time 0.5 +> [ 0.500000] (2:test_waitfor@alice) Asynchronous write: Timeout! +> [ 0.500000] (2:test_waitfor@alice) Goodbye now! +> [ 0.500000] (3:test_cancel@alice) Hello! write 50000000 bytes from Disk1 +> [ 1.000000] (3:test_cancel@alice) I changed my mind, cancel! +> [ 1.000000] (3:test_cancel@alice) Goodbye now! +> [ 1.000000] (0:maestro@) Simulation time 1 diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 2e69f6ab94..cf9e3e0afa 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -191,7 +191,7 @@ SG_END_DECL /***************************** Io **************************************/ #ifdef __cplusplus -XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io); +XBT_PUBLIC e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout); #endif /************************** MC simcalls **********************************/ SG_BEGIN_DECL diff --git a/src/kernel/activity/IoImpl.cpp b/src/kernel/activity/IoImpl.cpp index 783616f9b8..338999b786 100644 --- a/src/kernel/activity/IoImpl.cpp +++ b/src/kernel/activity/IoImpl.cpp @@ -11,22 +11,43 @@ #include "src/mc/mc_replay.hpp" #include "src/simix/smx_private.hpp" #include "src/surf/StorageImpl.hpp" +#include "src/surf/cpu_interface.hpp" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)"); -void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro) +void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* synchro, double timeout) { XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_); /* Associate this simcall to the synchro */ synchro->register_simcall(simcall); - if (MC_is_active() || MC_record_replay_is_active()) - synchro->state_ = simgrid::kernel::activity::State::DONE; + if (MC_is_active() || MC_record_replay_is_active()) { + int idx = SIMCALL_GET_MC_VALUE(*simcall); + if (idx == 0) { + synchro->state_ = simgrid::kernel::activity::State::DONE; + } else { + /* If we reached this point, the wait simcall must have a timeout */ + /* Otherwise it shouldn't be enabled and executed by the MC */ + if (timeout < 0.0) + THROW_IMPOSSIBLE; + synchro->state_ = simgrid::kernel::activity::State::TIMEOUT; + } + synchro->finish(); + } /* If the synchro is already finished then perform the error handling */ if (synchro->state_ != simgrid::kernel::activity::State::RUNNING) synchro->finish(); + else { + /* we need a sleep action (even when there is no timeout) to be notified of host failures */ + if (synchro->get_disk() != nullptr) + synchro->timeout_detector_ = synchro->get_disk()->get_host()->pimpl_cpu->sleep(timeout); + else + synchro->timeout_detector_ = + simgrid::s4u::Host::by_name(synchro->get_storage()->get_host())->pimpl_cpu->sleep(timeout); + synchro->timeout_detector_->set_activity(synchro); + } } namespace simgrid { @@ -82,7 +103,15 @@ void IoImpl::post() state_ = State::CANCELED; } else if (surf_action_->get_state() == resource::Action::State::FINISHED) { state_ = State::DONE; + } else if (timeout_detector_ && timeout_detector_->get_state() == resource::Action::State::FINISHED) { + state_ = State::TIMEOUT; } + + if (timeout_detector_) { + timeout_detector_->unref(); + timeout_detector_ = nullptr; + } + on_completion(*this); /* Answer all simcalls associated with the synchro */ @@ -106,6 +135,10 @@ void IoImpl::finish() case State::CANCELED: simcall->issuer_->exception_ = std::make_exception_ptr(CancelException(XBT_THROW_POINT, "I/O Canceled")); break; + case State::TIMEOUT: + XBT_DEBUG("IoImpl::finish(): execution timeouted"); + simcall->issuer_->exception_ = std::make_exception_ptr(simgrid::TimeoutException(XBT_THROW_POINT, "Timeouted")); + break; default: xbt_die("Internal error in IoImpl::finish(): unexpected synchro state %d", static_cast(state_)); } diff --git a/src/kernel/activity/IoImpl.hpp b/src/kernel/activity/IoImpl.hpp index cbc2f0d185..aaa715dad2 100644 --- a/src/kernel/activity/IoImpl.hpp +++ b/src/kernel/activity/IoImpl.hpp @@ -28,11 +28,14 @@ public: IoImpl& set_disk(resource::DiskImpl* disk); sg_size_t get_performed_ioops() { return performed_ioops_; } + resource::DiskImpl* get_disk() { return disk_; } + resource::StorageImpl* get_storage() { return storage_; } IoImpl* start(); void post() override; void finish() override; + resource::Action* timeout_detector_ = nullptr; static xbt::signal on_start; static xbt::signal on_completion; }; diff --git a/src/s4u/s4u_Io.cpp b/src/s4u/s4u_Io.cpp index cca68b0156..d02bce14a7 100644 --- a/src/s4u/s4u_Io.cpp +++ b/src/s4u/s4u_Io.cpp @@ -56,14 +56,14 @@ Io* Io::cancel() Io* Io::wait() { - simcall_io_wait(pimpl_); - state_ = State::FINISHED; - return this; + return this->wait_for(-1); } -Io* Io::wait_for(double) +Io* Io::wait_for(double timeout) { - THROW_UNIMPLEMENTED; + simcall_io_wait(pimpl_, timeout); + state_ = State::FINISHED; + return this; } bool Io::test() diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 3616ba05e1..5cc3f8c8e5 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -303,9 +303,9 @@ int simcall_sem_acquire_timeout(smx_sem_t sem, double timeout) return simcall_BODY_sem_acquire_timeout(sem, timeout); } -e_smx_state_t simcall_io_wait(const smx_activity_t& io) +e_smx_state_t simcall_io_wait(const smx_activity_t& io, double timeout) { - return (e_smx_state_t)simcall_BODY_io_wait(static_cast(io.get())); + return (e_smx_state_t)simcall_BODY_io_wait(static_cast(io.get()), timeout); } void simcall_run_kernel(std::function const& code, simgrid::mc::SimcallInspector* t) diff --git a/src/simix/popping_accessors.hpp b/src/simix/popping_accessors.hpp index 96328c22e2..a3c8dd01a6 100644 --- a/src/simix/popping_accessors.hpp +++ b/src/simix/popping_accessors.hpp @@ -933,6 +933,18 @@ static inline void simcall_io_wait__set__io(smx_simcall_t simcall, simgrid::kern { simgrid::simix::marshal(simcall->args_[0], arg); } +static inline double simcall_io_wait__get__timeout(smx_simcall_t simcall) +{ + return simgrid::simix::unmarshal(simcall->args_[1]); +} +static inline double simcall_io_wait__getraw__timeout(smx_simcall_t simcall) +{ + return simgrid::simix::unmarshal_raw(simcall->args_[1]); +} +static inline void simcall_io_wait__set__timeout(smx_simcall_t simcall, double arg) +{ + simgrid::simix::marshal(simcall->args_[1], arg); +} static inline sg_size_t simcall_io_wait__get__result(smx_simcall_t simcall) { return simgrid::simix::unmarshal(simcall->result_); @@ -1029,5 +1041,5 @@ XBT_PRIVATE void simcall_HANDLER_cond_wait(smx_simcall_t simcall, smx_cond_t con XBT_PRIVATE void simcall_HANDLER_cond_wait_timeout(smx_simcall_t simcall, smx_cond_t cond, smx_mutex_t mutex, double timeout); XBT_PRIVATE void simcall_HANDLER_sem_acquire(smx_simcall_t simcall, smx_sem_t sem); XBT_PRIVATE void simcall_HANDLER_sem_acquire_timeout(smx_simcall_t simcall, smx_sem_t sem, double timeout); -XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io); +XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io, double timeout); XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max); diff --git a/src/simix/popping_bodies.cpp b/src/simix/popping_bodies.cpp index 4462018649..356a7b7c30 100644 --- a/src/simix/popping_bodies.cpp +++ b/src/simix/popping_bodies.cpp @@ -165,11 +165,11 @@ inline static int simcall_BODY_sem_acquire_timeout(smx_sem_t sem, double timeout return simcall(SIMCALL_SEM_ACQUIRE_TIMEOUT, sem, timeout); } -inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io) +inline static sg_size_t simcall_BODY_io_wait(simgrid::kernel::activity::IoImpl* io, double timeout) { if (0) /* Go to that function to follow the code flow through the simcall barrier */ - simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io); - return simcall(SIMCALL_IO_WAIT, io); + simcall_HANDLER_io_wait(&SIMIX_process_self()->simcall, io, timeout); + return simcall(SIMCALL_IO_WAIT, io, timeout); } inline static int simcall_BODY_mc_random(int min, int max) diff --git a/src/simix/popping_generated.cpp b/src/simix/popping_generated.cpp index df1deab123..d6d25a0277 100644 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@ -138,7 +138,7 @@ void simgrid::kernel::actor::ActorImpl::simcall_handle(int value) { break; case SIMCALL_IO_WAIT: - simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal(simcall.args_[0])); + simcall_HANDLER_io_wait(&simcall, simgrid::simix::unmarshal(simcall.args_[0]), simgrid::simix::unmarshal(simcall.args_[1])); break; case SIMCALL_MC_RANDOM: diff --git a/src/simix/simcalls.in b/src/simix/simcalls.in index bf2371e0ec..f0ebde69fc 100644 --- a/src/simix/simcalls.in +++ b/src/simix/simcalls.in @@ -58,7 +58,7 @@ int cond_wait_timeout(smx_cond_t cond, smx_mutex_t mutex, double timeout) void sem_acquire(smx_sem_t sem) [[block]]; int sem_acquire_timeout(smx_sem_t sem, double timeout) [[block]]; -sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io) [[block]]; +sg_size_t io_wait(simgrid::kernel::activity::IoImpl* io, double timeout) [[block]]; int mc_random(int min, int max); -- 2.20.1