Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Implement I/O as asynchronous activities
author Frederic Suter <frederic.suter@cc.in2p3.fr>
Tue, 31 Jul 2018 19:46:08 +0000 (21:46 +0200)
committer Frederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 2 Aug 2018 22:00:13 +0000 (00:00 +0200)
13 files changed:
include/simgrid/forward.h
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Io.hpp [new file with mode: 0644]
include/simgrid/s4u/Storage.hpp
include/simgrid/simix.h
src/kernel/activity/IoImpl.cpp
src/kernel/activity/IoImpl.hpp
src/s4u/s4u_Io.cpp [new file with mode: 0644]
src/s4u/s4u_Storage.cpp
src/simix/libsmx.cpp
src/simix/smx_io.cpp
src/simix/smx_io_private.hpp
tools/cmake/DefinePackages.cmake

index 19e0b5b..0d07099 100644 (file)
@@ -31,6 +31,10 @@ using ExecPtr = boost::intrusive_ptr<Exec>;
 XBT_PUBLIC void intrusive_ptr_release(Exec* e);
 XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
 class Host;
 XBT_PUBLIC void intrusive_ptr_release(Exec* e);
 XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
 class Host;
+class Io;
+using IoPtr = boost::intrusive_ptr<Io>;
+XBT_PUBLIC void intrusive_ptr_release(Io* i);
+XBT_PUBLIC void intrusive_ptr_add_ref(Io* i);
 class Link;
 class Mailbox;
 using MailboxPtr = boost::intrusive_ptr<Mailbox>;
 class Link;
 class Mailbox;
 using MailboxPtr = boost::intrusive_ptr<Mailbox>;
index 7e53d03..615f7bb 100644 (file)
@@ -33,6 +33,9 @@ class XBT_PUBLIC Activity {
   friend Exec;
   friend XBT_PUBLIC void intrusive_ptr_release(Exec * e);
   friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec * e);
   friend Exec;
   friend XBT_PUBLIC void intrusive_ptr_release(Exec * e);
   friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec * e);
+  friend Io;
+  friend XBT_PUBLIC void intrusive_ptr_release(Io* i);
+  friend XBT_PUBLIC void intrusive_ptr_add_ref(Io* i);
 
 protected:
   Activity()  = default;
 
 protected:
   Activity()  = default;
diff --git a/include/simgrid/s4u/Io.hpp b/include/simgrid/s4u/Io.hpp
new file mode 100644 (file)
index 0000000..6e3df22
--- /dev/null
@@ -0,0 +1,41 @@
+/* Copyright (c) 2017-2018. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_IO_HPP
+#define SIMGRID_S4U_IO_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <atomic>
+
+namespace simgrid {
+namespace s4u {
+
+class XBT_PUBLIC Io : public Activity {
+  Io() : Activity() {}
+public:
+  friend XBT_PUBLIC void intrusive_ptr_release(simgrid::s4u::Io* i);
+  friend XBT_PUBLIC void intrusive_ptr_add_ref(simgrid::s4u::Io* i);
+  friend Storage; // Factory of IOs
+
+  ~Io() = default;
+
+  Activity* start() override;
+  Activity* wait() override;
+  Activity* wait(double timeout) override;
+  Activity* cancel() override;
+
+  double get_remaining() override;
+
+private:
+  sg_size_t size_       = 0;
+  sg_storage_t storage_ = nullptr;
+  std::atomic_int_fast32_t refcount_{0};
+}; // class
+}
+}; // Namespace simgrid::s4u
+
+#endif /* SIMGRID_S4U_IO_HPP */
index bd6b092..f9d55c4 100644 (file)
@@ -9,6 +9,7 @@
 #include <simgrid/forward.h>
 #include <xbt/Extendable.hpp>
 #include <xbt/base.h>
 #include <simgrid/forward.h>
 #include <xbt/Extendable.hpp>
 #include <xbt/base.h>
+#include <xbt/signal.hpp>
 
 #include <map>
 #include <string>
 
 #include <map>
 #include <string>
@@ -25,6 +26,7 @@ XBT_ATTRIB_DEPRECATED_v322("Please use Engine::get_all_storages()") XBT_PUBLIC v
 
 class XBT_PUBLIC Storage : public simgrid::xbt::Extendable<Storage> {
   friend s4u::Engine;
 
 class XBT_PUBLIC Storage : public simgrid::xbt::Extendable<Storage> {
   friend s4u::Engine;
+  friend s4u::Io;
   friend simgrid::surf::StorageImpl;
 
 public:
   friend simgrid::surf::StorageImpl;
 
 public:
@@ -61,6 +63,8 @@ public:
   void set_data(void* data) { userdata_ = data; }
   void* get_data() { return userdata_; }
 
   void set_data(void* data) { userdata_ = data; }
   void* get_data() { return userdata_; }
 
+  static IoPtr io_init(sg_size_t size);
+
   sg_size_t read(sg_size_t size);
   sg_size_t write(sg_size_t size);
   surf::StorageImpl* get_impl() { return pimpl_; }
   sg_size_t read(sg_size_t size);
   sg_size_t write(sg_size_t size);
   surf::StorageImpl* get_impl() { return pimpl_; }
index 4f5047b..d7c7525 100644 (file)
@@ -286,6 +286,7 @@ XBT_PUBLIC void simcall_sem_acquire(smx_sem_t sem);
 XBT_PUBLIC int simcall_sem_acquire_timeout(smx_sem_t sem, double max_duration);
 
 /*****************************   Storage   **********************************/
 XBT_PUBLIC int simcall_sem_acquire_timeout(smx_sem_t sem, double max_duration);
 
 /*****************************   Storage   **********************************/
+XBT_PUBLIC smx_activity_t simcall_io_start(sg_size_t size, sg_storage_t storage);
 XBT_PUBLIC sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size);
 XBT_PUBLIC sg_size_t simcall_storage_write(surf_storage_t fd, sg_size_t size);
 /************************** MC simcalls   **********************************/
 XBT_PUBLIC sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size);
 XBT_PUBLIC sg_size_t simcall_storage_write(surf_storage_t fd, sg_size_t size);
 /************************** MC simcalls   **********************************/
index eb61b83..fd99cca 100644 (file)
@@ -7,6 +7,23 @@
 #include "simgrid/kernel/resource/Action.hpp"
 #include "src/simix/smx_io_private.hpp"
 
 #include "simgrid/kernel/resource/Action.hpp"
 #include "src/simix/smx_io_private.hpp"
 
+/** @brief Builds the kernel-side I/O activity around an already created surf action.
+ *
+ * The activity is immediately put in the SIMIX_RUNNING state.
+ *
+ * @param name        name forwarded to the ActivityImpl base class
+ * @param surf_action resource action carrying out the I/O; it is dereferenced here, so it must not be null
+ * @param storage     storage on which the I/O takes place
+ */
+simgrid::kernel::activity::IoImpl::IoImpl(std::string name, resource::Action* surf_action, s4u::Storage* storage)
+    : ActivityImpl(name), storage_(storage), surf_action_(surf_action)
+{
+  this->state_ = SIMIX_RUNNING;
+
+  /* register this activity as the data attached to the action */
+  surf_action_->set_data(this);
+
+  /* the message used to say "exec": this is an I/O, not an execution */
+  XBT_DEBUG("Create io %p", this);
+}
+
+/** @brief Cancels the underlying surf action, if there is one. */
+void simgrid::kernel::activity::IoImpl::cancel()
+{
+  /* the message used to say "exec": this is an I/O, not an execution */
+  XBT_VERB("This io %p is canceled", this);
+  if (surf_action_ != nullptr)
+    surf_action_->cancel();
+}
+
 void simgrid::kernel::activity::IoImpl::suspend()
 {
   if (surf_action_ != nullptr)
 void simgrid::kernel::activity::IoImpl::suspend()
 {
   if (surf_action_ != nullptr)
@@ -19,6 +36,11 @@ void simgrid::kernel::activity::IoImpl::resume()
     surf_action_->resume();
 }
 
     surf_action_->resume();
 }
 
+/** @brief Returns what the surf action reports as remaining, or 0 when there is no action. */
+double simgrid::kernel::activity::IoImpl::get_remaining()
+{
+  if (not surf_action_)
+    return 0;
+  return surf_action_->get_remains();
+}
+
 void simgrid::kernel::activity::IoImpl::post()
 {
   for (smx_simcall_t const& simcall : simcalls_) {
 void simgrid::kernel::activity::IoImpl::post()
 {
   for (smx_simcall_t const& simcall : simcalls_) {
index 71f1a45..f44f090 100644 (file)
@@ -14,12 +14,23 @@ namespace kernel {
 namespace activity {
 
 class XBT_PUBLIC IoImpl : public ActivityImpl {
 namespace activity {
 
 class XBT_PUBLIC IoImpl : public ActivityImpl {
+  ~IoImpl() override;
+
+public:
+  explicit IoImpl(std::string name, resource::Action* surf_action, s4u::Storage* storage);
+
 public:
   void suspend() override;
   void resume() override;
   void post() override;
 public:
   void suspend() override;
   void resume() override;
   void post() override;
+  void cancel();
+  double get_remaining();
 
 
+  s4u::Storage* storage_                          = nullptr;
   simgrid::kernel::resource::Action* surf_action_ = nullptr;
   simgrid::kernel::resource::Action* surf_action_ = nullptr;
+
+  static simgrid::xbt::signal<void(kernel::activity::IoImplPtr)> on_creation;
+  static simgrid::xbt::signal<void(kernel::activity::IoImplPtr)> on_completion;
 };
 }
 }
 };
 }
 }
diff --git a/src/s4u/s4u_Io.cpp b/src/s4u/s4u_Io.cpp
new file mode 100644 (file)
index 0000000..1d99c57
--- /dev/null
@@ -0,0 +1,89 @@
+/* Copyright (c) 2018. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u/Io.hpp"
+#include "simgrid/s4u/Storage.hpp"
+#include "src/kernel/activity/IoImpl.hpp"
+#include "xbt/log.h"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_io, s4u_activity, "S4U asynchronous IOs");
+
+namespace simgrid {
+namespace s4u {
+
+/** @brief Starts the I/O through simcall_io_start() and marks this activity as STARTED.
+ *
+ * The activity returned by the simcall is stored in pimpl_.
+ *
+ * NOTE(review): storage_ defaults to nullptr in Io.hpp and Storage::io_init() does not assign it in this change --
+ * confirm that it is set before start() is called.
+ *
+ * @return this activity
+ */
+Activity* Io::start()
+{
+  pimpl_ = simcall_io_start(size_, storage_);
+  state_ = State::STARTED;
+  return this;
+}
+
+/** @brief Cancels this I/O and marks the activity as CANCELED.
+ *
+ * The kernel-side cancellation runs inside a simcall. Nothing is cancelled in the kernel when pimpl_ is still
+ * empty (it is only assigned by start()), instead of dereferencing a null pointer.
+ *
+ * @return this activity
+ */
+Activity* Io::cancel()
+{
+  simgrid::simix::simcall([this] {
+    /* same downcast as in get_remaining(); the previous dynamic_cast result was dereferenced unchecked */
+    if (pimpl_)
+      boost::static_pointer_cast<simgrid::kernel::activity::IoImpl>(pimpl_)->cancel();
+  });
+  state_ = State::CANCELED;
+  return this;
+}
+
+// Activity* Exec::wait()
+//{
+//  simcall_execution_wait(pimpl_);
+//  state_ = State::FINISHED;
+//  return this;
+//}
+//
+// Activity* Exec::wait(double timeout)
+//{
+//  THROW_UNIMPLEMENTED;
+//  return this;
+//}
+//
+///** @brief Returns whether the state of the exec is finished */
+// bool Exec::test()
+//{
+//  xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED);
+//
+//  if (state_ == State::FINISHED)
+//    return true;
+//
+//  if (state_ == State::INITED)
+//    this->start();
+//
+//  if (simcall_execution_test(pimpl_)) {
+//    state_ = State::FINISHED;
+//    return true;
+//  }
+//
+//  return false;
+//}
+
+/** @brief Returns the amount of work that remains to be done for this I/O
+ *
+ * The value is read, inside a simcall, from the kernel-side IoImpl stored in pimpl_.
+ * NOTE(review): presumably expressed in the same unit as the size given at creation (bytes) -- confirm.
+ */
+double Io::get_remaining()
+{
+  return simgrid::simix::simcall(
+      [this]() { return boost::static_pointer_cast<simgrid::kernel::activity::IoImpl>(pimpl_)->get_remaining(); });
+}
+
+// double Io::get_remaining_ratio()
+//{
+//  return simgrid::simix::simcall([this]() {
+//    return boost::static_pointer_cast<simgrid::kernel::activity::IoImpl>(pimpl_)->get_remaining_ratio();
+//  });
+//}
+
+/** @brief Drops one reference on the given Io, and deletes it when that was the last one. */
+void intrusive_ptr_release(simgrid::s4u::Io* e)
+{
+  const auto previous_count = e->refcount_.fetch_sub(1, std::memory_order_release);
+  if (previous_count != 1)
+    return; // other references remain
+
+  /* last reference: synchronize with the releases done by the other owners before destroying the object */
+  std::atomic_thread_fence(std::memory_order_acquire);
+  delete e;
+}
+
+/** @brief Takes one more reference on the given Io by incrementing its refcount_ member. */
+void intrusive_ptr_add_ref(simgrid::s4u::Io* e)
+{
+  e->refcount_.fetch_add(1, std::memory_order_relaxed);
+}
+} // namespace s4u
+} // namespace simgrid
index 84319d8..e8aa233 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "simgrid/s4u/Engine.hpp"
 #include "simgrid/s4u/Host.hpp"
 
 #include "simgrid/s4u/Engine.hpp"
 #include "simgrid/s4u/Host.hpp"
+#include "simgrid/s4u/Io.hpp"
 #include "simgrid/s4u/Storage.hpp"
 #include "simgrid/storage.h"
 #include "src/surf/StorageImpl.hpp"
 #include "simgrid/s4u/Storage.hpp"
 #include "simgrid/storage.h"
 #include "src/surf/StorageImpl.hpp"
@@ -55,6 +56,14 @@ void Storage::set_property(std::string key, std::string value)
   simgrid::simix::simcall([this, key, value] { this->pimpl_->set_property(key, value); });
 }
 
   simgrid::simix::simcall([this, key, value] { this->pimpl_->set_property(key, value); });
 }
 
+/** @brief Creates a new, not yet started, Io of the given size.
+ *
+ * NOTE(review): this function is static and never assigns the storage_ field of the created Io, which therefore
+ * keeps its nullptr default -- confirm how the target storage is meant to be attached.
+ *
+ * @param size size of the I/O; it is also used as the initial remaining amount
+ * @return a reference-counted pointer to the new Io
+ */
+IoPtr Storage::io_init(sg_size_t size)
+{
+  IoPtr io(new Io());
+
+  io->size_ = size;
+  io->set_remaining(size);
+
+  return io;
+}
+
 sg_size_t Storage::read(sg_size_t size)
 {
   return simcall_storage_read(pimpl_, size);
 sg_size_t Storage::read(sg_size_t size)
 {
   return simcall_storage_read(pimpl_, size);
index 51b08fa..0ec3f39 100644 (file)
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/ConditionVariableImpl.hpp"
 #include "src/kernel/activity/ExecImpl.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/ConditionVariableImpl.hpp"
 #include "src/kernel/activity/ExecImpl.hpp"
+#include "src/kernel/activity/IoImpl.hpp"
 #include "src/kernel/activity/MutexImpl.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/plugins/vm/VirtualMachineImpl.hpp"
 #include "src/simix/smx_host_private.hpp"
 #include "src/kernel/activity/MutexImpl.hpp"
 #include "src/mc/mc_replay.hpp"
 #include "src/plugins/vm/VirtualMachineImpl.hpp"
 #include "src/simix/smx_host_private.hpp"
+#include "src/simix/smx_io_private.hpp"
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix);
 
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix);
 
@@ -466,6 +468,14 @@ int simcall_sem_acquire_timeout(smx_sem_t sem, double timeout)
   return simcall_BODY_sem_acquire_timeout(sem, timeout);
 }
 
   return simcall_BODY_sem_acquire_timeout(sem, timeout);
 }
 
+/** @brief Starts an I/O of the given size on the given storage, by running SIMIX_io_start() in a simcall.
+ *
+ * NOTE(review): the prototype added to include/simgrid/simix.h by this same change is
+ * simcall_io_start(sg_size_t size, sg_storage_t storage), and Io::start() calls it with these two arguments only.
+ * This definition takes an additional leading name -- confirm which signature is intended.
+ *
+ * @param name    name given to the created activity
+ * @param size    size of the I/O
+ * @param storage storage on which the I/O takes place
+ * @return the activity created by SIMIX_io_start()
+ */
+smx_activity_t simcall_io_start(std::string name, sg_size_t size, simgrid::s4u::Storage* storage)
+{
+  /* checking for infinite values */
+  /* NOTE(review): size is an sg_size_t; if that is an integer type this check can never fail -- confirm */
+  xbt_assert(std::isfinite(size), "size is not finite!");
+
+  return simgrid::simix::simcall([name, size, storage] { return SIMIX_io_start(name, size, storage); });
+}
+
 sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size)
 {
   return simcall_BODY_storage_read(st, size);
 sg_size_t simcall_storage_read(surf_storage_t st, sg_size_t size)
 {
   return simcall_BODY_storage_read(st, size);
index f210733..1f9d6fd 100644 (file)
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)");
 
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_io, simix, "Logging specific to SIMIX (io)");
 
+/** @brief Creates the kernel-side activity of an I/O of the given size on the given storage.
+ *
+ * The on_creation signal of IoImpl is fired with the new activity before it is returned.
+ *
+ * @param name    name given to the created IoImpl
+ * @param size    size passed to the io_start() of the storage implementation
+ * @param storage storage on which the I/O takes place; it is dereferenced, so it must not be null
+ * @return the newly created activity
+ */
+simgrid::kernel::activity::IoImplPtr SIMIX_io_start(std::string name, sg_size_t size, sg_storage_t storage)
+{
+  using simgrid::kernel::activity::IoImpl;
+  using simgrid::kernel::activity::IoImplPtr;
+
+  /* obtain the action from the implementation of the storage */
+  simgrid::kernel::resource::Action* action = storage->pimpl_->io_start(size);
+
+  /* wrap it into the activity */
+  IoImplPtr activity(new IoImpl(name, action, storage));
+
+  XBT_DEBUG("Create IO synchro %p %s", activity.get(), name.c_str());
+  IoImpl::on_creation(activity);
+
+  return activity;
+}
+
 void simcall_HANDLER_storage_read(smx_simcall_t simcall, surf_storage_t st, sg_size_t size)
 {
   smx_activity_t synchro = SIMIX_storage_read(st, size);
 void simcall_HANDLER_storage_read(smx_simcall_t simcall, surf_storage_t st, sg_size_t size)
 {
   smx_activity_t synchro = SIMIX_storage_read(st, size);
index cae0df5..dc62c72 100644 (file)
@@ -10,7 +10,7 @@
 
 #include "popping_private.hpp"
 #include "simgrid/simix.h"
 
 #include "popping_private.hpp"
 #include "simgrid/simix.h"
-
+XBT_PRIVATE simgrid::kernel::activity::IoImplPtr SIMIX_io_start(std::string name, sg_size_t size, sg_storage_t storage);
 XBT_PRIVATE smx_activity_t SIMIX_storage_read(surf_storage_t fd, sg_size_t size);
 XBT_PRIVATE smx_activity_t SIMIX_storage_write(surf_storage_t fd, sg_size_t size);
 
 XBT_PRIVATE smx_activity_t SIMIX_storage_read(surf_storage_t fd, sg_size_t size);
 XBT_PRIVATE smx_activity_t SIMIX_storage_write(surf_storage_t fd, sg_size_t size);
 
index 2cd646e..f05744b 100644 (file)
@@ -429,6 +429,7 @@ set(S4U_SRC
   src/s4u/s4u_Engine.cpp
   src/s4u/s4u_Exec.cpp
   src/s4u/s4u_Host.cpp
   src/s4u/s4u_Engine.cpp
   src/s4u/s4u_Exec.cpp
   src/s4u/s4u_Host.cpp
+  src/s4u/s4u_Io.cpp
   src/s4u/s4u_Link.cpp
   src/s4u/s4u_Mailbox.cpp
   src/s4u/s4u_Mutex.cpp
   src/s4u/s4u_Link.cpp
   src/s4u/s4u_Mailbox.cpp
   src/s4u/s4u_Mutex.cpp
@@ -695,6 +696,7 @@ set(headers_to_install
   include/simgrid/s4u/Engine.hpp
   include/simgrid/s4u/Exec.hpp
   include/simgrid/s4u/Host.hpp
   include/simgrid/s4u/Engine.hpp
   include/simgrid/s4u/Exec.hpp
   include/simgrid/s4u/Host.hpp
+  include/simgrid/s4u/Io.hpp
   include/simgrid/s4u/Link.hpp
   include/simgrid/s4u/Mailbox.hpp
   include/simgrid/s4u/Mutex.hpp
   include/simgrid/s4u/Link.hpp
   include/simgrid/s4u/Mailbox.hpp
   include/simgrid/s4u/Mutex.hpp