Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' into CRTP
author Frederic Suter <frederic.suter@cc.in2p3.fr>
Mon, 7 Oct 2019 08:04:41 +0000 (10:04 +0200)
committer Frederic Suter <frederic.suter@cc.in2p3.fr>
Mon, 7 Oct 2019 08:04:41 +0000 (10:04 +0200)
1  2 
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
include/simgrid/s4u/Io.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp

@@@ -6,7 -6,6 +6,7 @@@
  #ifndef SIMGRID_S4U_ACTIVITY_HPP
  #define SIMGRID_S4U_ACTIVITY_HPP
  
 +#include "xbt/asserts.h"
  #include <simgrid/forward.h>
  #include <xbt/signal.hpp>
  
@@@ -85,61 -84,21 +85,51 @@@ public
    Activity* set_remaining(double remains);
  
    /** Put some user data onto the Activity */
 -  Activity* set_user_data(void* data)
 -  {
 -    user_data_ = data;
 -    return this;
 -  }
 -  /** Retrieve the user data of the Activity */
 -  void* get_user_data() { return user_data_; }
  
-   kernel::activity::ActivityImplPtr get_impl() { return pimpl_; }
- #ifndef DOXYGEN
-   XBT_ATTRIB_DEPRECATED_v324("Please use Activity::wait_for()") virtual void wait(double timeout) = 0;
-   XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_state()") Activity::State getState() { return state_; }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_remaining()") double getRemains() { return get_remaining(); }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Activity::set_remaining()") Activity* setRemains(double remains)
-   {
-     return set_remaining(remains);
-   }
- #endif
+   kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
  
  private:
    kernel::activity::ActivityImplPtr pimpl_ = nullptr;
    Activity::State state_                   = Activity::State::INITED;
    double remains_                          = 0;
 -  void* user_data_                         = nullptr;
 +};
 +
 +template <class AnyActivity> class Activity_T : public Activity {
 +private:
 +  std::string name_             = "";
 +  std::string tracing_category_ = "";
 +  void* user_data_              = nullptr;
 +
 +public:
 +  AnyActivity* set_name(const std::string& name)
 +  {
 +    xbt_assert(get_state() == State::INITED, "Cannot change the name of an activity after its start");
 +    name_ = name;
 +    return static_cast<AnyActivity*>(this);
 +  }
 +  const std::string& get_name() { return name_; }
 +  const char* get_cname() { return name_.c_str(); }
 +
 +  AnyActivity* set_tracing_category(const std::string& category)
 +  {
 +    xbt_assert(get_state() == State::INITED, "Cannot change the tracing category of an activity after its start");
 +    tracing_category_ = category;
 +    return static_cast<AnyActivity*>(this);
 +  }
 +  const std::string& get_tracing_category() { return tracing_category_; }
 +
 +  AnyActivity* set_user_data(void* data)
 +  {
 +    user_data_ = data;
 +    return static_cast<AnyActivity*>(this);
 +  }
 +
 +  void* get_user_data() { return user_data_; }
 +  XBT_ATTRIB_DEPRECATED_v323("Please use Activity::set_user_data()") AnyActivity* setUserData(void* data)
 +  {
 +    return set_user_data(data);
 +  }
 +  XBT_ATTRIB_DEPRECATED_v323("Please use Activity::get_user_data()") void* getUserData() { return user_data_; }
  };
  
  } // namespace s4u
@@@ -10,6 -10,7 +10,7 @@@
  #include <simgrid/s4u/Activity.hpp>
  
  #include <atomic>
+ #include <string>
  #include <vector>
  
  namespace simgrid {
@@@ -18,7 -19,7 +19,7 @@@ namespace s4u 
   *
   * Represents all asynchronous communications, that you can test or wait onto.
   */
 -class XBT_PUBLIC Comm : public Activity {
 +class XBT_PUBLIC Comm : public Activity_T<Comm> {
    Mailbox* mailbox_                   = nullptr;
    kernel::actor::ActorImpl* sender_   = nullptr;
    kernel::actor::ActorImpl* receiver_ = nullptr;
    size_t dst_buff_size_               = 0;
    void* src_buff_                     = nullptr;
    size_t src_buff_size_               = sizeof(void*);
+   std::string tracing_category_       = "";
    std::atomic_int_fast32_t refcount_{0};
    /* FIXME: expose these elements in the API */
-   int detached_                                                           = 0;
+   bool detached_                                                          = false;
    int (*match_fun_)(void*, void*, kernel::activity::CommImpl*)            = nullptr;
    void (*clean_fun_)(void*)                                               = nullptr;
    void (*copy_data_function_)(kernel::activity::CommImpl*, void*, size_t) = nullptr;
  
 -  Comm() : Activity() {}
 +  Comm() = default;
  
  public:
+ #ifndef DOXYGEN
    friend XBT_PUBLIC void intrusive_ptr_release(Comm* c);
    friend XBT_PUBLIC void intrusive_ptr_add_ref(Comm* c);
    friend Mailbox; // Factory of comms
+ #endif
  
    virtual ~Comm();
  
     * That's a buffer where the sent data will be copied  */
    CommPtr set_dst_data(void** buff, size_t size);
  
+   CommPtr set_tracing_category(const std::string& category);
    /** Retrieve the mailbox on which this comm acts */
    Mailbox* get_mailbox();
    /** Retrieve the size of the received data. Not to be mixed with @ref Activity::set_remaining()  */
    size_t get_dst_data_size();
  
-   s4u::ActorPtr get_sender();
- #ifndef DOXYGEN
-   XBT_ATTRIB_DEPRECATED_v324("Please use Comm::wait_for()") void wait(double t) override { wait_for(t); }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_rate()") Activity* setRate(double rate)
-   {
-     return set_rate(rate).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data()") Activity* setSrcData(void* buff)
-   {
-     return set_src_data(buff).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data()") Activity* setSrcData(void* buff, size_t size)
-   {
-     return set_src_data(buff, size).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_src_data_size()") Activity* setSrcDataSize(size_t size)
-   {
-     return set_src_data_size(size).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_dst_data()") Activity* setDstData(void** buff)
-   {
-     return set_dst_data(buff).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_dst_data()") Activity* setDstData(void** buff, size_t size)
-   {
-     return set_dst_data(buff, size).get();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::get_dst_data_size()") size_t getDstDataSize()
-   {
-     return get_dst_data_size();
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Comm::get_mailbox()") Mailbox* getMailbox() { return get_mailbox(); }
- #endif
+   Actor* get_sender();
  };
  } // namespace s4u
  } // namespace simgrid
@@@ -8,6 -8,7 +8,7 @@@
  
  #include <simgrid/forward.h>
  #include <simgrid/s4u/Activity.hpp>
+ #include <simgrid/s4u/Actor.hpp>
  #include <xbt/ex.h>
  
  #include <atomic>
@@@ -20,10 -21,12 +21,10 @@@ namespace s4u 
   * They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
   * similar mechanisms.
   */
 -class XBT_PUBLIC Exec : public Activity {
 -  std::string name_             = "";
 +class XBT_PUBLIC Exec : public Activity_T<Exec> {
    double priority_              = 1.0;
    double bound_                 = 0.0;
    double timeout_               = 0.0;
 -  std::string tracing_category_ = "";
    std::atomic_int_fast32_t refcount_{0};
    Host* host_ = nullptr;
  
@@@ -35,14 -38,14 +36,14 @@@ public
  #ifndef DOXYGEN
    Exec(Exec const&) = delete;
    Exec& operator=(Exec const&) = delete;
- #endif
  
    friend ExecSeq;
    friend ExecPar;
    friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
    friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
-   static xbt::signal<void(Actor const&)> on_start;
-   static xbt::signal<void(Actor const&)> on_completion;
+ #endif
+   static xbt::signal<void(Actor const&, Exec const&)> on_start;
+   static xbt::signal<void(Actor const&, Exec const&)> on_completion;
  
    virtual Exec* start() override          = 0;
    virtual double get_remaining_ratio()    = 0;
  
    Exec* wait() override;
    Exec* wait_for(double timeout) override;
+   /*! take a vector of s4u::ExecPtr and return when one of them is finished.
+    * The return value is the rank of the first finished ExecPtr. */
+   static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(execs, -1); }
+   /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+   static int wait_any_for(std::vector<ExecPtr>* execs, double timeout);
    bool test() override;
  
    ExecPtr set_bound(double bound);
 -  ExecPtr set_name(const std::string& name);
    ExecPtr set_priority(double priority);
 -  ExecPtr set_tracing_category(const std::string& category);
    ExecPtr set_timeout(double timeout);
    Exec* cancel() override;
-   XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_priority()") ExecPtr setPriority(double priority)
-   {
-     return set_priority(priority);
-   }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_bound()") ExecPtr setBound(double bound) { return set_bound(bound); }
-   XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
+   const std::string& get_name() const { return name_; }
+   const char* get_cname() const { return name_.c_str(); }
+   Host* get_host() const;
+   unsigned int get_host_number() const;
+   double get_start_time() const;
+   double get_finish_time() const;
+   double get_cost() const;
  };
  
  class XBT_PUBLIC ExecSeq : public Exec {
@@@ -82,16 -93,6 +89,6 @@@ public
  
    double get_remaining() override;
    double get_remaining_ratio() override;
- #ifndef DOXYGEN
-   //////////////// Deprecated functions
-   XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_host()") ExecPtr setHost(Host* host) { return set_host(host); }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_host()") Host* getHost() { return get_host(); }
-   XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_remaining_ratio()") double getRemainingRatio()
-   {
-     return get_remaining_ratio();
-   }
- #endif
  };
  
  class XBT_PUBLIC ExecPar : public Exec {
    std::vector<double> bytes_amounts_;
    explicit ExecPar(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
                     const std::vector<double>& bytes_amounts);
-   ExecPtr set_host(Host* host) override { return this; }
+   ExecPtr set_host(Host*) override { /* parallel exec cannot be moved */ THROW_UNIMPLEMENTED; }
  
  public:
    ~ExecPar() = default;
@@@ -17,25 -17,31 +17,30 @@@ namespace s4u 
  
  /** I/O Activity, representing the asynchronous disk access.
   *
-  * They are generated from Storage::io_init() or Storage::read() and Storage::write().
+  * They are generated from Disk::io_init(), Disk::read() Disk::read_async(), Disk::write() and Disk::write_async().
   */
  
 -class XBT_PUBLIC Io : public Activity {
 +class XBT_PUBLIC Io : public Activity_T<Io> {
  public:
    enum class OpType { READ, WRITE };
  
  private:
    Storage* storage_ = nullptr;
+   Disk* disk_       = nullptr;
    sg_size_t size_   = 0;
    OpType type_      = OpType::READ;
 -  std::string name_ = "";
    std::atomic_int_fast32_t refcount_{0};
  
    explicit Io(sg_storage_t storage, sg_size_t size, OpType type);
+   explicit Io(sg_disk_t disk, sg_size_t size, OpType type);
  
  public:
+ #ifndef DOXYGEN
    friend XBT_PUBLIC void intrusive_ptr_release(simgrid::s4u::Io* i);
    friend XBT_PUBLIC void intrusive_ptr_add_ref(simgrid::s4u::Io* i);
+   friend Disk;    // Factory of IOs
    friend Storage; // Factory of IOs
+ #endif
  
    ~Io() = default;
  
  
    double get_remaining() override;
    sg_size_t get_performed_ioops();
- #ifndef DOXYGEN
-   XBT_ATTRIB_DEPRECATED_v324("Please use Io::wait_for()") void wait(double t) override { wait_for(t); }
- #endif
  };
  
  } // namespace s4u
diff --combined src/s4u/s4u_Comm.cpp
@@@ -106,20 -106,27 +106,27 @@@ CommPtr Comm::set_dst_data(void** buff
    return this;
  }
  
+ CommPtr Comm::set_tracing_category(const std::string& category)
+ {
+   xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
+   tracing_category_ = category;
+   return this;
+ }
  Comm* Comm::start()
  {
 -  xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
 +  xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
               __FUNCTION__);
  
    if (src_buff_ != nullptr) { // Sender side
      on_sender_start(*Actor::self());
      pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
 -                                clean_fun_, copy_data_function_, user_data_, detached_);
 +                                clean_fun_, copy_data_function_, get_user_data(), detached_);
    } else if (dst_buff_ != nullptr) { // Receiver side
      xbt_assert(not detached_, "Receive cannot be detached");
      on_receiver_start(*Actor::self());
      pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
 -                                copy_data_function_, user_data_, rate_);
 +                                copy_data_function_, get_user_data(), rate_);
  
    } else {
      xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
@@@ -150,12 -157,12 +157,12 @@@ Comm* Comm::wait_for(double timeout
        if (src_buff_ != nullptr) {
          on_sender_start(*Actor::self());
          simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
 -                          copy_data_function_, user_data_, timeout);
 +                          copy_data_function_, get_user_data(), timeout);
  
        } else { // Receiver
          on_receiver_start(*Actor::self());
          simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
 -                          user_data_, timeout, rate_);
 +                          get_user_data(), timeout, rate_);
        }
        state_ = State::FINISHED;
        break;
@@@ -193,7 -200,7 +200,7 @@@ Comm* Comm::detach(
  
  Comm* Comm::cancel()
  {
-   simix::simcall([this] {
+   kernel::actor::simcall([this] {
      if (pimpl_)
        boost::static_pointer_cast<kernel::activity::CommImpl>(pimpl_)->cancel();
    });
@@@ -223,9 -230,9 +230,9 @@@ Mailbox* Comm::get_mailbox(
    return mailbox_;
  }
  
- ActorPtr Comm::get_sender()
+ Actor* Comm::get_sender()
  {
-   return sender_ ? sender_->iface() : nullptr;
+   return sender_ ? sender_->ciface() : nullptr;
  }
  
  void intrusive_ptr_release(simgrid::s4u::Comm* c)
diff --combined src/s4u/s4u_Exec.cpp
@@@ -12,8 -12,8 +12,8 @@@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_exe
  
  namespace simgrid {
  namespace s4u {
- xbt::signal<void(Actor const&)> Exec::on_start;
- xbt::signal<void(Actor const&)> Exec::on_completion;
+ xbt::signal<void(Actor const&, Exec const&)> Exec::on_start;
+ xbt::signal<void(Actor const&, Exec const&)> Exec::on_completion;
  
  Exec::Exec()
  {
@@@ -44,7 -44,7 +44,7 @@@ Exec* Exec::wait(
      start();
    simcall_execution_wait(pimpl_);
    state_ = State::FINISHED;
-   on_completion(*Actor::self());
+   on_completion(*Actor::self(), *this);
    return this;
  }
  
@@@ -53,9 -53,17 +53,17 @@@ Exec* Exec::wait_for(double
    THROW_UNIMPLEMENTED;
  }
  
+ int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
+ {
+   std::unique_ptr<kernel::activity::ExecImpl* []> rexecs(new kernel::activity::ExecImpl*[execs->size()]);
+   std::transform(begin(*execs), end(*execs), rexecs.get(),
+                  [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
+   return simcall_execution_waitany_for(rexecs.get(), execs->size(), timeout);
+ }
  Exec* Exec::cancel()
  {
-   simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
+   kernel::actor::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
    state_ = State::CANCELED;
    return this;
  }
@@@ -90,6 -98,34 +98,27 @@@ ExecPtr Exec::set_timeout(double timeou
    return this;
  }
  
 -ExecPtr Exec::set_name(const std::string& name)
 -{
 -  xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
 -  name_ = name;
 -  return this;
 -}
 -
+ Host* Exec::get_host() const
+ {
+   return static_cast<kernel::activity::ExecImpl*>(pimpl_.get())->get_host();
+ }
+ unsigned int Exec::get_host_number() const
+ {
+   return static_cast<kernel::activity::ExecImpl*>(pimpl_.get())->get_host_number();
+ }
+ double Exec::get_start_time() const
+ {
+   return (pimpl_->surf_action_ == nullptr) ? -1 : pimpl_->surf_action_->get_start_time();
+ }
+ double Exec::get_finish_time() const
+ {
+   return (pimpl_->surf_action_ == nullptr) ? -1 : pimpl_->surf_action_->get_finish_time();
+ }
+ double Exec::get_cost() const
+ {
+   return (pimpl_->surf_action_ == nullptr) ? -1 : pimpl_->surf_action_->get_cost();
+ }
  /** @brief  Change the execution priority, don't you think?
   *
   * An execution with twice the priority will get twice the amount of flops when the resource is shared.
@@@ -103,6 -139,13 +132,6 @@@ ExecPtr Exec::set_priority(double prior
    return this;
  }
  
 -ExecPtr Exec::set_tracing_category(const std::string& category)
 -{
 -  xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
 -  tracing_category_ = category;
 -  return this;
 -}
 -
  ///////////// SEQUENTIAL EXECUTIONS ////////
  ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
  {
  
  Exec* ExecSeq::start()
  {
-   simix::simcall([this] {
+   kernel::actor::simcall([this] {
      (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
          .set_name(name_)
          .set_tracing_category(tracing_category_)
-         .set_priority(1. / priority_)
+         .set_sharing_penalty(1. / priority_)
          .set_bound(bound_)
          .set_flops_amount(flops_amount_)
          .start();
    });
    state_ = State::STARTED;
-   on_start(*Actor::self());
+   on_start(*Actor::self(), *this);
    return this;
  }
  
@@@ -151,17 -194,17 +180,17 @@@ Host* ExecSeq::get_host(
  /** @brief Returns the amount of flops that remain to be done */
  double ExecSeq::get_remaining()
  {
-   return simgrid::simix::simcall(
+   return simgrid::kernel::actor::simcall(
        [this]() { return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_remaining(); });
  }
  
- /** @brief Returns the ratio of elements that are still to do
+ /** @brief Returns the ratio of elements that are still to do
   *
   * The returned value is between 0 (completely done) and 1 (nothing done yet).
   */
  double ExecSeq::get_remaining_ratio()
  {
-   return simgrid::simix::simcall([this]() {
+   return simgrid::kernel::actor::simcall([this]() {
      return boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->get_seq_remaining_ratio();
    });
  }
@@@ -175,7 -218,7 +204,7 @@@ ExecPar::ExecPar(const std::vector<s4u:
  
  Exec* ExecPar::start()
  {
-   simix::simcall([this] {
+   kernel::actor::simcall([this] {
      (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
          .set_hosts(hosts_)
          .set_timeout(timeout_)
          .start();
    });
    state_ = State::STARTED;
-   on_start(*Actor::self());
+   on_start(*Actor::self(), *this);
    return this;
  }
  
  double ExecPar::get_remaining_ratio()
  {
-   return simix::simcall(
+   return kernel::actor::simcall(
        [this]() { return boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->get_par_remaining_ratio(); });
  }