From 08668c93eec47c236766aae49bc39e3c772180f9 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Fri, 29 Mar 2019 00:57:07 +0100 Subject: [PATCH] rework ExecImpl to have a single start() method --- src/kernel/activity/ExecImpl.cpp | 75 +++++++++++++++++---------- src/kernel/activity/ExecImpl.hpp | 32 +++++++----- src/kernel/actor/ActorImpl.cpp | 2 +- src/plugins/dirty_page_tracking.cpp | 4 +- src/plugins/host_dvfs.cpp | 4 +- src/plugins/host_energy.cpp | 4 +- src/plugins/host_load.cpp | 8 +-- src/plugins/vm/VirtualMachineImpl.cpp | 2 +- src/s4u/s4u_Exec.cpp | 19 ++++--- src/simix/libsmx.cpp | 17 +++++- 10 files changed, 106 insertions(+), 61 deletions(-) diff --git a/src/kernel/activity/ExecImpl.cpp b/src/kernel/activity/ExecImpl.cpp index cc5a2cf7eb..07b3c4eb81 100644 --- a/src/kernel/activity/ExecImpl.cpp +++ b/src/kernel/activity/ExecImpl.cpp @@ -60,7 +60,15 @@ ExecImpl::~ExecImpl() ExecImpl& ExecImpl::set_host(s4u::Host* host) { - host_ = host; + if (not hosts_.empty()) + hosts_.clear(); + hosts_.push_back(host); + return *this; +} + +ExecImpl& ExecImpl::set_hosts(const std::vector& hosts) +{ + hosts_ = hosts; return *this; } @@ -79,43 +87,53 @@ ExecImpl& ExecImpl::set_tracing_category(const std::string& category) ExecImpl& ExecImpl::set_timeout(double timeout) { if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) { - timeout_detector_ = host_->pimpl_cpu->sleep(timeout); + timeout_detector_ = hosts_.front()->pimpl_cpu->sleep(timeout); timeout_detector_->set_data(this); } return *this; } -ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound) +ExecImpl& ExecImpl::set_flops_amount(double flops_amount) { - state_ = SIMIX_RUNNING; - if (not MC_is_active() && not MC_record_replay_is_active()) { - surf_action_ = host_->pimpl_cpu->execution_start(flops_amount); - surf_action_->set_data(this); - surf_action_->set_priority(priority); - if (bound > 0) - surf_action_->set_bound(bound); - } + if (not flops_amounts_.empty()) + flops_amounts_.clear(); + flops_amounts_.push_back(flops_amount); + return *this; +} - XBT_DEBUG("Create execute synchro %p: %s", this, get_cname()); - ExecImpl::on_creation(*this); - return this; +ExecImpl& ExecImpl::set_flops_amounts(const std::vector& flops_amounts) +{ + flops_amounts_ = flops_amounts; + return *this; } -ExecImpl* ExecImpl::start(const std::vector& hosts, const std::vector& flops_amounts, - const std::vector& bytes_amounts) +ExecImpl& ExecImpl::set_bytes_amounts(const std::vector& bytes_amounts) +{ + bytes_amounts_ = bytes_amounts; + + return *this; +} + +ExecImpl* ExecImpl::start() { state_ = SIMIX_RUNNING; - /* set surf's synchro */ if (not MC_is_active() && not MC_record_replay_is_active()) { - surf_action_ = surf_host_model->execute_parallel(hosts, flops_amounts.data(), bytes_amounts.data(), -1); - if (surf_action_ != nullptr) { - surf_action_->set_data(this); + if (hosts_.size() == 1) { + surf_action_ = hosts_.front()->pimpl_cpu->execution_start(flops_amounts_.front()); + surf_action_->set_priority(priority_); + if (bound_ > 0) + surf_action_->set_bound(bound_); + } else { + surf_action_ = surf_host_model->execute_parallel(hosts_, flops_amounts_.data(), bytes_amounts_.data(), -1); } + surf_action_->set_data(this); } - XBT_DEBUG("Create parallel execute synchro %p", this); + + XBT_DEBUG("Create execute synchro %p: %s", this, get_cname()); ExecImpl::on_creation(*this); return this; } + void ExecImpl::cancel() { XBT_VERB("This exec %p is canceled", this); @@ -139,20 +157,21 @@ double ExecImpl::get_par_remaining_ratio() return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains(); } -void ExecImpl::set_bound(double bound) +ExecImpl& ExecImpl::set_bound(double bound) { - if (surf_action_) - surf_action_->set_bound(bound); + bound_ = bound; + return *this; } -void ExecImpl::set_priority(double priority) + +ExecImpl& ExecImpl::set_priority(double priority) { - if (surf_action_) - surf_action_->set_priority(priority); + priority_ = priority; + return *this; } void ExecImpl::post() { - if (host_ && not host_->is_on()) { /* FIXME: handle resource failure for parallel tasks too */ + if (hosts_.size() == 1 && not hosts_.front()->is_on()) { /* FIXME: handle resource failure for parallel tasks too */ /* If the host running the synchro failed, notice it. This way, the asking * process can be killed if it runs on that host itself */ state_ = SIMIX_FAILED; diff --git a/src/kernel/activity/ExecImpl.hpp b/src/kernel/activity/ExecImpl.hpp index 86f2d0ada2..80d25bdd63 100644 --- a/src/kernel/activity/ExecImpl.hpp +++ b/src/kernel/activity/ExecImpl.hpp @@ -16,30 +16,38 @@ namespace activity { class XBT_PUBLIC ExecImpl : public ActivityImpl { resource::Action* timeout_detector_ = nullptr; + double priority_ = 1.0; + double bound_ = 0.0; + std::vector hosts_; + std::vector flops_amounts_; + std::vector bytes_amounts_; ~ExecImpl(); public: - ExecImpl* start(double flops_amount, double priority, double bound); - ExecImpl* start(const std::vector& hosts, const std::vector& flops_amounts, - const std::vector& bytes_amounts); - ExecImpl& set_name(const std::string& name); ExecImpl& set_tracing_category(const std::string& category); - ExecImpl& set_host(s4u::Host* host); ExecImpl& set_timeout(double timeout); + ExecImpl& set_bound(double bound); + ExecImpl& set_priority(double priority); - void cancel(); - void post() override; - void finish() override; + ExecImpl& set_flops_amount(double flop_amount); + ExecImpl& set_host(s4u::Host* host); + s4u::Host* get_host() const { return hosts_.front(); } + + ExecImpl& set_flops_amounts(const std::vector& flops_amounts); + ExecImpl& set_bytes_amounts(const std::vector& bytes_amounts); + ExecImpl& set_hosts(const std::vector& hosts); + + unsigned int get_host_number() const { return hosts_.size(); } double get_remaining() const; double get_seq_remaining_ratio(); double get_par_remaining_ratio(); - void set_bound(double bound); // deprecated. To be removed in v3.25 - void set_priority(double priority); // deprecated. To be removed in v3.25 virtual ActivityImpl* migrate(s4u::Host* to); - /* The host where the execution takes place. nullptr means this is a parallel exec (and only surf knows the hosts) */ - s4u::Host* host_ = nullptr; + ExecImpl* start(); + void cancel(); + void post() override; + void finish() override; static xbt::signal on_creation; static xbt::signal on_completion; diff --git a/src/kernel/actor/ActorImpl.cpp b/src/kernel/actor/ActorImpl.cpp index dabf488a38..3a5938bfb0 100644 --- a/src/kernel/actor/ActorImpl.cpp +++ b/src/kernel/actor/ActorImpl.cpp @@ -372,7 +372,7 @@ activity::ActivityImplPtr ActorImpl::suspend(ActorImpl* issuer) return nullptr; } else { activity::ExecImpl* exec = new activity::ExecImpl(); - (*exec).set_name("suspend").set_host(host_).start(0.0, 1.0, 0.0); + (*exec).set_name("suspend").set_host(host_).set_flops_amount(0.0).start(); return activity::ExecImplPtr(exec); } } diff --git a/src/plugins/dirty_page_tracking.cpp b/src/plugins/dirty_page_tracking.cpp index 0298cd9f20..684c2ad8ba 100644 --- a/src/plugins/dirty_page_tracking.cpp +++ b/src/plugins/dirty_page_tracking.cpp @@ -74,7 +74,7 @@ static void on_virtual_machine_creation(simgrid::vm::VirtualMachineImpl& vm) static void on_exec_creation(simgrid::kernel::activity::ExecImpl const& exec) { - simgrid::s4u::VirtualMachine* vm = dynamic_cast(exec.host_); + simgrid::s4u::VirtualMachine* vm = dynamic_cast(exec.get_host()); if (vm == nullptr) return; @@ -87,7 +87,7 @@ static void on_exec_creation(simgrid::kernel::activity::ExecImpl const& exec) static void on_exec_completion(simgrid::kernel::activity::ExecImpl const& exec) { - simgrid::s4u::VirtualMachine* vm = dynamic_cast(exec.host_); + simgrid::s4u::VirtualMachine* vm = dynamic_cast(exec.get_host()); if (vm == nullptr) return; diff --git a/src/plugins/host_dvfs.cpp b/src/plugins/host_dvfs.cpp index 0cca4f8c2d..d94c836dda 100644 --- a/src/plugins/host_dvfs.cpp +++ b/src/plugins/host_dvfs.cpp @@ -295,14 +295,14 @@ public: }); simgrid::kernel::activity::ExecImpl::on_creation.connect( [this](simgrid::kernel::activity::ExecImpl const& activity) { - if (activity.host_ == get_host()) + if (activity.get_host() == get_host()) pre_task(); }); simgrid::kernel::activity::ExecImpl::on_completion.connect( [this](simgrid::kernel::activity::ExecImpl const& activity) { // For more than one host (not yet supported), we can access the host via // simcalls_.front()->issuer->iface()->get_host() - if (activity.host_ == get_host() && iteration_running) { + if (activity.get_host() == get_host() && iteration_running) { comp_timer += activity.surf_action_->get_finish_time() - activity.surf_action_->get_start_time(); } }); diff --git a/src/plugins/host_energy.cpp b/src/plugins/host_energy.cpp index 4e695cd2bc..92a6b5c157 100644 --- a/src/plugins/host_energy.cpp +++ b/src/plugins/host_energy.cpp @@ -498,8 +498,8 @@ void sg_host_energy_plugin_init() // during the recv call. By updating at the beginning of a compute, we can // fix that. (If the cpu is not idle, this is not required.) simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl const& activity) { - if (activity.host_ != nullptr) { // We only run on one host - simgrid::s4u::Host* host = activity.host_; + if (activity.get_host_number() == 1) { // We only run on one host + simgrid::s4u::Host* host = activity.get_host(); simgrid::s4u::VirtualMachine* vm = dynamic_cast(host); if (vm != nullptr) host = vm->get_pm(); diff --git a/src/plugins/host_load.cpp b/src/plugins/host_load.cpp index 1688ef3d9a..ea40824614 100644 --- a/src/plugins/host_load.cpp +++ b/src/plugins/host_load.cpp @@ -205,8 +205,8 @@ void sg_host_load_plugin_init() }); simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl& activity) { - if (activity.host_ != nullptr) { // We only run on one host - simgrid::s4u::Host* host = activity.host_; + if (activity.get_host_number() == 1) { // We only run on one host + simgrid::s4u::Host* host = activity.get_host(); simgrid::s4u::VirtualMachine* vm = dynamic_cast(host); if (vm != nullptr) host = vm->get_pm(); @@ -221,8 +221,8 @@ void sg_host_load_plugin_init() } }); simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImpl const& activity) { - if (activity.host_ != nullptr) { // We only run on one host - simgrid::s4u::Host* host = activity.host_; + if (activity.get_host_number() == 1) { // We only run on one host + simgrid::s4u::Host* host = activity.get_host(); simgrid::s4u::VirtualMachine* vm = dynamic_cast(host); if (vm != nullptr) host = vm->get_pm(); diff --git a/src/plugins/vm/VirtualMachineImpl.cpp b/src/plugins/vm/VirtualMachineImpl.cpp index 912c575dc7..a46137176d 100644 --- a/src/plugins/vm/VirtualMachineImpl.cpp +++ b/src/plugins/vm/VirtualMachineImpl.cpp @@ -54,7 +54,7 @@ static void host_state_change(s4u::Host const& host) static s4u::VirtualMachine* get_vm_from_task(kernel::activity::ActivityImpl const& task) { auto* exec = dynamic_cast(&task); - return exec != nullptr ? dynamic_cast(exec->host_) : nullptr; + return exec != nullptr ? dynamic_cast(exec->get_host()) : nullptr; } static void add_active_task(kernel::activity::ActivityImpl const& task) diff --git a/src/s4u/s4u_Exec.cpp b/src/s4u/s4u_Exec.cpp index 60bdd349af..918a705e11 100644 --- a/src/s4u/s4u_Exec.cpp +++ b/src/s4u/s4u_Exec.cpp @@ -130,7 +130,10 @@ Exec* ExecSeq::start() (*boost::static_pointer_cast(pimpl_)) .set_name(name_) .set_tracing_category(tracing_category_) - .start(flops_amount_, 1. / priority_, bound_); + .set_priority(1. / priority_) + .set_bound(bound_) + .set_flops_amount(flops_amount_) + .start(); }); state_ = State::STARTED; on_start(*Actor::self()); @@ -148,7 +151,7 @@ ExecPtr ExecSeq::set_host(Host* host) if (state_ == State::STARTED) boost::static_pointer_cast(pimpl_)->migrate(host); host_ = host; - boost::static_pointer_cast(pimpl_)->host_ = host; + boost::static_pointer_cast(pimpl_)->set_host(host); return this; } @@ -182,21 +185,23 @@ ExecPar::ExecPar(const std::vector& hosts, const std::vector const std::vector& bytes_amounts) : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts) { - // For parallel executions, we need a special host to run the timeout detector. - host_ = hosts.front(); - boost::static_pointer_cast(pimpl_)->host_ = host_; } Exec* ExecPar::start() { simix::simcall([this] { - boost::static_pointer_cast(pimpl_)->set_timeout(timeout_); - boost::static_pointer_cast(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_); + (*boost::static_pointer_cast(pimpl_)) + .set_hosts(hosts_) + .set_timeout(timeout_) + .set_flops_amounts(flops_amounts_) + .set_bytes_amounts(bytes_amounts_) + .start(); }); state_ = State::STARTED; on_start(*Actor::self()); return this; } + double ExecPar::get_remaining_ratio() { return simix::simcall( diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index 9947bf18cb..2840515913 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -385,7 +385,14 @@ smx_activity_t simcall_execution_start(const std::string& name, const std::strin { return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] { simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl(); - (*exec).set_name(name).set_tracing_category(category).set_host(host).start(flops_amount, priority, bound); + (*exec) + .set_name(name) + .set_tracing_category(category) + .set_host(host) + .set_priority(priority) + .set_bound(bound) + .set_flops_amount(flops_amount) + .start(); return simgrid::kernel::activity::ExecImplPtr(exec); }); } @@ -424,7 +431,13 @@ smx_activity_t simcall_execution_parallel_start(const std::string& name, int hos bytes_parallel_amount = std::vector(bytes_amount, bytes_amount + host_nb * host_nb); return simgrid::simix::simcall([name, hosts, flops_parallel_amount, bytes_parallel_amount, timeout] { simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl(); - (*exec).set_name(name).set_timeout(timeout).start(hosts, flops_parallel_amount, bytes_parallel_amount); + (*exec) + .set_name(name) + .set_hosts(hosts) + .set_timeout(timeout) + .set_flops_amounts(flops_parallel_amount) + .set_bytes_amounts(bytes_parallel_amount) + .start(); return simgrid::kernel::activity::ExecImplPtr(exec); }); } -- 2.20.1