Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/simgrid/simgrid into CRTP
author: Frederic Suter <frederic.suter@cc.in2p3.fr>
Tue, 2 Apr 2019 11:24:56 +0000 (13:24 +0200)
committer: Frederic Suter <frederic.suter@cc.in2p3.fr>
Tue, 2 Apr 2019 11:24:56 +0000 (13:24 +0200)
1  2 
include/simgrid/s4u/Comm.hpp
include/simgrid/s4u/Exec.hpp
src/s4u/s4u_Comm.cpp
src/s4u/s4u_Exec.cpp

@@@ -18,7 -18,7 +18,7 @@@ namespace s4u 
   *
   * Represents all asynchronous communications, that you can test or wait onto.
   */
 -class XBT_PUBLIC Comm : public Activity {
 +class XBT_PUBLIC Comm : public Activity_T<Comm> {
    Mailbox* mailbox_                   = nullptr;
    kernel::actor::ActorImpl* sender_   = nullptr;
    kernel::actor::ActorImpl* receiver_ = nullptr;
@@@ -35,7 -35,7 +35,7 @@@
    void (*clean_fun_)(void*)                                               = nullptr;
    void (*copy_data_function_)(kernel::activity::CommImpl*, void*, size_t) = nullptr;
  
 -  Comm() : Activity() {}
 +  Comm() = default;
  
  public:
    friend XBT_PUBLIC void intrusive_ptr_release(Comm* c);
@@@ -44,9 -44,9 +44,9 @@@
  
    virtual ~Comm();
  
-   static xbt::signal<void(ActorPtr)> on_sender_start;
-   static xbt::signal<void(ActorPtr)> on_receiver_start;
-   static xbt::signal<void(ActorPtr)> on_completion;
+   static xbt::signal<void(Actor const&)> on_sender_start;
+   static xbt::signal<void(Actor const&)> on_receiver_start;
+   static xbt::signal<void(Actor const&)> on_completion;
  
    /*! take a vector s4u::CommPtr and return when one of them is finished.
     * The return value is the rank of the first finished CommPtr. */
@@@ -20,10 -20,12 +20,10 @@@ namespace s4u 
   * They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or
   * similar mechanisms.
   */
 -class XBT_PUBLIC Exec : public Activity {
 -  std::string name_             = "";
 +class XBT_PUBLIC Exec : public Activity_T<Exec> {
    double priority_              = 1.0;
    double bound_                 = 0.0;
    double timeout_               = 0.0;
 -  std::string tracing_category_ = "";
    std::atomic_int_fast32_t refcount_{0};
    Host* host_ = nullptr;
  
@@@ -41,8 -43,8 +41,8 @@@ public
    friend ExecPar;
    friend XBT_PUBLIC void intrusive_ptr_release(Exec* e);
    friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e);
-   static xbt::signal<void(ActorPtr)> on_start;
-   static xbt::signal<void(ActorPtr)> on_completion;
+   static xbt::signal<void(Actor const&)> on_start;
+   static xbt::signal<void(Actor const&)> on_completion;
  
    virtual Exec* start() override          = 0;
    virtual double get_remaining_ratio()    = 0;
@@@ -53,7 -55,9 +53,7 @@@
    bool test() override;
  
    ExecPtr set_bound(double bound);
 -  ExecPtr set_name(const std::string& name);
    ExecPtr set_priority(double priority);
 -  ExecPtr set_tracing_category(const std::string& category);
    ExecPtr set_timeout(double timeout);
    Exec* cancel() override;
  
diff --combined src/s4u/s4u_Comm.cpp
@@@ -14,9 -14,9 +14,9 @@@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_com
  
  namespace simgrid {
  namespace s4u {
- xbt::signal<void(ActorPtr)> Comm::on_sender_start;
- xbt::signal<void(ActorPtr)> Comm::on_receiver_start;
- xbt::signal<void(ActorPtr)> Comm::on_completion;
+ xbt::signal<void(Actor const&)> Comm::on_sender_start;
+ xbt::signal<void(Actor const&)> Comm::on_receiver_start;
+ xbt::signal<void(Actor const&)> Comm::on_completion;
  
  Comm::~Comm()
  {
@@@ -108,18 -108,18 +108,18 @@@ CommPtr Comm::set_dst_data(void** buff
  
  Comm* Comm::start()
  {
 -  xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
 +  xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
               __FUNCTION__);
  
    if (src_buff_ != nullptr) { // Sender side
-     on_sender_start(Actor::self());
+     on_sender_start(*Actor::self());
      pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
 -                                clean_fun_, copy_data_function_, user_data_, detached_);
 +                                clean_fun_, copy_data_function_, get_user_data(), detached_);
    } else if (dst_buff_ != nullptr) { // Receiver side
      xbt_assert(not detached_, "Receive cannot be detached");
-     on_receiver_start(Actor::self());
+     on_receiver_start(*Actor::self());
      pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
 -                                copy_data_function_, user_data_, rate_);
 +                                copy_data_function_, get_user_data(), rate_);
  
    } else {
      xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
@@@ -136,7 -136,7 +136,7 @@@ Comm* Comm::wait(
  
  /** @brief Block the calling actor until the communication is finished, or until timeout
   *
-  * On timeout, an exception is thrown.
+  * On timeout, an exception is thrown and the communication is invalidated.
   *
   * @param timeout the amount of seconds to wait for the comm termination.
   *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
@@@ -148,21 -148,21 +148,21 @@@ Comm* Comm::wait_for(double timeout
  
      case State::INITED: // It's not started yet. Do it in one simcall
        if (src_buff_ != nullptr) {
-         on_sender_start(Actor::self());
+         on_sender_start(*Actor::self());
          simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
 -                          copy_data_function_, user_data_, timeout);
 +                          copy_data_function_, get_user_data(), timeout);
  
        } else { // Receiver
-         on_receiver_start(Actor::self());
+         on_receiver_start(*Actor::self());
          simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
 -                          user_data_, timeout, rate_);
 +                          get_user_data(), timeout, rate_);
        }
        state_ = State::FINISHED;
        break;
  
      case State::STARTED:
        simcall_comm_wait(pimpl_, timeout);
-       on_completion(Actor::self());
+       on_completion(*Actor::self());
        state_ = State::FINISHED;
        break;
  
diff --combined src/s4u/s4u_Exec.cpp
@@@ -12,12 -12,12 +12,12 @@@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_exe
  
  namespace simgrid {
  namespace s4u {
- xbt::signal<void(ActorPtr)> Exec::on_start;
- xbt::signal<void(ActorPtr)> Exec::on_completion;
+ xbt::signal<void(Actor const&)> Exec::on_start;
+ xbt::signal<void(Actor const&)> Exec::on_completion;
  
  Exec::Exec()
  {
-   pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(get_name(), get_tracing_category()));
+   pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl());
  }
  
  bool Exec::test()
@@@ -44,7 -44,7 +44,7 @@@ Exec* Exec::wait(
      start();
    simcall_execution_wait(pimpl_);
    state_ = State::FINISHED;
-   on_completion(Actor::self());
+   on_completion(*Actor::self());
    return this;
  }
  
@@@ -90,6 -90,13 +90,6 @@@ ExecPtr Exec::set_timeout(double timeou
    return this;
  }
  
 -ExecPtr Exec::set_name(const std::string& name)
 -{
 -  xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start");
 -  name_ = name;
 -  return this;
 -}
 -
  /** @brief  Change the execution priority, don't you think?
   *
   * An execution with twice the priority will get twice the amount of flops when the resource is shared.
@@@ -103,6 -110,13 +103,6 @@@ ExecPtr Exec::set_priority(double prior
    return this;
  }
  
 -ExecPtr Exec::set_tracing_category(const std::string& category)
 -{
 -  xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start");
 -  tracing_category_ = category;
 -  return this;
 -}
 -
  ///////////// SEQUENTIAL EXECUTIONS ////////
  ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount)
  {
  Exec* ExecSeq::start()
  {
    simix::simcall([this] {
-     boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(flops_amount_, 1. / priority_, bound_);
+     (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+         .set_name(name_)
+         .set_tracing_category(tracing_category_)
+         .set_priority(1. / priority_)
+         .set_bound(bound_)
+         .set_flops_amount(flops_amount_)
+         .start();
    });
    state_ = State::STARTED;
-   on_start(Actor::self());
+   on_start(*Actor::self());
    return this;
  }
  
@@@ -131,7 -151,7 +137,7 @@@ ExecPtr ExecSeq::set_host(Host* host
    if (state_ == State::STARTED)
      boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->migrate(host);
    host_ = host;
-   boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host;
+   boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host);
    return this;
  }
  
@@@ -165,21 -185,23 +171,23 @@@ ExecPar::ExecPar(const std::vector<s4u:
                   const std::vector<double>& bytes_amounts)
      : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts)
  {
-   // For parallel executions, we need a special host to run the timeout detector.
-   host_                                                                          = hosts.front();
-   boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host_;
  }
  
  Exec* ExecPar::start()
  {
    simix::simcall([this] {
-     boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_timeout(timeout_);
-     boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_);
+     (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+         .set_hosts(hosts_)
+         .set_timeout(timeout_)
+         .set_flops_amounts(flops_amounts_)
+         .set_bytes_amounts(bytes_amounts_)
+         .start();
    });
    state_ = State::STARTED;
-   on_start(Actor::self());
+   on_start(*Actor::self());
    return this;
  }
  double ExecPar::get_remaining_ratio()
  {
    return simix::simcall(