From 6ae42d54959fc08bb83d5db2cd1aca93dfebc66f Mon Sep 17 00:00:00 2001 From: Adrien Gougeon Date: Tue, 10 Oct 2023 16:26:44 +0200 Subject: [PATCH] update doc --- docs/source/app_s4u.rst | 25 +++++-- include/simgrid/s4u/Task.hpp | 2 + src/s4u/s4u_Task.cpp | 137 +++++++++++++++++------------------ 3 files changed, 86 insertions(+), 78 deletions(-) diff --git a/docs/source/app_s4u.rst b/docs/source/app_s4u.rst index 4a0f3fe0a5..a44f8a199e 100644 --- a/docs/source/app_s4u.rst +++ b/docs/source/app_s4u.rst @@ -203,24 +203,33 @@ Repeatable Activities In order to simulate the execution of Dataflow applications, we introduced the concept of |API_s4u_Tasks|, that can be seen as repeatable activities. A Dataflow -is defined as a graph of |API_s4u_Tasks| through which circulate Tokens. Tokens -can carry any user-defined data, using the same internal mechanisms as for the -other simulated objects. Each Task has to receive a token from each of its -predecessor to fire a new instance of a |API_s4u_Comm|, |API_s4u_Exec|, or -|API_s4u_Io| activity. On completion of this activity, the Task propagates tokens -to its successors, and waits for the next set of tokens to arrive. +is defined as a graph of |API_s4u_Tasks|, where each |API_s4u_Tasks| has a set of +successors and predecessors. When a |API_s4u_Tasks| ends it sends a token to each +of its successors. Each |API_s4u_Tasks| has to receive a token from each of its +predecessor to start. Tokens can carry any user-defined data. + +|API_s4u_Tasks| are composed of several instances: a dispatcher, a collector, and +instance_0 to instance_n. The dispatcher rely on a load balancing function to select +the next instance to fire. Once this instance finishes it fires the collector. + +Each instance of an |API_s4u_ExecTask| can be placed on a different host. +|API_s4u_Comm| activities are automatically created when an instance triggers +another instance on a different host. 
Each instance has its own parallelism degree +to scale horizontally on several cores. To initiate the execution of a Dataflow, it is possible to some make |API_s4u_Tasks| fire one or more activities without waiting for any token with the :cpp:func:`s4u::Task::enqueue_firings() ` function. -The parameters and successors of a Task can be redefined at runtime by attaching +The parameters of Tasks can be redefined at runtime by attaching callbacks to the :cpp:func:`s4u::Task::on_this_start ` and :cpp:func:`s4u::Task::on_this_completion ` -signals. +signals. The former is triggered by instances others than the dispatcher and the collector, +and the latter is triggered by the collector. + .. _s4u_mailbox: diff --git a/include/simgrid/s4u/Task.hpp b/include/simgrid/s4u/Task.hpp index 1fe18138bd..03ee2653ba 100644 --- a/include/simgrid/s4u/Task.hpp +++ b/include/simgrid/s4u/Task.hpp @@ -75,6 +75,8 @@ public: const char* get_cname() const { return name_.c_str(); } void set_amount(double amount, std::string instance = "instance_0"); double get_amount(std::string instance = "instance_0") const { return amount_.at(instance); } + int get_queued_firings(std::string instance = "instance_0") { return queued_firings_.at(instance); } + int get_running_count(std::string instance = "instance_0") { return running_instances_.at(instance); } int get_count(std::string instance = "collector") const { return count_.at(instance); } void set_parallelism_degree(int n, std::string instance = "all"); int get_parallelism_degree(std::string instance = "instance_0") const { return parallelism_degree_.at(instance); } diff --git a/src/s4u/s4u_Task.cpp b/src/s4u/s4u_Task.cpp index 80756d6cfa..bcc2f4eea4 100644 --- a/src/s4u/s4u_Task.cpp +++ b/src/s4u/s4u_Task.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -13,35 +14,21 @@ #include "src/simgrid/module.hpp" SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr) -/** - @beginrst - - -Tasks are designed to represent dataflows, i.e, 
graphs of Tasks. -Tasks can only be instancied using either -:cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init` -An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec `. -A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm `. - - @endrst - */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin"); namespace simgrid::s4u { Task::Task(const std::string& name) : name_(name) {} -/** - * @brief Return True if the Task can start a new Activity. - * @note The Task is ready if not already doing something and there is at least one execution waiting in queue. +/** @param instance The Task instance to check. + * @brief Return True if this Task instance can start. */ bool Task::ready_to_run(std::string instance) { return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0; } -/** - * @param source The sender. +/** @param source The sender. * @brief Receive a token from another Task. * @note Check upon reception if the Task has received a token from each of its predecessors, * and in this case consumes those tokens and enqueue an execution. @@ -50,7 +37,8 @@ void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); predecessors_[source]++; - tokens_received_[source].push_back(source->token_); + if (source->token_ != nullptr) + tokens_received_[source].push_back(source->token_); bool enough_tokens = true; for (auto const& [key, val] : predecessors_) if (val < 1) { @@ -64,22 +52,18 @@ void Task::receive(Task* source) } } -/** - * @brief Task routine when finishing an execution. - * @note Set its working status as false. - * Add 1 to its count of finished executions. - * Call the on_this_end func. - * Fire on_end callback. - * Send a token to each of its successors. - * Start a new execution if possible. +/** @param instance The Task instance to complete. 
+ * @brief Task instance routine when finishing an execution of an instance. + * @note The dispatcher instance enqueues a firing for the next instance. + * The collector instance triggers the on_completion signals and sends tokens to successors. + * Other instances enqueue a firing of the collector instance. */ void Task::complete(std::string instance) { xbt_assert(Actor::is_maestro()); - running_instances_[instance] = running_instances_[instance] - 1; - count_[instance] = count_[instance] + 1; + running_instances_[instance]--; + count_[instance]++; if (instance == "collector") { - // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str()); on_this_completion(this); on_completion(this); for (auto const& t : successors_) @@ -92,7 +76,7 @@ void Task::complete(std::string instance) while (ready_to_run(next_instance)) fire(next_instance); } else { - queued_firings_["collector"] = queued_firings_["collector"] + 1; + queued_firings_["collector"]++; while (ready_to_run("collector")) fire("collector"); } @@ -100,10 +84,11 @@ void Task::complete(std::string instance) fire(instance); } -/** @param n The new parallelism degree of the Task. - * @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling. - * @note When increasing the degree the function starts new instances if there is queued firings. - * When decreasing the degree the function does NOT stop running instances. +/** @param n The new parallelism degree of the Task instance. + * @param instance The Task instance to modify. + * @note You can use instance "all" to modify the parallelism degree of all instances of this Task. + * When increasing the degree new executions are started if there are queued firings. + * When decreasing the degree instances already running are NOT stopped. 
*/ void Task::set_parallelism_degree(int n, std::string instance) { @@ -123,19 +108,25 @@ void Task::set_parallelism_degree(int n, std::string instance) }); } +/** @param bytes The internal bytes of the Task instance. + * @param instance The Task instance to modify. + * @note Internal bytes are used for Comms between the dispatcher and instance_n, + * and between instance_n and the collector if they are not on the same host. + */ void Task::set_internal_bytes(int bytes, std::string instance) { simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; }); } +/** @param func The load balancing function. + * @note The dispatcher uses this function to determine which instance to trigger next. + */ void Task::set_load_balancing_function(std::function func) { simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; }); } /** @param n The number of firings to enqueue. - * @brief Enqueue firing. - * @note Immediatly fire an activity if possible. */ void Task::enqueue_firings(int n) { @@ -155,7 +146,7 @@ void Task::set_name(std::string name) } /** @param amount The amount to set. - * @brief Set the amout of work to do. + * @param instance The Task instance to modify. * @note Amount in flop for ExecTask and in bytes for CommTask. */ void Task::set_amount(double amount, std::string instance) @@ -165,16 +156,19 @@ void Task::set_amount(double amount, std::string instance) /** @param token The token to set. * @brief Set the token to send to successors. - * @note The token is passed to each successor after the task end, i.e., after the on_completion callback. + * @note The token is passed to each successor after the Task instance collector end, i.e., after the on_completion + * callback. */ void Task::set_token(std::shared_ptr token) { simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; }); } +/** @param t The Task to deque a token from. 
+ */ void Task::deque_token_from(TaskPtr t) { - simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); }); + simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_[t].pop_front(); }); } void Task::fire(std::string instance) @@ -190,8 +184,7 @@ void Task::fire(std::string instance) queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0); } -/** @param successor The Task to add. - * @brief Add a successor to this Task. +/** @param successor The Task to add as a successor. * @note It also adds this as a predecessor of successor. */ void Task::add_successor(TaskPtr successor) @@ -202,8 +195,7 @@ void Task::add_successor(TaskPtr successor) }); } -/** @param successor The Task to remove. - * @brief Remove a successor from this Task. +/** @param successor The Task to remove from the successors of this Task. * @note It also remove this from the predecessors of successor. */ void Task::remove_successor(TaskPtr successor) @@ -214,8 +206,7 @@ void Task::remove_successor(TaskPtr successor) }); } -/** - * @brief TODO +/** @brief Remove all successors from this Task. */ void Task::remove_all_successors() { @@ -228,8 +219,9 @@ void Task::remove_all_successors() }); } -/** - * @brief TODO +/** @param n The number of instances to add to this Task (>=0). + * @note Instances goes always from instance_0 to instance_x, + * where x is the current number of instance. */ void Task::add_instances(int n) { @@ -247,8 +239,10 @@ void Task::add_instances(int n) } } -/** - * @brief TODO +/** @param n The number of instances to remove from this Task (>=0). + * @note Instances goes always from instance_0 to instance_x, + * where x is the current number of instance. + * Running instances cannot be removed. 
*/ void Task::remove_instances(int n) { @@ -256,10 +250,9 @@ void Task::remove_instances(int n) xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n); xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)", instance_count, n); - for (int i = instance_count - 1; i >= instance_count - n; i--) + for (int i = instance_count - 1; i >= instance_count - n; i--) { xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0, "Cannot remove a running instance (instances: %d)", i); - for (int i = instance_count - 1; i >= instance_count - n; i--) { amount_.erase("instance_" + std::to_string(i)); queued_firings_.erase("instance_" + std::to_string(i)); running_instances_.erase("instance_" + std::to_string(i)); @@ -293,10 +286,10 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host) return init(name)->set_flops(flops)->set_host(host); } -/** - * @brief Do one execution of the Task. - * @note Call the on_this_start() func. - * Init and start the underlying Activity. +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. + * Comms are created if hosts differ between dispatcher and the instance to fire, + * or between the instance and the collector. */ void ExecTask::fire(std::string instance) { @@ -337,8 +330,8 @@ void ExecTask::fire(std::string instance) } } -/** - * @param host The host to set. +/** @param host The host to set. + * @param instance The Task instance to modify. * @brief Set a new host. */ ExecTaskPtr ExecTask::set_host(Host* host, std::string instance) @@ -353,8 +346,8 @@ ExecTaskPtr ExecTask::set_host(Host* host, std::string instance) return this; } -/** - * @param flops The amount of flops to set. +/** @param flops The amount of flops to set. + * @param instance The Task instance to modify. 
*/ ExecTaskPtr ExecTask::set_flops(double flops, std::string instance) { @@ -362,8 +355,9 @@ ExecTaskPtr ExecTask::set_flops(double flops, std::string instance) return this; } -/** - * @brief TODO +/** @param n The number of instances to add to this Task (>=0). + @note Instances goes always from instance_0 to instance_x, + where x is the current number of instance. */ void ExecTask::add_instances(int n) { @@ -373,8 +367,10 @@ void ExecTask::add_instances(int n) host_["instance_" + std::to_string(i)] = host_.at("instance_0"); } -/** - * @brief TODO +/** @param n The number of instances to remove from this Task (>=0). + @note Instances goes always from instance_0 to instance_x, + where x is the current number of instance. + Running instance cannot be removed. */ void ExecTask::remove_instances(int n) { @@ -408,10 +404,8 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination); } -/** - * @brief Do one execution of the Task. - * @note Call the on_this_start() func. - * Init and start the underlying Activity. +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. */ void CommTask::fire(std::string instance) { @@ -487,7 +481,6 @@ IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::Op /** * @param disk The disk to set. - * @brief Set a new disk. */ IoTaskPtr IoTask::set_disk(Disk* disk) { @@ -504,13 +497,18 @@ IoTaskPtr IoTask::set_bytes(double bytes) return this; } -/** */ +/** + * @param type The op type to set. + */ IoTaskPtr IoTask::set_op_type(Io::OpType type) { kernel::actor::simcall_answered([this, type] { type_ = type; }); return this; } +/** @param instance The Task instance to fire. + * @note Only the dispatcher instance triggers the on_start signal. 
+ */ void IoTask::fire(std::string instance) { Task::fire(instance); @@ -529,5 +527,4 @@ void IoTask::fire(std::string instance) store_activity(io, instance); } } - } // namespace simgrid::s4u -- 2.20.1