From 3cf7d641b2d601a49f505e817dc50fdd97c2d975 Mon Sep 17 00:00:00 2001 From: Adrien Gougeon Date: Tue, 11 Apr 2023 15:48:13 +0200 Subject: [PATCH] several updates to plugin operation --- include/simgrid/plugins/operation.hpp | 27 ++- src/plugins/operation.cpp | 261 ++++++++++++++++++++++---- 2 files changed, 243 insertions(+), 45 deletions(-) diff --git a/include/simgrid/plugins/operation.hpp b/include/simgrid/plugins/operation.hpp index 43c76a3e03..0ff852785b 100644 --- a/include/simgrid/plugins/operation.hpp +++ b/include/simgrid/plugins/operation.hpp @@ -29,9 +29,9 @@ private: static bool inited_; std::set successors_ = {}; std::map predecessors_ = {}; - std::function end_func = [](Operation*) {}; void add_predecessor(Operation* predecessor); + void remove_predecessor(Operation* predecessor); bool ready_to_run() const; void receive(Operation* source); void complete(); @@ -39,22 +39,27 @@ private: protected: std::string name_; double amount_; - int iteration_limit_ = -1; - int iteration_count_ = 0; + int queued_execs_ = 0; + int count_ = 0; bool working_ = false; simgrid::s4u::ActivityPtr current_activity_; + std::function end_func_ = [](Operation*) {}; + std::function start_func_ = [](Operation*) {}; Operation(const std::string& name, double amount); ~Operation() = default; - void consume(); - void call_end(); + virtual void execute() = 0; public: static void init(); std::string get_name(); - void set_iteration_limit(unsigned int n); + void enqueue_execs(int n); + void set_amount(double amount); void add_successor(OperationPtr op); + void remove_successor(OperationPtr op); + void on_start(std::function func); void on_end(std::function func); - virtual void execute() = 0; + int get_count(); + }; class ExecOp : public Operation { @@ -62,10 +67,11 @@ private: simgrid::s4u::Host* host_; ExecOp(const std::string& name, double flops, simgrid::s4u::Host* host); + void execute(); public: static ExecOpPtr create(const std::string& name, double flops, simgrid::s4u::Host* host); - void execute(); + void set_host(simgrid::s4u::Host* host); }; class CommOp : public Operation { @@ -74,11 +80,14 @@ private: simgrid::s4u::Host* destination_; CommOp(const std::string& name, double bytes, simgrid::s4u::Host* source, simgrid::s4u::Host* destination); + void execute(); public: static CommOpPtr create(const std::string& name, double bytes, simgrid::s4u::Host* source, simgrid::s4u::Host* destination); - void execute(); + void set_source(simgrid::s4u::Host* source); + void set_destination(simgrid::s4u::Host* destination); + }; } // namespace simgrid::plugins #endif \ No newline at end of file diff --git a/src/plugins/operation.cpp b/src/plugins/operation.cpp index 5ae2e2c3fd..c3996bb632 100644 --- a/src/plugins/operation.cpp +++ b/src/plugins/operation.cpp @@ -2,10 +2,27 @@ #include #include #include +#include #include "src/simgrid/module.hpp" + SIMGRID_REGISTER_PLUGIN(operation, "Battery management", nullptr) +/** @defgroup plugin_operation plugin_operation Plugin Operation + + @beginrst + +This is the operation plugin, enabling management of Operations. +To activate this plugin, first call :cpp:func:`Operation::init`. + +Operations are designed to represent workflows, i.e, graphs of Operations. +Operations can only be instancied using either +:cpp:func:`simgrid::plugins::ExecOp::create` or :cpp:func:`simgrid::plugins::CommOp::create` +An ExecOp is an Execution Operation. Its underlying Activity is an :ref:`Exec `. +A CommOp is a Communication Operation. Its underlying Activity is a :ref:`Comm `. + + @endrst + */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Operation, kernel, "Logging specific to the operation plugin"); namespace simgrid::plugins { @@ -13,53 +30,93 @@ Operation::Operation(const std::string& name, double amount) : name_(name), amou std::string Operation::get_name() { - return this->name_; + return name_; } +/** + * @param predecessor The Operation to add. + * @brief Add a predecessor to this Operation. + */ void Operation::add_predecessor(Operation* predecessor) { - this->predecessors_[predecessor] = 0; + if (predecessors_.find(predecessor) == predecessors_.end()) + simgrid::kernel::actor::simcall_answered([this, predecessor] { + predecessors_[predecessor] = 0; + }); +} + +/** + * @param predecessor The Operation to remove. + * @brief Remove a predecessor from this Operation. + */ +void Operation::remove_predecessor(Operation* predecessor) +{ + simgrid::kernel::actor::simcall_answered([this, predecessor] { + predecessors_.erase(predecessor); + }); } +/** + * @brief Return True if the Operation can start a new Activity. + * @note The Operation is ready if not already doing something and there is at least one execution waiting in queue. + */ bool Operation::ready_to_run() const { - if (this->working_ or (this->iteration_count_ != -1 and this->iteration_count_ >= this->iteration_limit_)) + if (working_ or queued_execs_ <= 0) return false; - for (auto const& [key, val] : this->predecessors_) - if (val < 1) - return false; - return true; + else + return true; } +/** + * @param source The sender. + * @brief Receive a token from another Operation. + * @note Check upon reception if the Operation has received a token from each of its predecessors, + * and in this case consumes those tokens and enqueue an execution. + */ void Operation::receive(Operation* source) { - auto it = this->predecessors_.find(source); + XBT_DEBUG("Operation %s received a token from %s", name_.c_str(), source->name_.c_str()); + auto it = predecessors_.find(source); + simgrid::kernel::actor::simcall_answered([this, it] { it->second++; - if (this->ready_to_run()) - this->execute(); + bool enough_tokens = true; + for (auto const& [key, val] : predecessors_) + if (val < 1) { + enough_tokens = false; + break; + } + if (enough_tokens) { + for (auto [key, val] : predecessors_) + val--; + enqueue_execs(1); + } + }); } +/** + * @brief Operation routine when finishing an execution. + * @note Set its working status as false. Add 1 to its count of finished executions. + * Call the on_end() func. Send a token to each of its successors. + * Start a new execution if possible. + */ void Operation::complete() { - working_ = false; - call_end(); - for (auto const& op : this->successors_) + simgrid::kernel::actor::simcall_answered([this] { + working_ = false; + count_++; + }); + end_func_(this); + for (auto const& op : successors_) op->receive(this); if (ready_to_run()) execute(); } -void Operation::consume() -{ - for (auto const& [key, val] : predecessors_) - predecessors_[key] = predecessors_[key] > 0 ? predecessors_[key] - 1 : 0; -} - -void Operation::call_end() -{ - end_func(this); -} - +/** @ingroup plugin_operation + * @brief Init the Operation plugin. + * @note Add a completion callback to all Activities to call Operation::complete(). + */ void Operation::init() { if (Operation::inited_) @@ -71,35 +128,116 @@ void Operation::init() }); } -void Operation::set_iteration_limit(unsigned int n) +/** @ingroup plugin_operation + * @param n The number of executions to enqueue. + * @brief Enqueue executions. + * @note Immediatly starts an execution if possible. + */ +void Operation::enqueue_execs(int n) +{ + simgrid::kernel::actor::simcall_answered([this, n] { + queued_execs_ += n; + if (ready_to_run()) + execute(); + }); +} + +/** @ingroup plugin_operation + * @param amount The amount to set. + * @brief Set the amout of work to do. + * @note Amount in flop for ExecOp and in bytes for CommOp. + */ +void Operation::set_amount(double amount) +{ + simgrid::kernel::actor::simcall_answered([this, amount] { + amount_ = amount; + }); +} + +/** @ingroup plugin_operation + * @param successor The Operation to add. + * @brief Add a successor to this Operation. + * @note It also adds this as a predecessor of successor. + */ +void Operation::add_successor(OperationPtr successor) { - iteration_limit_ = n; + simgrid::kernel::actor::simcall_answered([this, successor] { + successors_.insert(successor.get()); + }); + successor->add_predecessor(this); } -void Operation::add_successor(OperationPtr op) +/** @ingroup plugin_operation + * @param successor The Operation to remove. + * @brief Remove a successor from this Operation. + * @note It also remove this from the predecessors of successor. + */ +void Operation::remove_successor(OperationPtr successor) { - successors_.insert(op.get()); - op->add_predecessor(this); + simgrid::kernel::actor::simcall_answered([this, successor] { + successors_.erase(successor.get()); + }); + successor->remove_predecessor(this); +} + +/** @ingroup plugin_operation + * @param func The function to set. + * @brief Set a function to be called before each execution. + * @note The function is called before the underlying Activity starts. + */ +void Operation::on_start(std::function func) +{ + simgrid::kernel::actor::simcall_answered([this, func] { + start_func_ = func; + }); } +/** @ingroup plugin_operation + * @param func The function to set. + * @brief Set a function to be called after each execution. + * @note The function is called after the underlying Activity ends, but before sending tokens to successors. + */ void Operation::on_end(std::function func) { - end_func = func; + simgrid::kernel::actor::simcall_answered([this,func] { + end_func_ = func; + }); +} + +/** @ingroup plugin_operation + * @brief Return the number of completed executions. + */ +int Operation::get_count() +{ + return count_; } +/** + * @brief Default constructor. + */ ExecOp::ExecOp(const std::string& name, double flops, simgrid::s4u::Host* host) : Operation(name, flops), host_(host) {} +/** @ingroup plugin_operation + * @brief Smart Constructor. + */ ExecOpPtr ExecOp::create(const std::string& name, double flops, simgrid::s4u::Host* host) { auto op = ExecOpPtr(new ExecOp(name, flops, host)); return op; } +/** + * @brief Do one execution of the Operation. + * @note Call the on_start() func. Set its working status as true. + * Create and start the underlying Activity. + */ void ExecOp::execute() { - iteration_count_++; - working_ = true; - consume(); + start_func_(this); + simgrid::kernel::actor::simcall_answered([this] { + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + }); simgrid::s4u::ExecPtr exec = simgrid::s4u::Exec::init(); exec->set_name(name_); exec->set_flops_amount(amount_); @@ -107,14 +245,33 @@ void ExecOp::execute() exec->start(); exec->extension_set(new ExtendedAttributeActivity()); exec->extension()->operation_ = this; + simgrid::kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; + }); +} + +/** @ingroup plugin_operation + * @param host The host to set. + * @brief Set a new host. + */ +void ExecOp::set_host(simgrid::s4u::Host* host) +{ + simgrid::kernel::actor::simcall_answered([this, host] { + host_ = host; + }); } +/** + * @brief Default constructor. + */ CommOp::CommOp(const std::string& name, double bytes, simgrid::s4u::Host* source, simgrid::s4u::Host* destination) : Operation(name, bytes), source_(source), destination_(destination) { } +/** @ingroup plugin_operation + * @brief Smart constructor. + */ CommOpPtr CommOp::create(const std::string& name, double bytes, simgrid::s4u::Host* source, simgrid::s4u::Host* destination) { @@ -122,19 +279,51 @@ CommOpPtr CommOp::create(const std::string& name, double bytes, simgrid::s4u::Ho return op; } +/** + * @brief Do one execution of the Operation. + * @note Call the on_start() func. Set its working status as true. + * Create and start the underlying Activity. + */ void CommOp::execute() { - iteration_count_++; - working_ = true; - consume(); + start_func_(this); + simgrid::kernel::actor::simcall_answered([this] { + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); + }); simgrid::s4u::CommPtr comm = simgrid::s4u::Comm::sendto_init(source_, destination_); comm->set_name(name_); comm->set_payload_size(amount_); comm->start(); comm->extension_set(new ExtendedAttributeActivity()); comm->extension()->operation_ = this; + simgrid::kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; + }); +} + +/** @ingroup plugin_operation + * @param source The host to set. + * @brief Set a new source host. + */ +void CommOp::set_source(simgrid::s4u::Host* source) +{ + simgrid::kernel::actor::simcall_answered([this, source] { + source_ = source; + }); } + +/** @ingroup plugin_operation + * @param destination The host to set. + * @brief Set a new destination host. + */ +void CommOp::set_destination(simgrid::s4u::Host* destination) +{ + simgrid::kernel::actor::simcall_answered([this, destination] { + destination_ = destination; + }); +} + } // namespace simgrid::plugins simgrid::xbt::Extension -- 2.20.1