From: Frederic Suter Date: Fri, 14 Feb 2020 00:03:30 +0000 (+0100) Subject: merge ExecSeq and ExecPar into Exec (simdag-style) X-Git-Tag: v3.26~958 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/cfcdf2e0866cb50cace1db634430d0861af5f0a1 merge ExecSeq and ExecPar into Exec (simdag-style) --- diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index e7268b88f6..ffc82faf26 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -56,8 +56,6 @@ class Exec; using ExecPtr = boost::intrusive_ptr; XBT_PUBLIC void intrusive_ptr_release(Exec* e); XBT_PUBLIC void intrusive_ptr_add_ref(Exec* e); -class ExecSeq; // FIXME: hide this class in implementation -class ExecPar; // FIXME: hide this class in implementation class Host; @@ -197,6 +195,7 @@ class VirtualMachineImpl; typedef simgrid::s4u::Actor s4u_Actor; typedef simgrid::s4u::Barrier s4u_Barrier; typedef simgrid::s4u::Comm s4u_Comm; +typedef simgrid::s4u::Exec s4u_Exec; typedef simgrid::s4u::Host s4u_Host; typedef simgrid::s4u::Link s4u_Link; typedef simgrid::s4u::File s4u_File; @@ -222,6 +221,7 @@ XBT_ATTRIB_DEPRECATED_v330("Please use kernel::activity::State") typedef simgrid typedef struct s4u_Actor s4u_Actor; typedef struct s4u_Barrier s4u_Barrier; typedef struct s4u_Comm s4u_Comm; +typedef struct s4u_Exec s4u_Exec; typedef struct s4u_Host s4u_Host; typedef struct s4u_Link s4u_Link; typedef struct s4u_File s4u_File; @@ -251,6 +251,8 @@ typedef s4u_Barrier* sg_bar_t; typedef const s4u_Barrier* const_sg_bar_t; typedef s4u_Comm* sg_comm_t; typedef const s4u_Comm* const_sg_comm_t; +typedef s4u_Exec* sg_exec_t; +typedef const s4u_Exec* const_sg_exec_t; typedef s4u_ConditionVariable* sg_cond_t; typedef const s4u_ConditionVariable* const_sg_cond_t; typedef s4u_Mailbox* sg_mailbox_t; diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index 22e66db9d7..2572d61eee 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -27,8 +27,6 @@ namespace s4u { class XBT_PUBLIC Activity { friend Comm; friend Exec; - friend ExecSeq; - friend ExecPar; friend Io; protected: diff --git a/include/simgrid/s4u/Exec.hpp b/include/simgrid/s4u/Exec.hpp index e3892918c5..ab98234a07 100644 --- a/include/simgrid/s4u/Exec.hpp +++ b/include/simgrid/s4u/Exec.hpp @@ -33,27 +33,33 @@ class XBT_PUBLIC Exec : public Activity_T { double priority_ = 1.0; double bound_ = 0.0; double timeout_ = 0.0; - -protected: - Exec(); + std::vector flops_amounts_; + std::vector bytes_amounts_; + std::vector hosts_; + bool parallel_ = false; public: + Exec(); virtual ~Exec() = default; #ifndef DOXYGEN Exec(Exec const&) = delete; Exec& operator=(Exec const&) = delete; - friend ExecSeq; - friend ExecPar; #endif static xbt::signal on_start; static xbt::signal on_completion; - Exec* start() override = 0; + Exec* start() override; /** @brief On sequential executions, returns the amount of flops that remain to be done; This cannot be used on * parallel executions. */ - virtual double get_remaining_ratio() const = 0; - virtual ExecPtr set_host(Host* host) = 0; + double get_remaining() const; + double get_remaining_ratio() const; + ExecPtr set_host(Host* host); + ExecPtr set_hosts(const std::vector& hosts); + + ExecPtr set_flops_amount(double flops_amount); + ExecPtr set_flops_amounts(const std::vector& flops_amounts); + ExecPtr set_bytes_amounts(const std::vector& bytes_amounts); Exec* wait() override; Exec* wait_for(double timeout) override; @@ -66,48 +72,14 @@ public: ExecPtr set_bound(double bound); ExecPtr set_priority(double priority); XBT_ATTRIB_DEPRECATED_v329("Please use exec_init(...)->wait_for(timeout)") ExecPtr set_timeout(double timeout); + Exec* cancel() override; Host* get_host() const; unsigned int get_host_number() const; double get_start_time() const; double get_finish_time() const; double get_cost() const; -}; - -class XBT_PUBLIC ExecSeq : public Exec { - double flops_amount_ = 0.0; - - explicit ExecSeq(sg_host_t host, double flops_amount); - -public: - friend XBT_PUBLIC ExecPtr this_actor::exec_init(double flops_amount); - - ~ExecSeq() = default; - - Exec* start() override; - - ExecPtr set_host(Host* host) override; - - double get_remaining() const override; - double get_remaining_ratio() const override; -}; - -class XBT_PUBLIC ExecPar : public Exec { - std::vector hosts_; - std::vector flops_amounts_; - std::vector bytes_amounts_; - explicit ExecPar(const std::vector& hosts, const std::vector& flops_amounts, - const std::vector& bytes_amounts); - ExecPtr set_host(Host*) override { /* parallel exec cannot be moved */ THROW_UNIMPLEMENTED; } - -public: - ~ExecPar() = default; - friend XBT_PUBLIC ExecPtr this_actor::exec_init(const std::vector& hosts, - const std::vector& flops_amounts, - const std::vector& bytes_amounts); - double get_remaining() const override; - double get_remaining_ratio() const override; - Exec* start() override; + bool is_parallel() const { return parallel_; } }; } // namespace s4u diff --git a/src/s4u/s4u_Actor.cpp b/src/s4u/s4u_Actor.cpp index 0ab25b8fb2..70e3706975 100644 --- a/src/s4u/s4u_Actor.cpp +++ b/src/s4u/s4u_Actor.cpp @@ -361,7 +361,9 @@ void parallel_execute(const std::vector& hosts, const std::vectorset_flops_amount(flops_amount)->set_host(get_host()); + return exec; } ExecPtr exec_init(const std::vector& hosts, const std::vector& flops_amounts, @@ -387,7 +389,9 @@ ExecPtr exec_init(const std::vector& hosts, const std::vectorset_flops_amounts(flops_amounts)->set_bytes_amounts(bytes_amounts)->set_hosts(hosts); + return exec; } ExecPtr exec_async(double flops) diff --git a/src/s4u/s4u_Exec.cpp b/src/s4u/s4u_Exec.cpp index 227ab302c3..ca551bfcc7 100644 --- a/src/s4u/s4u_Exec.cpp +++ b/src/s4u/s4u_Exec.cpp @@ -74,6 +74,30 @@ ExecPtr Exec::set_timeout(double timeout) // XBT_ATTRIB_DEPRECATED_v329 return this; } +ExecPtr Exec::set_flops_amount(double flops_amount) +{ + xbt_assert(state_ == State::INITED, "Cannot change the flop_amount of an exec after its start"); + flops_amounts_.assign(1, flops_amount); + Activity::set_remaining(flops_amounts_.front()); + return this; +} + +ExecPtr Exec::set_flops_amounts(const std::vector& flops_amounts) +{ + xbt_assert(state_ == State::INITED, "Cannot change the flops_amounts of an exec after its start"); + flops_amounts_ = flops_amounts; + parallel_ = true; + return this; +} + +ExecPtr Exec::set_bytes_amounts(const std::vector& bytes_amounts) +{ + xbt_assert(state_ == State::INITED, "Cannot change the bytes_amounts of an exec after its start"); + bytes_amounts_ = bytes_amounts; + parallel_ = true; + return this; +} + /** @brief Retrieve the host on which this activity takes place. * If it runs on more than one host, only the first host is returned. */ @@ -111,34 +135,10 @@ ExecPtr Exec::set_priority(double priority) return this; } -///////////// SEQUENTIAL EXECUTIONS //////// -ExecSeq::ExecSeq(sg_host_t host, double flops_amount) : Exec(), flops_amount_(flops_amount) -{ - Activity::set_remaining(flops_amount_); - boost::static_pointer_cast(pimpl_)->set_host(host); -} - -Exec* ExecSeq::start() -{ - kernel::actor::simcall([this] { - (*boost::static_pointer_cast(pimpl_)) - .set_name(get_name()) - .set_tracing_category(get_tracing_category()) - .set_sharing_penalty(1. / priority_) - .set_bound(bound_) - .set_flops_amount(flops_amount_) - .start(); - }); - state_ = State::STARTED; - on_start(*Actor::self(), *this); - return this; -} - -/** @brief Returns whether the state of the exec is finished */ /** @brief Change the host on which this activity takes place. * * The activity cannot be terminated already (but it may be started). */ -ExecPtr ExecSeq::set_host(Host* host) +ExecPtr Exec::set_host(Host* host) { xbt_assert(state_ == State::INITED || state_ == State::STARTED, "Cannot change the host of an exec once it's done (state: %d)", (int)state_); @@ -148,54 +148,64 @@ ExecPtr ExecSeq::set_host(Host* host) return this; } -double ExecSeq::get_remaining() const -{ - return kernel::actor::simcall( - [this]() { return boost::static_pointer_cast(pimpl_)->get_remaining(); }); -} - -/** @brief Returns the ratio of elements that are still to do - * - * The returned value is between 0 (completely done) and 1 (nothing done yet). - */ -double ExecSeq::get_remaining_ratio() const -{ - return kernel::actor::simcall( - [this]() { return boost::static_pointer_cast(pimpl_)->get_seq_remaining_ratio(); }); -} - -///////////// PARALLEL EXECUTIONS //////// -ExecPar::ExecPar(const std::vector& hosts, const std::vector& flops_amounts, - const std::vector& bytes_amounts) - : Exec(), hosts_(hosts), flops_amounts_(flops_amounts), bytes_amounts_(bytes_amounts) +ExecPtr Exec::set_hosts(const std::vector& hosts) { + xbt_assert(state_ == State::INITED, "Cannot change the hosts of an exec once it's done (state: %d)", (int)state_); + hosts_ = hosts; + parallel_ = true; + return this; } -Exec* ExecPar::start() -{ - kernel::actor::simcall([this] { - (*boost::static_pointer_cast(pimpl_)) - .set_hosts(hosts_) - .set_timeout(timeout_) - .set_flops_amounts(flops_amounts_) - .set_bytes_amounts(bytes_amounts_) - .start(); - }); +///////////// SEQUENTIAL EXECUTIONS //////// +Exec* Exec::start() +{ + if (is_parallel()) + kernel::actor::simcall([this] { + (*boost::static_pointer_cast(pimpl_)) + .set_hosts(hosts_) + .set_timeout(timeout_) + .set_flops_amounts(flops_amounts_) + .set_bytes_amounts(bytes_amounts_) + .start(); + }); + else + kernel::actor::simcall([this] { + (*boost::static_pointer_cast(pimpl_)) + .set_name(get_name()) + .set_tracing_category(get_tracing_category()) + .set_sharing_penalty(1. / priority_) + .set_bound(bound_) + .set_flops_amount(flops_amounts_.front()) + .start(); + }); state_ = State::STARTED; on_start(*Actor::self(), *this); return this; } -double ExecPar::get_remaining_ratio() const +double Exec::get_remaining() const { - return kernel::actor::simcall( - [this]() { return boost::static_pointer_cast(pimpl_)->get_par_remaining_ratio(); }); + if (is_parallel()) { + XBT_WARN("Calling get_remaining() on a parallel execution is not allowed. Call get_remaining_ratio() instead."); + return get_remaining_ratio(); + } else + return kernel::actor::simcall( + [this]() { return boost::static_pointer_cast(pimpl_)->get_remaining(); }); } -double ExecPar::get_remaining() const +/** @brief Returns the ratio of elements that are still to do + * + * The returned value is between 0 (completely done) and 1 (nothing done yet). + */ +double Exec::get_remaining_ratio() const { - XBT_WARN("Calling get_remaining() on a parallel execution is not allowed. Call get_remaining_ratio() instead."); - return get_remaining_ratio(); + if (is_parallel()) + return kernel::actor::simcall( + [this]() { return boost::static_pointer_cast(pimpl_)->get_par_remaining_ratio(); }); + else + return kernel::actor::simcall( + [this]() { return boost::static_pointer_cast(pimpl_)->get_seq_remaining_ratio(); }); } + } // namespace s4u } // namespace simgrid