From: Frederic Suter Date: Tue, 31 Jul 2018 19:46:08 +0000 (+0200) Subject: Implement I/O as asynchronous activities X-Git-Tag: v3_21~302^2^2~6 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/939476f1fc7630eb237535b07ed0deef77ce1b24 Implement I/O as asynchronous activities --- diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 19e0b5b3fe..0d07099a0c 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -31,6 +31,10 @@ using ExecPtr = boost::intrusive_ptr; XBT_PUBLIC void intrusive_ptr_release(Exec* e); XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e); class Host; +class Io; +using IoPtr = boost::intrusive_ptr; +XBT_PUBLIC void intrusive_ptr_release(Io* i); +XBT_PUBLIC void intrusive_ptr_add_ref(Io* i); class Link; class Mailbox; using MailboxPtr = boost::intrusive_ptr; diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 7e53d03c78..615f7bb3ba 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -33,6 +33,9 @@ class XBT_PUBLIC Activity { friend Exec; friend XBT_PUBLIC void intrusive_ptr_release(Exec * e); friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec * e); + friend Io; + friend XBT_PUBLIC void intrusive_ptr_release(Io* i); + friend XBT_PUBLIC void intrusive_ptr_add_ref(Io* i); protected: Activity() = default; diff --git a/include/simgrid/s4u/Io.hpp b/include/simgrid/s4u/Io.hpp new file mode 100644 index 0000000000..6e3df222a4 --- /dev/null +++ b/include/simgrid/s4u/Io.hpp @@ -0,0 +1,41 @@ +/* Copyright (c) 2017-2018. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#ifndef SIMGRID_S4U_IO_HPP +#define SIMGRID_S4U_IO_HPP + +#include +#include + +#include + +namespace simgrid { +namespace s4u { +/** I/O activity of the S4U interface. The constructor is private and Storage is a friend; Storage::io_init() is what creates instances. NOTE(review): start(), cancel() and get_remaining() are defined in s4u_Io.cpp, but no definition of wait() or wait(double) is visible in this commit; confirm they are provided. */ +class XBT_PUBLIC Io : public Activity { + Io() : Activity() {} +public: + friend XBT_PUBLIC void intrusive_ptr_release(simgrid::s4u::Io* i); + friend XBT_PUBLIC void intrusive_ptr_add_ref(simgrid::s4u::Io* i); + friend Storage; // Factory of IOs + + ~Io() = default; + + Activity* start() override; + Activity* wait() override; + Activity* wait(double timeout) override; + Activity* cancel() override; + + double get_remaining() override; + +private: + sg_size_t size_ = 0; /* handed to simcall_io_start() by start() */ + sg_storage_t storage_ = nullptr; /* handed to simcall_io_start() by start() */ + std::atomic_int_fast32_t refcount_{0}; /* updated by intrusive_ptr_add_ref() and intrusive_ptr_release() */ +}; // class +} +}; // Namespace simgrid::s4u + +#endif /* SIMGRID_S4U_IO_HPP */ diff --git a/include/simgrid/s4u/Storage.hpp b/include/simgrid/s4u/Storage.hpp index bd6b092c21..f9d55c4848 100644 --- a/include/simgrid/s4u/Storage.hpp +++ b/include/simgrid/s4u/Storage.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -25,6 +26,7 @@ XBT_ATTRIB_DEPRECATED_v322("Please use Engine::get_all_storages()") XBT_PUBLIC v class XBT_PUBLIC Storage : public simgrid::xbt::Extendable { friend s4u::Engine; + friend s4u::Io; friend simgrid::surf::StorageImpl; public: @@ -61,6 +63,8 @@ public: void set_data(void* data) { userdata_ = data; } void* get_data() { return userdata_; } + static IoPtr io_init(sg_size_t size); /* builds a new Io of the given size */ + sg_size_t read(sg_size_t size); sg_size_t write(sg_size_t size); surf::StorageImpl* get_impl() { return pimpl_; } diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index 4f5047b54c..d7c752524c 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -286,6 +286,7 @@ XBT_PUBLIC void simcall_sem_acquire(smx_sem_t sem); XBT_PUBLIC int simcall_sem_acquire_timeout(smx_sem_t sem, double max_duration); /***************************** Storage **********************************/ +XBT_PUBLIC smx_activity_t simcall_io_start(sg_size_t size, sg_storage_t 
storage); XBT_PUBLIC sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size); XBT_PUBLIC sg_size_t simcall_storage_write(surf_storage_t fd, sg_size_t size); /************************** MC simcalls **********************************/ diff --git a/src/kernel/activity/IoImpl.cpp b/src/kernel/activity/IoImpl.cpp index eb61b839f3..fd99cca2e7 100644 --- a/src/kernel/activity/IoImpl.cpp +++ b/src/kernel/activity/IoImpl.cpp @@ -7,6 +7,23 @@ #include "simgrid/kernel/resource/Action.hpp" #include "src/simix/smx_io_private.hpp" +simgrid::kernel::activity::IoImpl::IoImpl(std::string name, resource::Action* surf_action, s4u::Storage* storage) + : ActivityImpl(name), storage_(storage), surf_action_(surf_action) +{ /* Builds the activity around surf_action for an I/O on storage and marks it SIMIX_RUNNING right away. */ + this->state_ = SIMIX_RUNNING; + /* Registers this activity as the data of the surf action. NOTE(review): surf_action_ is dereferenced unchecked here, unlike in cancel(). */ + surf_action_->set_data(this); + + XBT_DEBUG("Create io %p", this); +} + +void simgrid::kernel::activity::IoImpl::cancel() +{ /* Logs the cancellation, then cancels the surf action when there is one. */ + XBT_VERB("This io %p is canceled", this); + if (surf_action_ != nullptr) + surf_action_->cancel(); +} + void simgrid::kernel::activity::IoImpl::suspend() { if (surf_action_ != nullptr) @@ -19,6 +36,11 @@ void simgrid::kernel::activity::IoImpl::resume() surf_action_->resume(); } +double simgrid::kernel::activity::IoImpl::get_remaining() +{ /* Remaining amount as reported by the surf action, or 0 when there is no action. */ + return surf_action_ ? 
surf_action_->get_remains() : 0; +} + void simgrid::kernel::activity::IoImpl::post() { for (smx_simcall_t const& simcall : simcalls_) { diff --git a/src/kernel/activity/IoImpl.hpp b/src/kernel/activity/IoImpl.hpp index 71f1a45377..f44f09044a 100644 --- a/src/kernel/activity/IoImpl.hpp +++ b/src/kernel/activity/IoImpl.hpp @@ -14,12 +14,23 @@ namespace kernel { namespace activity { class XBT_PUBLIC IoImpl : public ActivityImpl { + ~IoImpl() override; /* NOTE(review): no definition of this destructor is visible in this diff; confirm it exists */ + +public: + explicit IoImpl(std::string name, resource::Action* surf_action, s4u::Storage* storage); + public: void suspend() override; void resume() override; void post() override; + void cancel(); + double get_remaining(); + s4u::Storage* storage_ = nullptr; simgrid::kernel::resource::Action* surf_action_ = nullptr; + /* NOTE(review): SIMIX_io_start() fires on_creation, but no definition of these static signals is visible in this diff; confirm they are defined. */ + static simgrid::xbt::signal on_creation; + static simgrid::xbt::signal on_completion; }; } } diff --git a/src/s4u/s4u_Io.cpp b/src/s4u/s4u_Io.cpp new file mode 100644 index 0000000000..1d99c57f88 --- /dev/null +++ b/src/s4u/s4u_Io.cpp @@ -0,0 +1,89 @@ +/* Copyright (c) 2018. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. 
*/ + +#include "simgrid/s4u/Io.hpp" +#include "simgrid/s4u/Storage.hpp" +#include "src/kernel/activity/IoImpl.hpp" +#include "xbt/log.h" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_io, s4u_activity, "S4U asynchronous IOs"); + +namespace simgrid { +namespace s4u { +/** @brief Starts the I/O through simcall_io_start() and marks it STARTED */ +Activity* Io::start() +{ + pimpl_ = simcall_io_start(size_, storage_); + state_ = State::STARTED; + return this; +} +/** @brief Cancels the kernel-side activity from within a simcall and marks this Io CANCELED */ +Activity* Io::cancel() +{ + simgrid::simix::simcall([this] { dynamic_cast(pimpl_.get())->cancel(); }); + state_ = State::CANCELED; + return this; +} +/* NOTE(review): what follows is commented-out Exec code; Io::wait() and Io::wait(double), declared in Io.hpp, have no definition in this file. */ +// Activity* Exec::wait() +//{ +// simcall_execution_wait(pimpl_); +// state_ = State::FINISHED; +// return this; +//} +// +// Activity* Exec::wait(double timeout) +//{ +// THROW_UNIMPLEMENTED; +// return this; +//} +// +///** @brief Returns whether the state of the exec is finished */ +// bool Exec::test() +//{ +// xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED); +// +// if (state_ == State::FINISHED) +// return true; +// +// if (state_ == State::INITED) +// this->start(); +// +// if (simcall_execution_test(pimpl_)) { +// state_ = State::FINISHED; +// return true; +// } +// +// return false; +//} + +/** @brief Returns the remaining amount of this I/O, as reported by the kernel-side activity */ +double Io::get_remaining() +{ + return simgrid::simix::simcall( + [this]() { return boost::static_pointer_cast(pimpl_)->get_remaining(); }); +} + +// double Io::get_remaining_ratio() +//{ +// return simgrid::simix::simcall([this]() { +// return boost::static_pointer_cast(pimpl_)->get_remaining_ratio(); +// }); +//} +/** Drops one reference and deletes the Io when that was the last one */ +void intrusive_ptr_release(simgrid::s4u::Io* e) +{ + if (e->refcount_.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + delete e; + } +} +/** Takes one more reference on the Io */ +void intrusive_ptr_add_ref(simgrid::s4u::Io* e) +{ + e->refcount_.fetch_add(1, std::memory_order_relaxed); +} +} // namespace s4u +} // namespace simgrid diff --git a/src/s4u/s4u_Storage.cpp b/src/s4u/s4u_Storage.cpp index 
84319d81a6..e8aa233177 100644 --- a/src/s4u/s4u_Storage.cpp +++ b/src/s4u/s4u_Storage.cpp @@ -5,6 +5,7 @@ #include "simgrid/s4u/Engine.hpp" #include "simgrid/s4u/Host.hpp" +#include "simgrid/s4u/Io.hpp" #include "simgrid/s4u/Storage.hpp" #include "simgrid/storage.h" #include "src/surf/StorageImpl.hpp" @@ -55,6 +56,14 @@ void Storage::set_property(std::string key, std::string value) simgrid::simix::simcall([this, key, value] { this->pimpl_->set_property(key, value); }); } +IoPtr Storage::io_init(sg_size_t size) +{ /* Creates an Io whose size and remaining amount are both set to size. NOTE(review): this function is static and never assigns res->storage_, yet Io::start() passes storage_ on to simcall_io_start(); confirm where it gets set. */ + IoPtr res = IoPtr(new Io()); + res->size_ = size; + res->set_remaining(size); + return res; +} + sg_size_t Storage::read(sg_size_t size) { return simcall_storage_read(pimpl_, size); diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 51b08fa8fc..0ec3f3987a 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -15,10 +15,12 @@ #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/activity/ConditionVariableImpl.hpp" #include "src/kernel/activity/ExecImpl.hpp" +#include "src/kernel/activity/IoImpl.hpp" #include "src/kernel/activity/MutexImpl.hpp" #include "src/mc/mc_replay.hpp" #include "src/plugins/vm/VirtualMachineImpl.hpp" #include "src/simix/smx_host_private.hpp" +#include "src/simix/smx_io_private.hpp" XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix); @@ -466,6 +468,19 @@ int simcall_sem_acquire_timeout(smx_sem_t sem, double timeout) return simcall_BODY_sem_acquire_timeout(sem, timeout); } +smx_activity_t simcall_io_start(std::string name, sg_size_t size, simgrid::s4u::Storage* storage) +{ /* Runs SIMIX_io_start(name, size, storage) inside a simcall and returns the resulting activity. */ + /* checking for infinite values (NOTE(review): sg_size_t looks like an integer type, which would make this check vacuous; confirm) */ + xbt_assert(std::isfinite(size), "size is not finite!"); + + return simgrid::simix::simcall([name, size, storage] { return SIMIX_io_start(name, size, storage); }); +} + +smx_activity_t simcall_io_start(sg_size_t size, simgrid::s4u::Storage* storage) +{ /* Overload matching the two-argument declaration in simgrid/simix.h and the call in Io::start(); starts the I/O with an empty name. */ + return simcall_io_start("", size, storage); +} + sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size) { return simcall_BODY_storage_read(st, size); diff --git a/src/simix/smx_io.cpp b/src/simix/smx_io.cpp index f210733aa6..1f9d6fd955 100644 --- a/src/simix/smx_io.cpp +++ 
b/src/simix/smx_io.cpp @@ -13,6 +13,20 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)"); +simgrid::kernel::activity::IoImplPtr SIMIX_io_start(std::string name, sg_size_t size, sg_storage_t storage) +{ /* Asks the storage implementation for an I/O action of the given size, wraps it in a new IoImpl, fires IoImpl::on_creation and returns the activity. NOTE(review): storage is dereferenced without a null check. */ + /* set surf's action */ + simgrid::kernel::resource::Action* surf_action = storage->pimpl_->io_start(size); + + simgrid::kernel::activity::IoImplPtr io = + simgrid::kernel::activity::IoImplPtr(new simgrid::kernel::activity::IoImpl(name, surf_action, storage)); + + XBT_DEBUG("Create IO synchro %p %s", io.get(), name.c_str()); + simgrid::kernel::activity::IoImpl::on_creation(io); + + return io; +} + void simcall_HANDLER_storage_read(smx_simcall_t simcall, surf_storage_t st, sg_size_t size) { smx_activity_t synchro = SIMIX_storage_read(st, size); diff --git a/src/simix/smx_io_private.hpp b/src/simix/smx_io_private.hpp index cae0df5558..dc62c72e6a 100644 --- a/src/simix/smx_io_private.hpp +++ b/src/simix/smx_io_private.hpp @@ -10,7 +10,7 @@ #include "popping_private.hpp" #include "simgrid/simix.h" - +XBT_PRIVATE simgrid::kernel::activity::IoImplPtr SIMIX_io_start(std::string name, sg_size_t size, sg_storage_t storage); XBT_PRIVATE smx_activity_t SIMIX_storage_read(surf_storage_t fd, sg_size_t size); XBT_PRIVATE smx_activity_t SIMIX_storage_write(surf_storage_t fd, sg_size_t size); diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 2cd646e7f3..f05744b917 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -429,6 +429,7 @@ set(S4U_SRC src/s4u/s4u_Engine.cpp src/s4u/s4u_Exec.cpp src/s4u/s4u_Host.cpp + src/s4u/s4u_Io.cpp src/s4u/s4u_Link.cpp src/s4u/s4u_Mailbox.cpp src/s4u/s4u_Mutex.cpp @@ -695,6 +696,7 @@ set(headers_to_install include/simgrid/s4u/Engine.hpp include/simgrid/s4u/Exec.hpp include/simgrid/s4u/Host.hpp + include/simgrid/s4u/Io.hpp include/simgrid/s4u/Link.hpp include/simgrid/s4u/Mailbox.hpp include/simgrid/s4u/Mutex.hpp