Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
rework ExecImpl to have a single start() method
author: Frederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 28 Mar 2019 23:57:07 +0000 (00:57 +0100)
committer: Frederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 28 Mar 2019 23:57:07 +0000 (00:57 +0100)
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/ExecImpl.hpp
src/kernel/actor/ActorImpl.cpp
src/plugins/dirty_page_tracking.cpp
src/plugins/host_dvfs.cpp
src/plugins/host_energy.cpp
src/plugins/host_load.cpp
src/plugins/vm/VirtualMachineImpl.cpp
src/s4u/s4u_Exec.cpp
src/simix/libsmx.cpp

index cc5a2cf..07b3c4e 100644 (file)
@@ -60,7 +60,15 @@ ExecImpl::~ExecImpl()
 
 ExecImpl& ExecImpl::set_host(s4u::Host* host)
 {
-  host_ = host;
+  if (not hosts_.empty())
+    hosts_.clear();
+  hosts_.push_back(host);
+  return *this;
+}
+
+ExecImpl& ExecImpl::set_hosts(const std::vector<s4u::Host*>& hosts)
+{
+  hosts_ = hosts;
   return *this;
 }
 
@@ -79,43 +87,53 @@ ExecImpl& ExecImpl::set_tracing_category(const std::string& category)
 ExecImpl& ExecImpl::set_timeout(double timeout)
 {
   if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) {
-    timeout_detector_ = host_->pimpl_cpu->sleep(timeout);
+    timeout_detector_ = hosts_.front()->pimpl_cpu->sleep(timeout);
     timeout_detector_->set_data(this);
   }
   return *this;
 }
 
-ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound)
+ExecImpl& ExecImpl::set_flops_amount(double flops_amount)
 {
-  state_ = SIMIX_RUNNING;
-  if (not MC_is_active() && not MC_record_replay_is_active()) {
-    surf_action_ = host_->pimpl_cpu->execution_start(flops_amount);
-    surf_action_->set_data(this);
-    surf_action_->set_priority(priority);
-    if (bound > 0)
-      surf_action_->set_bound(bound);
-  }
+  if (not flops_amounts_.empty())
+    flops_amounts_.clear();
+  flops_amounts_.push_back(flops_amount);
+  return *this;
+}
 
-  XBT_DEBUG("Create execute synchro %p: %s", this, get_cname());
-  ExecImpl::on_creation(*this);
-  return this;
+ExecImpl& ExecImpl::set_flops_amounts(const std::vector<double>& flops_amounts)
+{
+  flops_amounts_ = flops_amounts;
+  return *this;
 }
 
-ExecImpl* ExecImpl::start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
-                          const std::vector<double>& bytes_amounts)
+ExecImpl& ExecImpl::set_bytes_amounts(const std::vector<double>& bytes_amounts)
+{
+  bytes_amounts_ = bytes_amounts;
+
+  return *this;
+}
+
+ExecImpl* ExecImpl::start()
 {
   state_ = SIMIX_RUNNING;
-  /* set surf's synchro */
   if (not MC_is_active() && not MC_record_replay_is_active()) {
-    surf_action_ = surf_host_model->execute_parallel(hosts, flops_amounts.data(), bytes_amounts.data(), -1);
-    if (surf_action_ != nullptr) {
-      surf_action_->set_data(this);
+    if (hosts_.size() == 1) {
+      surf_action_ = hosts_.front()->pimpl_cpu->execution_start(flops_amounts_.front());
+      surf_action_->set_priority(priority_);
+      if (bound_ > 0)
+        surf_action_->set_bound(bound_);
+    } else {
+      surf_action_ = surf_host_model->execute_parallel(hosts_, flops_amounts_.data(), bytes_amounts_.data(), -1);
     }
+    surf_action_->set_data(this);
   }
-  XBT_DEBUG("Create parallel execute synchro %p", this);
+
+  XBT_DEBUG("Create execute synchro %p: %s", this, get_cname());
   ExecImpl::on_creation(*this);
   return this;
 }
+
 void ExecImpl::cancel()
 {
   XBT_VERB("This exec %p is canceled", this);
@@ -139,20 +157,21 @@ double ExecImpl::get_par_remaining_ratio()
   return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains();
 }
 
-void ExecImpl::set_bound(double bound)
+ExecImpl& ExecImpl::set_bound(double bound)
 {
-  if (surf_action_)
-    surf_action_->set_bound(bound);
+  bound_ = bound;
+  return *this;
 }
-void ExecImpl::set_priority(double priority)
+
+ExecImpl& ExecImpl::set_priority(double priority)
 {
-  if (surf_action_)
-    surf_action_->set_priority(priority);
+  priority_ = priority;
+  return *this;
 }
 
 void ExecImpl::post()
 {
-  if (host_ && not host_->is_on()) { /* FIXME: handle resource failure for parallel tasks too */
+  if (hosts_.size() == 1 && not hosts_.front()->is_on()) { /* FIXME: handle resource failure for parallel tasks too */
     /* If the host running the synchro failed, notice it. This way, the asking
      * process can be killed if it runs on that host itself */
     state_ = SIMIX_FAILED;
index 86f2d0a..80d25bd 100644 (file)
@@ -16,30 +16,38 @@ namespace activity {
 
 class XBT_PUBLIC ExecImpl : public ActivityImpl {
   resource::Action* timeout_detector_ = nullptr;
+  double priority_                    = 1.0;
+  double bound_                       = 0.0;
+  std::vector<s4u::Host*> hosts_;
+  std::vector<double> flops_amounts_;
+  std::vector<double> bytes_amounts_;
   ~ExecImpl();
 
 public:
-  ExecImpl* start(double flops_amount, double priority, double bound);
-  ExecImpl* start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
-                  const std::vector<double>& bytes_amounts);
-
   ExecImpl& set_name(const std::string& name);
   ExecImpl& set_tracing_category(const std::string& category);
-  ExecImpl& set_host(s4u::Host* host);
   ExecImpl& set_timeout(double timeout);
+  ExecImpl& set_bound(double bound);
+  ExecImpl& set_priority(double priority);
 
-  void cancel();
-  void post() override;
-  void finish() override;
+  ExecImpl& set_flops_amount(double flop_amount);
+  ExecImpl& set_host(s4u::Host* host);
+  s4u::Host* get_host() const { return hosts_.front(); }
+
+  ExecImpl& set_flops_amounts(const std::vector<double>& flops_amounts);
+  ExecImpl& set_bytes_amounts(const std::vector<double>& bytes_amounts);
+  ExecImpl& set_hosts(const std::vector<s4u::Host*>& hosts);
+
+  unsigned int get_host_number() const { return hosts_.size(); }
   double get_remaining() const;
   double get_seq_remaining_ratio();
   double get_par_remaining_ratio();
-  void set_bound(double bound);       // deprecated. To be removed in v3.25
-  void set_priority(double priority); // deprecated. To be removed in v3.25
   virtual ActivityImpl* migrate(s4u::Host* to);
 
-  /* The host where the execution takes place. nullptr means this is a parallel exec (and only surf knows the hosts) */
-  s4u::Host* host_ = nullptr;
+  ExecImpl* start();
+  void cancel();
+  void post() override;
+  void finish() override;
 
   static xbt::signal<void(ExecImpl&)> on_creation;
   static xbt::signal<void(ExecImpl const&)> on_completion;
index dabf488..3a5938b 100644 (file)
@@ -372,7 +372,7 @@ activity::ActivityImplPtr ActorImpl::suspend(ActorImpl* issuer)
     return nullptr;
   } else {
     activity::ExecImpl* exec = new activity::ExecImpl();
-    (*exec).set_name("suspend").set_host(host_).start(0.0, 1.0, 0.0);
+    (*exec).set_name("suspend").set_host(host_).set_flops_amount(0.0).start();
     return activity::ExecImplPtr(exec);
   }
 }
index 0298cd9..684c2ad 100644 (file)
@@ -74,7 +74,7 @@ static void on_virtual_machine_creation(simgrid::vm::VirtualMachineImpl& vm)
 
 static void on_exec_creation(simgrid::kernel::activity::ExecImpl const& exec)
 {
-  simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.host_);
+  simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.get_host());
   if (vm == nullptr)
     return;
 
@@ -87,7 +87,7 @@ static void on_exec_creation(simgrid::kernel::activity::ExecImpl const& exec)
 
 static void on_exec_completion(simgrid::kernel::activity::ExecImpl const& exec)
 {
-  simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.host_);
+  simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.get_host());
   if (vm == nullptr)
     return;
 
index 0cca4f8..d94c836 100644 (file)
@@ -295,14 +295,14 @@ public:
     });
     simgrid::kernel::activity::ExecImpl::on_creation.connect(
         [this](simgrid::kernel::activity::ExecImpl const& activity) {
-          if (activity.host_ == get_host())
+          if (activity.get_host() == get_host())
             pre_task();
         });
     simgrid::kernel::activity::ExecImpl::on_completion.connect(
         [this](simgrid::kernel::activity::ExecImpl const& activity) {
           // For more than one host (not yet supported), we can access the host via
           // simcalls_.front()->issuer->iface()->get_host()
-          if (activity.host_ == get_host() && iteration_running) {
+          if (activity.get_host() == get_host() && iteration_running) {
             comp_timer += activity.surf_action_->get_finish_time() - activity.surf_action_->get_start_time();
           }
         });
index 4e695cd..92a6b5c 100644 (file)
@@ -498,8 +498,8 @@ void sg_host_energy_plugin_init()
   // during the recv call. By updating at the beginning of a compute, we can
   // fix that. (If the cpu is not idle, this is not required.)
   simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl const& activity) {
-    if (activity.host_ != nullptr) { // We only run on one host
-      simgrid::s4u::Host* host         = activity.host_;
+    if (activity.get_host_number() == 1) { // We only run on one host
+      simgrid::s4u::Host* host         = activity.get_host();
       simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
       if (vm != nullptr)
         host = vm->get_pm();
index 1688ef3..ea40824 100644 (file)
@@ -205,8 +205,8 @@ void sg_host_load_plugin_init()
   });
 
   simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl& activity) {
-    if (activity.host_ != nullptr) { // We only run on one host
-      simgrid::s4u::Host* host         = activity.host_;
+    if (activity.get_host_number() == 1) { // We only run on one host
+      simgrid::s4u::Host* host         = activity.get_host();
       simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
       if (vm != nullptr)
         host = vm->get_pm();
@@ -221,8 +221,8 @@ void sg_host_load_plugin_init()
     }
   });
   simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImpl const& activity) {
-    if (activity.host_ != nullptr) { // We only run on one host
-      simgrid::s4u::Host* host         = activity.host_;
+    if (activity.get_host_number() == 1) { // We only run on one host
+      simgrid::s4u::Host* host         = activity.get_host();
       simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
       if (vm != nullptr)
         host = vm->get_pm();
index 912c575..a461371 100644 (file)
@@ -54,7 +54,7 @@ static void host_state_change(s4u::Host const& host)
 static s4u::VirtualMachine* get_vm_from_task(kernel::activity::ActivityImpl const& task)
 {
   auto* exec = dynamic_cast<kernel::activity::ExecImpl const*>(&task);
-  return exec != nullptr ? dynamic_cast<s4u::VirtualMachine*>(exec->host_) : nullptr;
+  return exec != nullptr ? dynamic_cast<s4u::VirtualMachine*>(exec->get_host()) : nullptr;
 }
 
 static void add_active_task(kernel::activity::ActivityImpl const& task)
index 60bdd34..918a705 100644 (file)
@@ -130,7 +130,10 @@ Exec* ExecSeq::start()
     (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
         .set_name(name_)
         .set_tracing_category(tracing_category_)
-        .start(flops_amount_, 1. / priority_, bound_);
+        .set_priority(1. / priority_)
+        .set_bound(bound_)
+        .set_flops_amount(flops_amount_)
+        .start();
   });
   state_ = State::STARTED;
   on_start(*Actor::self());
@@ -148,7 +151,7 @@ ExecPtr ExecSeq::set_host(Host* host)
   if (state_ == State::STARTED)
     boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->migrate(host);
   host_ = host;
-  boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host;
+  boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host);
   return this;
 }
 
@@ -182,21 +185,23 @@ ExecPar::ExecPar(const std::vector<s4u::Host*>& hosts, const std::vector<double>
                  const std::vector<double>& bytes_amounts)
     : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts)
 {
-  // For parallel executions, we need a special host to run the timeout detector.
-  host_                                                                          = hosts.front();
-  boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host_;
 }
 
 Exec* ExecPar::start()
 {
   simix::simcall([this] {
-    boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_timeout(timeout_);
-    boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_);
+    (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+        .set_hosts(hosts_)
+        .set_timeout(timeout_)
+        .set_flops_amounts(flops_amounts_)
+        .set_bytes_amounts(bytes_amounts_)
+        .start();
   });
   state_ = State::STARTED;
   on_start(*Actor::self());
   return this;
 }
+
 double ExecPar::get_remaining_ratio()
 {
   return simix::simcall(
index 9947bf1..2840515 100644 (file)
@@ -385,7 +385,14 @@ smx_activity_t simcall_execution_start(const std::string& name, const std::strin
 {
   return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] {
     simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
-    (*exec).set_name(name).set_tracing_category(category).set_host(host).start(flops_amount, priority, bound);
+    (*exec)
+        .set_name(name)
+        .set_tracing_category(category)
+        .set_host(host)
+        .set_priority(priority)
+        .set_bound(bound)
+        .set_flops_amount(flops_amount)
+        .start();
     return simgrid::kernel::activity::ExecImplPtr(exec);
   });
 }
@@ -424,7 +431,13 @@ smx_activity_t simcall_execution_parallel_start(const std::string& name, int hos
     bytes_parallel_amount = std::vector<double>(bytes_amount, bytes_amount + host_nb * host_nb);
   return simgrid::simix::simcall([name, hosts, flops_parallel_amount, bytes_parallel_amount, timeout] {
     simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
-    (*exec).set_name(name).set_timeout(timeout).start(hosts, flops_parallel_amount, bytes_parallel_amount);
+    (*exec)
+        .set_name(name)
+        .set_hosts(hosts)
+        .set_timeout(timeout)
+        .set_flops_amounts(flops_parallel_amount)
+        .set_bytes_amounts(bytes_parallel_amount)
+        .start();
     return simgrid::kernel::activity::ExecImplPtr(exec);
   });
 }