ExecImpl& ExecImpl::set_host(s4u::Host* host)
{
- host_ = host;
+ // The single host_ member was replaced by a hosts_ vector, so that
+ // sequential (one host) and parallel (several hosts) executions share one
+ // storage and one start() code path. Keep exactly one host after this call.
+ if (not hosts_.empty())
+ hosts_.clear();
+ hosts_.push_back(host);
+ return *this;
+}
+
+// Fluent setter used by parallel executions: replaces the whole host list.
+ExecImpl& ExecImpl::set_hosts(const std::vector<s4u::Host*>& hosts)
+{
+ hosts_ = hosts;
return *this;
}
ExecImpl& ExecImpl::set_timeout(double timeout)
{
if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) {
- timeout_detector_ = host_->pimpl_cpu->sleep(timeout);
+ // hosts_.front() replaces the removed host_ member; for parallel
+ // executions the timeout detector sleeps on the first host of the list.
+ // NOTE(review): assumes set_host()/set_hosts() was called before
+ // set_timeout() -- hosts_.front() on an empty vector is UB. TODO confirm
+ // all call sites respect this ordering.
+ timeout_detector_ = hosts_.front()->pimpl_cpu->sleep(timeout);
timeout_detector_->set_data(this);
}
return *this;
}
-ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound)
+// Fluent setter: record the flop amount of a sequential execution. The old
+// start(flops, priority, bound) overload is removed; its parameters are now
+// stored by these setters and consumed by the argument-less start().
+ExecImpl& ExecImpl::set_flops_amount(double flops_amount)
{
- state_ = SIMIX_RUNNING;
- if (not MC_is_active() && not MC_record_replay_is_active()) {
- surf_action_ = host_->pimpl_cpu->execution_start(flops_amount);
- surf_action_->set_data(this);
- surf_action_->set_priority(priority);
- if (bound > 0)
- surf_action_->set_bound(bound);
- }
+ // Keep exactly one amount, mirroring set_host()'s single-element vector.
+ if (not flops_amounts_.empty())
+ flops_amounts_.clear();
+ flops_amounts_.push_back(flops_amount);
+ return *this;
+}
- XBT_DEBUG("Create execute synchro %p: %s", this, get_cname());
- ExecImpl::on_creation(*this);
- return this;
+// Fluent setter used by parallel executions: one flop amount per host.
+ExecImpl& ExecImpl::set_flops_amounts(const std::vector<double>& flops_amounts)
+{
+ flops_amounts_ = flops_amounts;
+ return *this;
}
-ExecImpl* ExecImpl::start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
- const std::vector<double>& bytes_amounts)
+// Fluent setter for the communication amounts of a parallel execution.
+ExecImpl& ExecImpl::set_bytes_amounts(const std::vector<double>& bytes_amounts)
+{
+ bytes_amounts_ = bytes_amounts;
+
+ return *this;
+}
+
+// Unified start(): replaces both the sequential start(flops, priority, bound)
+// and the parallel start(hosts, flops, bytes) overloads. All inputs must have
+// been provided beforehand through the fluent setters.
+ExecImpl* ExecImpl::start()
{
state_ = SIMIX_RUNNING;
- /* set surf's synchro */
if (not MC_is_active() && not MC_record_replay_is_active()) {
+ // One host -> sequential execution on that host's CPU; several hosts ->
+ // parallel task handed to the host model.
+ if (hosts_.size() == 1) {
+ // NOTE(review): flops_amounts_.front() is UB if set_flops_amount() was
+ // never called -- TODO confirm every caller sets it before start().
+ surf_action_ = hosts_.front()->pimpl_cpu->execution_start(flops_amounts_.front());
+ surf_action_->set_priority(priority_);
+ if (bound_ > 0)
+ surf_action_->set_bound(bound_);
+ } else {
+ surf_action_ = surf_host_model->execute_parallel(hosts_, flops_amounts_.data(), bytes_amounts_.data(), -1);
}
+ // NOTE(review): the removed parallel path guarded set_data() with
+ // `surf_action_ != nullptr`; this unguarded call crashes if
+ // execute_parallel() can return nullptr -- verify.
+ surf_action_->set_data(this);
}
+
+ XBT_DEBUG("Create execute synchro %p: %s", this, get_cname());
ExecImpl::on_creation(*this);
return this;
}
+
void ExecImpl::cancel()
{
XBT_VERB("This exec %p is canceled", this);
// NOTE(review): this hunk looks truncated -- a value-returning statement
// cannot belong to void cancel(). The lines between cancel()'s body and the
// get_remaining() accessor (whose body the next line appears to be) were
// apparently elided; verify against the full file before relying on this.
return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains();
}
-void ExecImpl::set_bound(double bound)
+// Fluent setter: the bound is now stored and applied by start(), instead of
+// being pushed onto an already-running surf action.
+ExecImpl& ExecImpl::set_bound(double bound)
{
- if (surf_action_)
- surf_action_->set_bound(bound);
+ bound_ = bound;
+ return *this;
}
-void ExecImpl::set_priority(double priority)
+
+// Fluent setter: the priority is now stored and applied by start(), instead
+// of being pushed onto an already-running surf action.
+ExecImpl& ExecImpl::set_priority(double priority)
{
- if (surf_action_)
- surf_action_->set_priority(priority);
+ priority_ = priority;
+ return *this;
}
void ExecImpl::post()
{
- if (host_ && not host_->is_on()) { /* FIXME: handle resource failure for parallel tasks too */
+ if (hosts_.size() == 1 && not hosts_.front()->is_on()) { /* FIXME: handle resource failure for parallel tasks too */
/* If the host running the synchro failed, notice it. This way, the asking
* process can be killed if it runs on that host itself */
state_ = SIMIX_FAILED;
class XBT_PUBLIC ExecImpl : public ActivityImpl {
resource::Action* timeout_detector_ = nullptr;
+ double priority_ = 1.0;
+ double bound_ = 0.0;
+ std::vector<s4u::Host*> hosts_;
+ std::vector<double> flops_amounts_;
+ std::vector<double> bytes_amounts_;
~ExecImpl();
public:
- ExecImpl* start(double flops_amount, double priority, double bound);
- ExecImpl* start(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
- const std::vector<double>& bytes_amounts);
-
ExecImpl& set_name(const std::string& name);
ExecImpl& set_tracing_category(const std::string& category);
- ExecImpl& set_host(s4u::Host* host);
ExecImpl& set_timeout(double timeout);
+ ExecImpl& set_bound(double bound);
+ ExecImpl& set_priority(double priority);
- void cancel();
- void post() override;
- void finish() override;
+ ExecImpl& set_flops_amount(double flop_amount);
+ ExecImpl& set_host(s4u::Host* host);
+ s4u::Host* get_host() const { return hosts_.front(); }
+
+ ExecImpl& set_flops_amounts(const std::vector<double>& flops_amounts);
+ ExecImpl& set_bytes_amounts(const std::vector<double>& bytes_amounts);
+ ExecImpl& set_hosts(const std::vector<s4u::Host*>& hosts);
+
+ unsigned int get_host_number() const { return hosts_.size(); }
double get_remaining() const;
double get_seq_remaining_ratio();
double get_par_remaining_ratio();
- void set_bound(double bound); // deprecated. To be removed in v3.25
- void set_priority(double priority); // deprecated. To be removed in v3.25
virtual ActivityImpl* migrate(s4u::Host* to);
- /* The host where the execution takes place. nullptr means this is a parallel exec (and only surf knows the hosts) */
- s4u::Host* host_ = nullptr;
+ ExecImpl* start();
+ void cancel();
+ void post() override;
+ void finish() override;
static xbt::signal<void(ExecImpl&)> on_creation;
static xbt::signal<void(ExecImpl const&)> on_completion;
return nullptr;
} else {
activity::ExecImpl* exec = new activity::ExecImpl();
- (*exec).set_name("suspend").set_host(host_).start(0.0, 1.0, 0.0);
+ (*exec).set_name("suspend").set_host(host_).set_flops_amount(0.0).start();
return activity::ExecImplPtr(exec);
}
}
static void on_exec_creation(simgrid::kernel::activity::ExecImpl const& exec)
{
- simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.host_);
+ simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.get_host());
if (vm == nullptr)
return;
static void on_exec_completion(simgrid::kernel::activity::ExecImpl const& exec)
{
- simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.host_);
+ simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec.get_host());
if (vm == nullptr)
return;
});
simgrid::kernel::activity::ExecImpl::on_creation.connect(
[this](simgrid::kernel::activity::ExecImpl const& activity) {
- if (activity.host_ == get_host())
+ if (activity.get_host() == get_host())
pre_task();
});
simgrid::kernel::activity::ExecImpl::on_completion.connect(
[this](simgrid::kernel::activity::ExecImpl const& activity) {
// For more than one host (not yet supported), we can access the host via
// simcalls_.front()->issuer->iface()->get_host()
- if (activity.host_ == get_host() && iteration_running) {
+ if (activity.get_host() == get_host() && iteration_running) {
comp_timer += activity.surf_action_->get_finish_time() - activity.surf_action_->get_start_time();
}
});
// during the recv call. By updating at the beginning of a compute, we can
// fix that. (If the cpu is not idle, this is not required.)
simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl const& activity) {
- if (activity.host_ != nullptr) { // We only run on one host
- simgrid::s4u::Host* host = activity.host_;
+ if (activity.get_host_number() == 1) { // We only run on one host
+ simgrid::s4u::Host* host = activity.get_host();
simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
if (vm != nullptr)
host = vm->get_pm();
});
simgrid::kernel::activity::ExecImpl::on_creation.connect([](simgrid::kernel::activity::ExecImpl& activity) {
- if (activity.host_ != nullptr) { // We only run on one host
- simgrid::s4u::Host* host = activity.host_;
+ if (activity.get_host_number() == 1) { // We only run on one host
+ simgrid::s4u::Host* host = activity.get_host();
simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
if (vm != nullptr)
host = vm->get_pm();
}
});
simgrid::kernel::activity::ExecImpl::on_completion.connect([](simgrid::kernel::activity::ExecImpl const& activity) {
- if (activity.host_ != nullptr) { // We only run on one host
- simgrid::s4u::Host* host = activity.host_;
+ if (activity.get_host_number() == 1) { // We only run on one host
+ simgrid::s4u::Host* host = activity.get_host();
simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
if (vm != nullptr)
host = vm->get_pm();
static s4u::VirtualMachine* get_vm_from_task(kernel::activity::ActivityImpl const& task)
{
auto* exec = dynamic_cast<kernel::activity::ExecImpl const*>(&task);
- return exec != nullptr ? dynamic_cast<s4u::VirtualMachine*>(exec->host_) : nullptr;
+ return exec != nullptr ? dynamic_cast<s4u::VirtualMachine*>(exec->get_host()) : nullptr;
}
static void add_active_task(kernel::activity::ActivityImpl const& task)
(*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
.set_name(name_)
.set_tracing_category(tracing_category_)
- .start(flops_amount_, 1. / priority_, bound_);
+ .set_priority(1. / priority_)
+ .set_bound(bound_)
+ .set_flops_amount(flops_amount_)
+ .start();
});
state_ = State::STARTED;
on_start(*Actor::self());
if (state_ == State::STARTED)
boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->migrate(host);
host_ = host;
- boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host;
+ boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->set_host(host);
return this;
}
const std::vector<double>& bytes_amounts)
: Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts)
{
- // For parallel executions, we need a special host to run the timeout detector.
- host_ = hosts.front();
- boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_)->host_ = host_;
}
Exec* ExecPar::start()
{
// In kernel mode: configure the underlying ExecImpl through the new fluent
// setters, then fire it. set_hosts() must precede set_timeout(), because the
// timeout detector sleeps on the first host of the list.
simix::simcall([this] {
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->set_timeout(timeout_);
- boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_);
+ (*boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_))
+ .set_hosts(hosts_)
+ .set_timeout(timeout_)
+ .set_flops_amounts(flops_amounts_)
+ .set_bytes_amounts(bytes_amounts_)
+ .start();
});
state_ = State::STARTED;
on_start(*Actor::self());
return this;
}
+
double ExecPar::get_remaining_ratio()
{
return simix::simcall(
{
return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] {
simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
- (*exec).set_name(name).set_tracing_category(category).set_host(host).start(flops_amount, priority, bound);
+ (*exec)
+ .set_name(name)
+ .set_tracing_category(category)
+ .set_host(host)
+ .set_priority(priority)
+ .set_bound(bound)
+ .set_flops_amount(flops_amount)
+ .start();
return simgrid::kernel::activity::ExecImplPtr(exec);
});
}
bytes_parallel_amount = std::vector<double>(bytes_amount, bytes_amount + host_nb * host_nb);
return simgrid::simix::simcall([name, hosts, flops_parallel_amount, bytes_parallel_amount, timeout] {
simgrid::kernel::activity::ExecImpl* exec = new simgrid::kernel::activity::ExecImpl();
- (*exec).set_name(name).set_timeout(timeout).start(hosts, flops_parallel_amount, bytes_parallel_amount);
+ (*exec)
+ .set_name(name)
+ .set_hosts(hosts)
+ .set_timeout(timeout)
+ .set_flops_amounts(flops_parallel_amount)
+ .set_bytes_amounts(bytes_parallel_amount)
+ .start();
return simgrid::kernel::activity::ExecImplPtr(exec);
});
}