From: Frederic Suter Date: Tue, 5 Mar 2019 22:43:51 +0000 (+0100) Subject: s4u::Exec now has 2 child classes X-Git-Tag: v3_22~172 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/54f9d48567e65bd17b9ad3544b5aad57bef54463 s4u::Exec now has 2 child classes s4u::ExecSeq and s4u::ExecPar side effects: + no more C-style host list in HostImpl and ptask_L07 + Sequential execution can have a timeout TODO: + get rid of ExecImplPtr in MSG + check and clean --- diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index ef576ad250..b486383474 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -48,6 +48,8 @@ class Exec; typedef boost::intrusive_ptr ExecPtr; XBT_PUBLIC void intrusive_ptr_release(Exec* e); XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e); +class ExecSeq; +class ExecPar; class Host; diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 0ae0cbb3ff..39570d2748 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -32,6 +32,8 @@ class XBT_PUBLIC Activity { friend XBT_PUBLIC void intrusive_ptr_release(Comm * c); friend XBT_PUBLIC void intrusive_ptr_add_ref(Comm * c); friend simgrid::s4u::Exec; + friend simgrid::s4u::ExecSeq; + friend simgrid::s4u::ExecPar; friend XBT_PUBLIC void intrusive_ptr_release(Exec * e); friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec * e); friend simgrid::s4u::Io; diff --git a/include/simgrid/s4u/Actor.hpp b/include/simgrid/s4u/Actor.hpp index 2b85f6db75..905189049b 100644 --- a/include/simgrid/s4u/Actor.hpp +++ b/include/simgrid/s4u/Actor.hpp @@ -496,6 +496,9 @@ XBT_ATTRIB_DEPRECATED_v325("Please use std::vectors as parameters") XBT_PUBLIC #endif XBT_PUBLIC ExecPtr exec_init(double flops_amounts); +XBT_PUBLIC ExecPtr exec_init(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts); + XBT_PUBLIC ExecPtr exec_async(double flops_amounts); /** @brief Returns the actor ID 
of the current actor. */ diff --git a/include/simgrid/s4u/Exec.hpp b/include/simgrid/s4u/Exec.hpp index 3be13fc212..93651fa798 100644 --- a/include/simgrid/s4u/Exec.hpp +++ b/include/simgrid/s4u/Exec.hpp @@ -8,6 +8,7 @@ #include #include +#include #include @@ -19,52 +20,76 @@ namespace s4u { * They are generated from this_actor::exec_init() or Host::execute(), and can be used to model pools of threads or * similar mechanisms. */ - class XBT_PUBLIC Exec : public Activity { - Host* host_ = nullptr; - double flops_amount_ = 0.0; + std::string name_ = ""; double priority_ = 1.0; double bound_ = 0.0; - std::string name_ = ""; + double timeout_ = 0.0; std::string tracing_category_ = ""; std::atomic_int_fast32_t refcount_{0}; + Host* host_ = nullptr; - explicit Exec(sg_host_t host, double flops_amount); +protected: + Exec(); + virtual ~Exec() = default; public: +#ifndef DOXYGEN + Exec(Exec const&) = delete; + Exec& operator=(Exec const&) = delete; +#endif + + friend simgrid::s4u::ExecSeq; + friend simgrid::s4u::ExecPar; friend XBT_PUBLIC void intrusive_ptr_release(Exec* e); friend XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e); - friend XBT_PUBLIC ExecPtr this_actor::exec_init(double flops_amount); - - ~Exec() = default; - static xbt::signal on_start; static xbt::signal on_completion; - Exec* start() override; + virtual double get_remaining() = 0; + virtual double get_remaining_ratio() = 0; + virtual Exec* start() = 0; + virtual ExecPtr set_host(Host* host) = 0; + Exec* wait() override; Exec* wait_for(double timeout) override; - Exec* cancel() override; bool test() override; - ExecPtr set_priority(double priority); ExecPtr set_bound(double bound); - ExecPtr set_host(Host* host); ExecPtr set_name(std::string name); + ExecPtr set_priority(double priority); ExecPtr set_tracing_category(std::string category); - Host* get_host(); - - double get_remaining() override; - double get_remaining_ratio(); + ExecPtr set_timeout(double timeout); + Exec* cancel() override; -#ifndef 
DOXYGEN - //////////////// Deprecated functions - XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); } XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_priority()") ExecPtr setPriority(double priority) { return set_priority(priority); } XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_bound()") ExecPtr setBound(double bound) { return set_bound(bound); } + XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); } +}; + +class XBT_PUBLIC ExecSeq : public Exec { + double flops_amount_ = 0.0; + + explicit ExecSeq(sg_host_t host, double flops_amount); + +public: + friend XBT_PUBLIC ExecPtr this_actor::exec_init(double flops_amount); + + ~ExecSeq() = default; + + Exec* start() override; + + ExecPtr set_host(Host* host); + Host* get_host(); + + double get_remaining() override; + double get_remaining_ratio() override; + +#ifndef DOXYGEN + //////////////// Deprecated functions XBT_ATTRIB_DEPRECATED_v323("Please use Exec::set_host()") ExecPtr setHost(Host* host) { return set_host(host); } XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_host()") Host* getHost() { return get_host(); } XBT_ATTRIB_DEPRECATED_v323("Please use Exec::get_remaining_ratio()") double getRemainingRatio() @@ -73,6 +98,25 @@ public: } #endif }; + +class XBT_PUBLIC ExecPar : public Exec { + std::vector hosts_; + std::vector flops_amounts_; + std::vector bytes_amounts_; + explicit ExecPar(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts); + ExecPtr set_host(Host* host) { return this; } + +public: + ~ExecPar() = default; + friend XBT_PUBLIC ExecPtr this_actor::exec_init(const std::vector& hosts, + const std::vector& flops_amounts, + const std::vector& bytes_amounts); + double get_remaining() override; + double get_remaining_ratio() override; + Exec* start() override; +}; + } // namespace s4u } // namespace simgrid diff --git a/src/kernel/activity/ExecImpl.cpp 
b/src/kernel/activity/ExecImpl.cpp index 6645e768ba..b33bd1cf3e 100644 --- a/src/kernel/activity/ExecImpl.cpp +++ b/src/kernel/activity/ExecImpl.cpp @@ -8,6 +8,7 @@ #include "simgrid/modelchecker.h" #include "src/mc/mc_replay.hpp" #include "src/simix/smx_host_private.hpp" +#include "src/surf/HostImpl.hpp" #include "src/surf/cpu_interface.hpp" #include "src/surf/surf_interface.hpp" @@ -51,8 +52,7 @@ namespace simgrid { namespace kernel { namespace activity { -ExecImpl::ExecImpl(std::string name, std::string tracing_category, s4u::Host* host) - : ActivityImpl(std::move(name)), host_(host) +ExecImpl::ExecImpl(std::string name, std::string tracing_category) : ActivityImpl(std::move(name)) { this->state_ = SIMIX_RUNNING; this->set_category(std::move(tracing_category)); @@ -60,16 +60,6 @@ ExecImpl::ExecImpl(std::string name, std::string tracing_category, s4u::Host* ho XBT_DEBUG("Create exec %p", this); } -ExecImpl::ExecImpl(std::string name, std::string tracing_category, s4u::Host* host, double timeout) - : ExecImpl(std::move(name), std::move(tracing_category), nullptr) -{ - if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) { - timeout_detector_ = host->pimpl_cpu->sleep(timeout); - timeout_detector_->set_data(this); - } - XBT_DEBUG("Create exec %p", this); -} - ExecImpl::~ExecImpl() { if (surf_action_) @@ -79,6 +69,21 @@ ExecImpl::~ExecImpl() XBT_DEBUG("Destroy exec %p", this); } +ExecImpl* ExecImpl::set_host(s4u::Host* host) +{ + host_ = host; + return this; +} + +ExecImpl* ExecImpl::set_timeout(double timeout) +{ + if (timeout > 0 && not MC_is_active() && not MC_record_replay_is_active()) { + timeout_detector_ = host_->pimpl_cpu->sleep(timeout); + timeout_detector_->set_data(this); + } + return this; +} + ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound) { if (not MC_is_active() && not MC_record_replay_is_active()) { @@ -94,6 +99,20 @@ ExecImpl* ExecImpl::start(double flops_amount, double priority, double bound) 
return this; } +ExecImpl* ExecImpl::start(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts) +{ + /* set surf's synchro */ + if (not MC_is_active() && not MC_record_replay_is_active()) { + surf_action_ = surf_host_model->execute_parallel(hosts, flops_amounts.data(), bytes_amounts.data(), -1); + if (surf_action_ != nullptr) { + surf_action_->set_data(this); + } + } + XBT_DEBUG("Create parallel execute synchro %p", this); + ExecImpl::on_creation(this); + return this; +} void ExecImpl::cancel() { XBT_VERB("This exec %p is canceled", this); @@ -103,19 +122,18 @@ void ExecImpl::cancel() double ExecImpl::get_remaining() { - xbt_assert(host_ != nullptr, "Calling remains() on a parallel execution is not allowed. " - "We would need to return a vector instead of a scalar. " - "Did you mean remainingRatio() instead?"); return surf_action_ ? surf_action_->get_remains() : 0; } -double ExecImpl::get_remaining_ratio() +double ExecImpl::get_seq_remaining_ratio() +{ + return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains() / surf_action_->get_cost(); +} + +double ExecImpl::get_par_remaining_ratio() { - if (host_ == - nullptr) // parallel task: their remain is already between 0 and 1 (see comment in ExecImpl::get_remaining()) - return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains(); - else // Actually compute the ratio for sequential tasks - return (surf_action_ == nullptr) ? 0 : surf_action_->get_remains() / surf_action_->get_cost(); + // parallel task: their remain is already between 0 and 1 + return (surf_action_ == nullptr) ? 
0 : surf_action_->get_remains(); } void ExecImpl::set_bound(double bound) diff --git a/src/kernel/activity/ExecImpl.hpp b/src/kernel/activity/ExecImpl.hpp index 38cedadf04..966438fdce 100644 --- a/src/kernel/activity/ExecImpl.hpp +++ b/src/kernel/activity/ExecImpl.hpp @@ -18,14 +18,19 @@ class XBT_PUBLIC ExecImpl : public ActivityImpl { ~ExecImpl() override; public: - explicit ExecImpl(std::string name, std::string tracing_category, s4u::Host* host); - explicit ExecImpl(std::string name, std::string tracing_category, s4u::Host* host, double timeout); + explicit ExecImpl(std::string name, std::string tracing_category); ExecImpl* start(double flops_amount, double priority, double bound); + ExecImpl* start(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts); + + ExecImpl* set_host(s4u::Host* host); + ExecImpl* set_timeout(double timeout); void cancel(); void post() override; void finish() override; double get_remaining(); - double get_remaining_ratio(); + double get_seq_remaining_ratio(); + double get_par_remaining_ratio(); void set_bound(double bound); // deprecated. To be removed in v3.25 void set_priority(double priority); // deprecated. To be removed in v3.25 virtual ActivityImpl* migrate(s4u::Host* to); diff --git a/src/msg/msg_gos.cpp b/src/msg/msg_gos.cpp index 5b3736bc70..7757928aeb 100644 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@ -6,7 +6,9 @@ #include #include "simgrid/Exception.hpp" +#include "simgrid/s4u/Actor.hpp" #include "simgrid/s4u/Comm.hpp" +#include "simgrid/s4u/Exec.hpp" #include "simgrid/s4u/Mailbox.hpp" #include "src/instr/instr_private.hpp" #include "src/kernel/activity/ExecImpl.hpp" @@ -28,7 +30,6 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout) { - e_smx_state_t comp_state; msg_error_t status = MSG_OK; xbt_assert((not task->compute) && not task->is_used(), "This task is executed somewhere else. 
Go fix your code!"); @@ -40,20 +41,21 @@ msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeo try { task->set_used(); + simgrid::s4u::ExecPtr e = + simgrid::s4u::this_actor::exec_init(task->hosts_, task->flops_parallel_amount, task->bytes_parallel_amount) + ->set_name(task->get_name()) + ->set_tracing_category(task->get_tracing_category()) + ->set_timeout(timeout) + ->start(); + task->compute = boost::static_pointer_cast(e->get_impl()); - task->compute = boost::static_pointer_cast(simcall_execution_parallel_start( - std::move(task->get_name()), task->hosts_.size(), task->hosts_.data(), - (task->flops_parallel_amount.empty() ? nullptr : task->flops_parallel_amount.data()), - (task->bytes_parallel_amount.empty() ? nullptr : task->bytes_parallel_amount.data()), -1.0, timeout)); XBT_DEBUG("Parallel execution action created: %p", task->compute.get()); - if (task->has_tracing_category()) - simgrid::simix::simcall([task] { task->compute->set_category(std::move(task->get_tracing_category())); }); - comp_state = simcall_execution_wait(task->compute); + e->wait(); task->set_not_used(); - XBT_DEBUG("Execution task '%s' finished in state %d", task->get_cname(), (int)comp_state); + XBT_DEBUG("Execution task '%s' finished", task->get_cname()); } catch (simgrid::HostFailureException& e) { status = MSG_HOST_FAILURE; } catch (simgrid::TimeoutError& e) { diff --git a/src/msg/msg_task.cpp b/src/msg/msg_task.cpp index d3416fc5ee..5809bdb0f6 100644 --- a/src/msg/msg_task.cpp +++ b/src/msg/msg_task.cpp @@ -403,7 +403,10 @@ double MSG_task_get_remaining_work_ratio(msg_task_t task) { xbt_assert((task != nullptr), "Cannot get information from a nullptr task"); if (task->compute) { // Task in progress - return task->compute->get_remaining_ratio(); + if (task->is_parallel()) + return task->compute->get_par_remaining_ratio(); + else + return task->compute->get_seq_remaining_ratio(); } else { // Task not started (flops_amount is > 0.0) or finished (flops_amount is 
set to 0.0) return task->flops_amount > 0.0 ? 1.0 : 0.0; diff --git a/src/s4u/s4u_Actor.cpp b/src/s4u/s4u_Actor.cpp index b20d8c6d37..21398a2f2c 100644 --- a/src/s4u/s4u_Actor.cpp +++ b/src/s4u/s4u_Actor.cpp @@ -8,11 +8,13 @@ #include "simgrid/s4u/Actor.hpp" #include "simgrid/s4u/Exec.hpp" #include "simgrid/s4u/Host.hpp" +#include "simgrid/s4u/VirtualMachine.hpp" #include "src/kernel/activity/ExecImpl.hpp" #include "src/simix/smx_host_private.hpp" #include "src/simix/smx_private.hpp" #include "src/surf/HostImpl.hpp" +#include #include XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors"); @@ -315,15 +317,21 @@ void parallel_execute(const std::vector& hosts, const std::vector(hosts.front())); + xbt_assert(std::all_of(hosts.begin(), hosts.end(), + [is_a_vm](s4u::Host* elm) { + bool tmp_is_a_vm = (nullptr != dynamic_cast(elm)); + return is_a_vm == tmp_is_a_vm; + }), + "parallel_execute: mixing VMs and PMs is not supported (yet)."); + /* checking for infinite values */ + xbt_assert(std::all_of(flops_amounts.begin(), flops_amounts.end(), [](double elm) { return std::isfinite(elm); }), + "flops_amounts comprises infinite values!"); + xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }), + "flops_amounts comprises infinite values!"); + + exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait(); } // deprecated @@ -349,7 +357,13 @@ void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* fl ExecPtr exec_init(double flops_amount) { - return ExecPtr(new Exec(get_host(), flops_amount)); + return ExecPtr(new ExecSeq(get_host(), flops_amount)); +} + +ExecPtr exec_init(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts) +{ + return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts)); } ExecPtr exec_async(double flops) diff --git a/src/s4u/s4u_Exec.cpp b/src/s4u/s4u_Exec.cpp index 12397b7aba..0cfb0896ab 100644 --- 
a/src/s4u/s4u_Exec.cpp +++ b/src/s4u/s4u_Exec.cpp @@ -15,27 +15,27 @@ namespace s4u { simgrid::xbt::signal s4u::Exec::on_start; simgrid::xbt::signal s4u::Exec::on_completion; -Exec::Exec(sg_host_t host, double flops_amount) : Activity(), host_(host), flops_amount_(flops_amount) +Exec::Exec() { - Activity::set_remaining(flops_amount_); - pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_, host_)); + pimpl_ = kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_)); } -Exec* Exec::start() +bool Exec::test() { - simix::simcall([this] { - boost::static_pointer_cast(pimpl_)->start(flops_amount_, 1. / priority_, bound_); - }); - state_ = State::STARTED; - on_start(Actor::self()); - return this; -} + xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED); -Exec* Exec::cancel() -{ - simgrid::simix::simcall([this] { boost::static_pointer_cast(pimpl_)->cancel(); }); - state_ = State::CANCELED; - return this; + if (state_ == State::FINISHED) + return true; + + if (state_ == State::INITED) + this->start(); + + if (simcall_execution_test(pimpl_)) { + state_ = State::FINISHED; + return true; + } + + return false; } Exec* Exec::wait() @@ -53,23 +53,48 @@ Exec* Exec::wait_for(double) THROW_UNIMPLEMENTED; } -/** @brief Returns whether the state of the exec is finished */ -bool Exec::test() +Exec* Exec::cancel() { - xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::FINISHED); + simgrid::simix::simcall([this] { boost::static_pointer_cast(pimpl_)->cancel(); }); + state_ = State::CANCELED; + return this; +} - if (state_ == State::FINISHED) - return true; +void intrusive_ptr_release(simgrid::s4u::Exec* e) +{ + if (e->refcount_.fetch_sub(1, std::memory_order_release) == 1) { + std::atomic_thread_fence(std::memory_order_acquire); + delete e; + } +} - if (state_ == State::INITED) - this->start(); +void intrusive_ptr_add_ref(simgrid::s4u::Exec* e) 
+{ + e->refcount_.fetch_add(1, std::memory_order_relaxed); +} - if (simcall_execution_test(pimpl_)) { - state_ = State::FINISHED; - return true; - } +/** @brief change the execution bound + * This means changing the maximal amount of flops per second that it may consume, regardless of what the host may + * deliver. Currently, this cannot be changed once the exec started. + */ +ExecPtr Exec::set_bound(double bound) +{ + xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start"); + bound_ = bound; + return this; +} +ExecPtr Exec::set_timeout(double timeout) +{ + xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start"); + timeout_ = timeout; + return this; +} - return false; +ExecPtr Exec::set_name(std::string name) +{ + xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start"); + name_ = std::move(name); + return this; } /** @brief Change the execution priority, don't you think? @@ -85,21 +110,35 @@ ExecPtr Exec::set_priority(double priority) return this; } -/** @brief change the execution bound, ie the maximal amount of flops per second that it may consume, regardless of what - * the host may deliver - * - * Currently, this cannot be changed once the exec started. 
*/ -ExecPtr Exec::set_bound(double bound) +ExecPtr Exec::set_tracing_category(std::string category) { - xbt_assert(state_ == State::INITED, "Cannot change the bound of an exec after its start"); - bound_ = bound; + xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start"); + tracing_category_ = std::move(category); return this; } +///////////// SEQUENTIAL EXECUTIONS //////// +ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount) +{ + Activity::set_remaining(flops_amount_); + boost::static_pointer_cast(pimpl_)->set_host(host); +} + +Exec* ExecSeq::start() +{ + simix::simcall([this] { + boost::static_pointer_cast(pimpl_)->start(flops_amount_, 1. / priority_, bound_); + }); + state_ = State::STARTED; + on_start(Actor::self()); + return this; +} + +/** @brief Returns whether the state of the exec is finished */ /** @brief Change the host on which this activity takes place. * * The activity cannot be terminated already (but it may be started). */ -ExecPtr Exec::set_host(Host* host) +ExecPtr ExecSeq::set_host(Host* host) { xbt_assert(state_ == State::INITED || state_ == State::STARTED, "Cannot change the host of an exec once it's done (state: %d)", (int)state_); @@ -110,28 +149,15 @@ ExecPtr Exec::set_host(Host* host) return this; } -ExecPtr Exec::set_name(std::string name) -{ - xbt_assert(state_ == State::INITED, "Cannot change the name of an exec after its start"); - name_ = std::move(name); - return this; -} - -ExecPtr Exec::set_tracing_category(std::string category) -{ - xbt_assert(state_ == State::INITED, "Cannot change the tracing category of an exec after its start"); - tracing_category_ = std::move(category); - return this; -} /** @brief Retrieve the host on which this activity takes place. 
*/ -Host* Exec::get_host() +Host* ExecSeq::get_host() { return host_; } /** @brief Returns the amount of flops that remain to be done */ -double Exec::get_remaining() +double ExecSeq::get_remaining() { return simgrid::simix::simcall( [this]() { return boost::static_pointer_cast(pimpl_)->get_remaining(); }); @@ -141,24 +167,44 @@ double Exec::get_remaining() * * The returned value is between 0 (completely done) and 1 (nothing done yet). */ -double Exec::get_remaining_ratio() +double ExecSeq::get_remaining_ratio() { return simgrid::simix::simcall([this]() { - return boost::static_pointer_cast(pimpl_)->get_remaining_ratio(); + return boost::static_pointer_cast(pimpl_)->get_seq_remaining_ratio(); }); } -void intrusive_ptr_release(simgrid::s4u::Exec* e) +///////////// PARALLEL EXECUTIONS //////// +ExecPar::ExecPar(const std::vector& hosts, const std::vector& flops_amounts, + const std::vector& bytes_amounts) + : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts) { - if (e->refcount_.fetch_sub(1, std::memory_order_release) == 1) { - std::atomic_thread_fence(std::memory_order_acquire); - delete e; - } + // For parallel executions, we need a special host to run the timeout detector. + host_ = hosts.front(); + boost::static_pointer_cast(pimpl_)->host_ = host_; } -void intrusive_ptr_add_ref(simgrid::s4u::Exec* e) +Exec* ExecPar::start() { - e->refcount_.fetch_add(1, std::memory_order_relaxed); + simix::simcall([this] { + boost::static_pointer_cast(pimpl_)->set_timeout(timeout_); + boost::static_pointer_cast(pimpl_)->start(hosts_, flops_amounts_, bytes_amounts_); + }); + state_ = State::STARTED; + on_start(Actor::self()); + return this; +} +double ExecPar::get_remaining_ratio() +{ + return simgrid::simix::simcall([this]() { + return boost::static_pointer_cast(pimpl_)->get_par_remaining_ratio(); + }); +} + +double ExecPar::get_remaining() +{ + XBT_WARN("Calling get_remaining() on a parallel execution is not allowed. 
Call get_remaining_ratio() instead."); + return get_remaining_ratio(); } } // namespace s4u } // namespace simgrid diff --git a/src/s4u/s4u_Host.cpp b/src/s4u/s4u_Host.cpp index 3f5d4d4f0e..ee79d4175d 100644 --- a/src/s4u/s4u_Host.cpp +++ b/src/s4u/s4u_Host.cpp @@ -305,7 +305,7 @@ std::unordered_map const& Host::get_mounted_storages() ExecPtr Host::exec_async(double flops) { - return this_actor::exec_init(flops)->set_host(this); + return this_actor::exec_init(flops); } void Host::execute(double flops) @@ -315,7 +315,7 @@ void Host::execute(double flops) void Host::execute(double flops, double priority) { - this_actor::exec_init(flops)->set_host(this)->set_priority(1 / priority)->start()->wait(); + this_actor::exec_init(flops)->set_priority(1 / priority)->start()->wait(); } } // namespace s4u diff --git a/src/simdag/sd_task.cpp b/src/simdag/sd_task.cpp index 86a8e15fe4..0827cc7d12 100644 --- a/src/simdag/sd_task.cpp +++ b/src/simdag/sd_task.cpp @@ -794,8 +794,8 @@ void SD_task_run(SD_task_t task) XBT_VERB("Executing task '%s'", task->name); /* Beware! The scheduling data are now used by the surf action directly! 
no copy was done */ - task->surf_action = surf_host_model->execute_parallel(task->allocation->size(), task->allocation->data(), - task->flops_amount, task->bytes_amount, task->rate); + task->surf_action = + surf_host_model->execute_parallel(*task->allocation, task->flops_amount, task->bytes_amount, task->rate); task->surf_action->set_data(task); diff --git a/src/simix/ActorImpl.cpp b/src/simix/ActorImpl.cpp index 2a664f42e5..b5d78bb57c 100644 --- a/src/simix/ActorImpl.cpp +++ b/src/simix/ActorImpl.cpp @@ -344,7 +344,7 @@ activity::ActivityImplPtr ActorImpl::suspend(ActorImpl* issuer) return nullptr; } else { - return activity::ExecImplPtr(new activity::ExecImpl("suspend", "", this->host_))->start(0.0, 1.0, 0.0); + return activity::ExecImplPtr(new activity::ExecImpl("suspend", ""))->set_host(host_)->start(0.0, 1.0, 0.0); } } diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index d488f6c953..a71478001b 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -65,7 +65,6 @@ smx_activity_t simcall_execution_parallel_start(const std::string& name, int hos } xbt_assert(std::isfinite(rate), "rate is not finite!"); - return simgrid::simix::simcall([name, host_nb, host_list, flops_amount, bytes_amount, rate, timeout] { return SIMIX_execution_parallel_start(std::move(name), host_nb, host_list, flops_amount, bytes_amount, rate, timeout); @@ -431,7 +430,8 @@ smx_activity_t simcall_execution_start(const std::string& name, const std::strin { return simgrid::simix::simcall([name, category, flops_amount, priority, bound, host] { return simgrid::kernel::activity::ExecImplPtr( - new simgrid::kernel::activity::ExecImpl(std::move(name), std::move(category), host)) + new simgrid::kernel::activity::ExecImpl(std::move(name), std::move(category))) + ->set_host(host) ->start(flops_amount, priority, bound); }); } diff --git a/src/simix/smx_host.cpp b/src/simix/smx_host.cpp index 42e65026ac..7b69ef0e81 100644 --- a/src/simix/smx_host.cpp +++ b/src/simix/smx_host.cpp @@ -17,12 
+17,15 @@ simgrid::kernel::activity::ExecImplPtr SIMIX_execution_parallel_start(std::string name, int host_nb, const sg_host_t* host_list, const double* flops_amount, const double* bytes_amount, double rate, double timeout) { - simgrid::kernel::activity::ExecImplPtr exec = simgrid::kernel::activity::ExecImplPtr( - new simgrid::kernel::activity::ExecImpl(std::move(name), "", host_list[0], timeout)); + simgrid::kernel::activity::ExecImplPtr exec = + simgrid::kernel::activity::ExecImplPtr(new simgrid::kernel::activity::ExecImpl(std::move(name), "")); + std::vector hosts; + for (int i = 0; i < host_nb; i++) + hosts.push_back(host_list[i]); /* set surf's synchro */ if (not MC_is_active() && not MC_record_replay_is_active()) { - exec->surf_action_ = surf_host_model->execute_parallel(host_nb, host_list, flops_amount, bytes_amount, rate); + exec->surf_action_ = surf_host_model->execute_parallel(hosts, flops_amount, bytes_amount, rate); if (exec->surf_action_ != nullptr) { exec->surf_action_->set_data(exec.get()); } diff --git a/src/surf/HostImpl.cpp b/src/surf/HostImpl.cpp index caec7b645c..aa2acd2e02 100644 --- a/src/surf/HostImpl.cpp +++ b/src/surf/HostImpl.cpp @@ -22,7 +22,6 @@ namespace surf { /********* * Model * *********/ - /* Helper function for executeParallelTask */ static inline double has_cost(const double* array, size_t pos) { @@ -32,20 +31,20 @@ static inline double has_cost(const double* array, size_t pos) return -1.0; } -kernel::resource::Action* HostModel::execute_parallel(size_t host_nb, s4u::Host* const* host_list, +kernel::resource::Action* HostModel::execute_parallel(const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate) { kernel::resource::Action* action = nullptr; - if ((host_nb == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) { + if ((host_list.size() == 1) && (has_cost(bytes_amount, 0) <= 0) && (has_cost(flops_amount, 0) > 0)) { action = 
host_list[0]->pimpl_cpu->execution_start(flops_amount[0]); - } else if ((host_nb == 1) && (has_cost(flops_amount, 0) <= 0)) { + } else if ((host_list.size() == 1) && (has_cost(flops_amount, 0) <= 0)) { action = surf_network_model->communicate(host_list[0], host_list[0], bytes_amount[0], rate); - } else if ((host_nb == 2) && (has_cost(flops_amount, 0) <= 0) && (has_cost(flops_amount, 1) <= 0)) { + } else if ((host_list.size() == 2) && (has_cost(flops_amount, 0) <= 0) && (has_cost(flops_amount, 1) <= 0)) { int nb = 0; double value = 0.0; - for (size_t i = 0; i < host_nb * host_nb; i++) { + for (size_t i = 0; i < host_list.size() * host_list.size(); i++) { if (has_cost(bytes_amount, i) > 0.0) { nb++; value = has_cost(bytes_amount, i); diff --git a/src/surf/HostImpl.hpp b/src/surf/HostImpl.hpp index 1fa3bc729c..d7cfc9bbfa 100644 --- a/src/surf/HostImpl.hpp +++ b/src/surf/HostImpl.hpp @@ -29,7 +29,7 @@ class XBT_PRIVATE HostModel : public kernel::resource::Model { public: HostModel() : Model(Model::UpdateAlgo::FULL) {} - virtual kernel::resource::Action* execute_parallel(size_t host_nb, s4u::Host* const* host_list, + virtual kernel::resource::Action* execute_parallel(const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate); }; diff --git a/src/surf/ptask_L07.cpp b/src/surf/ptask_L07.cpp index edf4f3045a..f49ad5ee4d 100644 --- a/src/surf/ptask_L07.cpp +++ b/src/surf/ptask_L07.cpp @@ -131,14 +131,14 @@ void HostL07Model::update_actions_state(double /*now*/, double delta) } } -kernel::resource::Action* HostL07Model::execute_parallel(size_t host_nb, s4u::Host* const* host_list, +kernel::resource::Action* HostL07Model::execute_parallel(const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate) { - return new L07Action(this, host_nb, host_list, flops_amount, bytes_amount, rate); + return new L07Action(this, host_list, flops_amount, bytes_amount, rate); } 
-L07Action::L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* const* host_list, +L07Action::L07Action(kernel::resource::Model* model, const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate) : CpuAction(model, 1, 0), computationAmount_(flops_amount), communicationAmount_(bytes_amount), rate_(rate) { @@ -147,22 +147,22 @@ L07Action::L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* double latency = 0.0; this->set_last_update(); - hostList_.insert(hostList_.end(), host_list, host_list + host_nb); + hostList_.insert(hostList_.end(), host_list.begin(), host_list.end()); if (flops_amount != nullptr) - used_host_nb += std::count_if(flops_amount, flops_amount + host_nb, [](double x) { return x > 0.0; }); + used_host_nb += std::count_if(flops_amount, flops_amount + host_list.size(), [](double x) { return x > 0.0; }); /* Compute the number of affected resources... */ if(bytes_amount != nullptr) { std::unordered_set affected_links; - for (size_t k = 0; k < host_nb * host_nb; k++) { + for (size_t k = 0; k < host_list.size() * host_list.size(); k++) { if (bytes_amount[k] <= 0) continue; double lat = 0.0; std::vector route; - hostList_[k / host_nb]->route_to(hostList_[k % host_nb], route, &lat); + hostList_[k / host_list.size()]->route_to(hostList_[k % host_list.size()], route, &lat); latency = std::max(latency, lat); for (auto const& link : route) @@ -172,26 +172,27 @@ L07Action::L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* link_nb = affected_links.size(); } - XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_nb, link_nb); + XBT_DEBUG("Creating a parallel task (%p) with %zu hosts and %zu unique links.", this, host_list.size(), link_nb); latency_ = latency; - set_variable(model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? 
rate : -1.0), host_nb + link_nb)); + set_variable( + model->get_maxmin_system()->variable_new(this, 1.0, (rate > 0 ? rate : -1.0), host_list.size() + link_nb)); if (latency_ > 0) model->get_maxmin_system()->update_variable_weight(get_variable(), 0.0); /* Expand it for the CPUs even if there is nothing to compute, to make sure that it gets expended even if there is no * communication either */ - for (size_t i = 0; i < host_nb; i++) + for (size_t i = 0; i < host_list.size(); i++) model->get_maxmin_system()->expand(host_list[i]->pimpl_cpu->get_constraint(), get_variable(), (flops_amount == nullptr ? 0.0 : flops_amount[i])); if (bytes_amount != nullptr) { - for (size_t k = 0; k < host_nb * host_nb; k++) { + for (size_t k = 0; k < host_list.size() * host_list.size(); k++) { if (bytes_amount[k] <= 0.0) continue; std::vector route; - hostList_[k / host_nb]->route_to(hostList_[k % host_nb], route, nullptr); + hostList_[k / host_list.size()]->route_to(hostList_[k % host_list.size()], route, nullptr); for (auto const& link : route) model->get_maxmin_system()->expand_add(link->get_constraint(), this->get_variable(), bytes_amount[k]); @@ -206,15 +207,13 @@ L07Action::L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* kernel::resource::Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate) { - sg_host_t* host_list = new sg_host_t[2](); + std::vector host_list = {src, dst}; double* flops_amount = new double[2](); double* bytes_amount = new double[4](); - host_list[0] = src; - host_list[1] = dst; bytes_amount[1] = size; - kernel::resource::Action* res = hostModel_->execute_parallel(2, host_list, flops_amount, bytes_amount, rate); + kernel::resource::Action* res = hostModel_->execute_parallel(host_list, flops_amount, bytes_amount, rate); static_cast(res)->free_arrays_ = true; return res; } @@ -257,13 +256,13 @@ LinkL07::LinkL07(NetworkL07Model* model, const std::string& name, double bandwid kernel::resource::Action* 
CpuL07::execution_start(double size) { - sg_host_t host_list[1] = {get_host()}; + std::vector host_list = {get_host()}; double* flops_amount = new double[1](); flops_amount[0] = size; kernel::resource::Action* res = - static_cast(get_model())->hostModel_->execute_parallel(1, host_list, flops_amount, nullptr, -1); + static_cast(get_model())->hostModel_->execute_parallel(host_list, flops_amount, nullptr, -1); static_cast(res)->free_arrays_ = true; return res; } diff --git a/src/surf/ptask_L07.hpp b/src/surf/ptask_L07.hpp index c5781cf02e..c0b2dc3c82 100644 --- a/src/surf/ptask_L07.hpp +++ b/src/surf/ptask_L07.hpp @@ -42,7 +42,7 @@ public: double next_occuring_event(double now) override; void update_actions_state(double now, double delta) override; - kernel::resource::Action* execute_parallel(size_t host_nb, s4u::Host* const* host_list, const double* flops_amount, + kernel::resource::Action* execute_parallel(const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate) override; }; @@ -114,12 +114,12 @@ public: class L07Action : public CpuAction { friend Action *CpuL07::execution_start(double size); friend Action *CpuL07::sleep(double duration); - friend Action* HostL07Model::execute_parallel(size_t host_nb, s4u::Host* const* host_list, const double* flops_amount, + friend Action* HostL07Model::execute_parallel(const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate); friend Action* NetworkL07Model::communicate(s4u::Host* src, s4u::Host* dst, double size, double rate); public: - L07Action(kernel::resource::Model* model, size_t host_nb, s4u::Host* const* host_list, const double* flops_amount, + L07Action(kernel::resource::Model* model, const std::vector host_list, const double* flops_amount, const double* bytes_amount, double rate); L07Action(const L07Action&) = delete; L07Action& operator=(const L07Action&) = delete;