2 #include <simgrid/Exception.hpp>
3 #include <simgrid/plugins/task.hpp>
4 #include <simgrid/s4u/Comm.hpp>
5 #include <simgrid/s4u/Exec.hpp>
6 #include <simgrid/s4u/Io.hpp>
7 #include <simgrid/simix.hpp>
9 #include "src/simgrid/module.hpp"
11 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
12 /** @defgroup plugin_task plugin_task Plugin Task
16 This is the task plugin, enabling management of Tasks.
17 To activate this plugin, first call :cpp:func:`Task::init`.
19 Tasks are designed to represent dataflows, i.e., graphs of Tasks.
20 Tasks can only be instantiated using
21 :cpp:func:`simgrid::plugins::ExecTask::init`, :cpp:func:`simgrid::plugins::CommTask::init` or :cpp:func:`simgrid::plugins::IoTask::init`
22 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
23 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
// Log category "Task", declared as a sub-category of "kernel" and used as the default category of this file.
27 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
29 namespace simgrid::plugins {
// Out-of-class definitions of the static members used by the plugin.
// EXTENSION_ID receives its value from the extension_create<ExtendedAttributeActivity>() call made elsewhere in this file.
31 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
// Class-wide signals; each callback is handed the Task concerned. on_start is emitted by the fire() implementations.
33 xbt::signal<void(Task*)> Task::on_start;
34 xbt::signal<void(Task*)> Task::on_end;
36 Task::Task(const std::string& name) : name_(name) {}
39 * @brief Return True if the Task can start a new Activity.
40 * @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
42 bool Task::ready_to_run() const
44 return not working_ && queued_execs_ > 0;
48 * @param source The sender.
49 * @brief Receive a token from another Task.
50 * @note Check upon reception if the Task has received a token from each of its predecessors,
51 * and in this case consumes those tokens and enqueue an execution.
53 void Task::receive(Task* source)
55 XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
// Post-increment: source_count is the counter value recorded for `source` before this token was counted.
56 auto source_count = predecessors_[source]++;
// The token is filed at index queued_execs_ + source_count, keyed by its sender.
// NOTE(review): at most one map is appended before indexing, so this presumably relies on the index
// never being larger than size() — confirm, e.g. when executions were enqueued without tokens.
57 if (tokens_received_.size() <= queued_execs_ + source_count)
58 tokens_received_.push_back({});
59 tokens_received_[queued_execs_ + source_count][source] = source->token_;
// Starts optimistic; the scan over the per-predecessor counters below may clear it.
60 bool enough_tokens = true;
61 for (auto const& [key, val] : predecessors_)
63 enough_tokens = false;
// Second pass binds the counters by non-const reference, i.e. it is able to modify them.
67 for (auto& [key, val] : predecessors_)
74 * @brief Task routine when finishing an execution.
75 * @note Set its working status as false.
76 * Add 1 to its count of finished executions.
77 * Call the on_this_end func.
78 * Fire on_end callback.
79 * Send a token to each of its successors.
80 * Start a new execution if possible.
// This routine is only legal when the calling actor is maestro; the assertion enforces it.
84 xbt_assert(s4u::Actor::is_maestro());
// Keep the activity that just ran in previous_activity_; current_activity_ is moved from.
89 if (current_activity_)
90 previous_activity_ = std::move(current_activity_);
// Visit every registered successor.
91 for (auto const& t : successors_)
97 /** @ingroup plugin_task
98 * @brief Init the Task plugin.
99 * @note Add a completion callback to all Activities to call Task::complete().
// Function-local flag that persists across calls.
103 static bool inited = false;
// Create the Activity extension slot through which an activity can be linked back to its Task.
108 ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
// Completion hooks for Exec, Comm and Io: each one fetches the activity's ExtendedAttributeActivity
// and calls complete() on the task_ stored in it.
// NOTE(review): the extension pointer is dereferenced unconditionally. An activity on which
// extension_set() was never called (presumably any Exec/Comm/Io not created by a Task) may yield
// a null pointer here — confirm and guard if so.
109 simgrid::s4u::Exec::on_completion_cb(
110 [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
111 simgrid::s4u::Comm::on_completion_cb(
112 [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
113 simgrid::s4u::Io::on_completion_cb(
114 [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
117 /** @ingroup plugin_task
118 * @param n The number of executions to enqueue.
119 * @brief Enqueue executions.
120 * @note Immediately starts an execution if possible.
// The update is delegated to kernel::actor::simcall_answered; the count `n` is captured by value.
122 void Task::enqueue_execs(int n)
124 simgrid::kernel::actor::simcall_answered([this, n] {
131 /** @ingroup plugin_task
132 * @param amount The amount to set.
133 * @brief Set the amout of work to do.
134 * @note Amount in flop for ExecTask and in bytes for CommTask.
136 void Task::set_amount(double amount)
138 simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
141 /** @ingroup plugin_task
142 * @param token The token to set.
143 * @brief Set the token to send to successors.
144 * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
146 void Task::set_token(std::shared_ptr<Token> token)
148 simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
151 /** @ingroup plugin_task
152 * @return Map of tokens received for the next execution.
153 * @note If there is no queued execution for this task the map might not exist or be partially empty.
155 std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
157 return tokens_received_.front()[t];
160 /** @ingroup plugin_task
161 * @param successor The Task to add.
162 * @brief Add a successor to this Task.
163 * @note It also adds this as a predecessor of successor.
165 void Task::add_successor(TaskPtr successor)
167 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
168 successors_.insert(successor_p);
169 successor_p->predecessors_.try_emplace(this, 0);
173 /** @ingroup plugin_task
174 * @param successor The Task to remove.
175 * @brief Remove a successor from this Task.
176 * @note It also remove this from the predecessors of successor.
178 void Task::remove_successor(TaskPtr successor)
180 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
181 successor_p->predecessors_.erase(this);
182 successors_.erase(successor_p);
186 void Task::remove_all_successors()
188 simgrid::kernel::actor::simcall_answered([this] {
189 while (not successors_.empty()) {
190 auto* successor = *(successors_.begin());
191 successor->predecessors_.erase(this);
192 successors_.erase(successor);
197 /** @ingroup plugin_task
198 * @param func The function to set.
199 * @brief Set a function to be called before each execution.
200 * @note The function is called before the underlying Activity starts.
202 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
204 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
207 /** @ingroup plugin_task
208 * @param func The function to set.
209 * @brief Set a function to be called after each execution.
210 * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
212 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
214 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
217 /** @ingroup plugin_task
218 * @brief Return the number of completed executions.
// Read-only accessor: declared const, so it cannot modify the Task.
220 int Task::get_count() const
226 * @brief Default constructor.
228 ExecTask::ExecTask(const std::string& name) : Task(name) {}
230 /** @ingroup plugin_task
231 * @brief Smart Constructor.
233 ExecTaskPtr ExecTask::init(const std::string& name)
235 return ExecTaskPtr(new ExecTask(name));
238 /** @ingroup plugin_task
239 * @brief Smart Constructor.
241 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
243 return init(name)->set_flops(flops)->set_host(host);
247 * @brief Do one execution of the Task.
248 * @note Call the on_this_start() func. Set its working status as true.
249 * Init and start the underlying Activity.
// Signal order: the task's own on_this_start_ first, then the class-wide Task::on_start.
251 void ExecTask::fire()
253 on_this_start_(this);
254 Task::on_start(this);
// One queued execution is consumed; std::max keeps the counter from going below zero.
256 queued_execs_ = std::max(queued_execs_ - 1, 0);
// The front entry of tokens_received_ is discarded when there is one.
257 if (tokens_received_.size() > 0)
258 tokens_received_.pop_front();
// The Exec takes the task's name, amount_ (as flops) and host_.
259 s4u::ExecPtr exec = s4u::Exec::init();
260 exec->set_name(name_);
261 exec->set_flops_amount(amount_);
262 exec->set_host(host_);
// Link the Exec back to this task through its ExtendedAttributeActivity, then keep it as the current activity.
264 exec->extension_set(new ExtendedAttributeActivity());
265 exec->extension<ExtendedAttributeActivity>()->task_ = this;
266 current_activity_ = exec;
269 /** @ingroup plugin_task
270 * @param host The host to set.
271 * @brief Set a new host.
// host_ is the value that fire() hands to Exec::set_host(); it is updated through simcall_answered.
273 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
275 kernel::actor::simcall_answered([this, host] { host_ = host; });
279 /** @ingroup plugin_task
280 * @param flops The amount of flops to set.
// Sets the amount of work of this ExecTask: the value is stored in amount_, the same field
// that Task::set_amount() writes.
282 ExecTaskPtr ExecTask::set_flops(double flops)
284 kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
289 * @brief Default constructor.
291 CommTask::CommTask(const std::string& name) : Task(name) {}
293 /** @ingroup plugin_task
294 * @brief Smart constructor.
296 CommTaskPtr CommTask::init(const std::string& name)
298 return CommTaskPtr(new CommTask(name));
301 /** @ingroup plugin_task
302 * @brief Smart constructor.
304 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
306 return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
310 * @brief Do one execution of the Task.
311 * @note Call the on_this_start() func. Set its working status as true.
312 * Init and start the underlying Activity.
// Signal order: the task's own on_this_start_ first, then the class-wide Task::on_start.
314 void CommTask::fire()
316 on_this_start_(this);
317 Task::on_start(this);
// One queued execution is consumed; std::max keeps the counter from going below zero.
319 queued_execs_ = std::max(queued_execs_ - 1, 0);
// The front entry of tokens_received_ is discarded when there is one.
320 if (tokens_received_.size() > 0)
321 tokens_received_.pop_front();
// The Comm goes from source_ to destination_, is named after the task and carries amount_ as payload size.
322 s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
323 comm->set_name(name_);
324 comm->set_payload_size(amount_);
// Link the Comm back to this task through its ExtendedAttributeActivity, then keep it as the current activity.
326 comm->extension_set(new ExtendedAttributeActivity());
327 comm->extension<ExtendedAttributeActivity>()->task_ = this;
328 current_activity_ = comm;
331 /** @ingroup plugin_task
332 * @param source The host to set.
333 * @brief Set a new source host.
// source_ is the first endpoint that fire() passes to Comm::sendto_init(); it is updated through simcall_answered.
335 CommTaskPtr CommTask::set_source(s4u::Host* source)
337 kernel::actor::simcall_answered([this, source] { source_ = source; });
341 /** @ingroup plugin_task
342 * @param destination The host to set.
343 * @brief Set a new destination host.
// destination_ is the second endpoint that fire() passes to Comm::sendto_init(); it is updated through simcall_answered.
345 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
347 kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
351 /** @ingroup plugin_task
352 * @param bytes The amount of bytes to set.
// Sets the size of the communication: the value is stored in amount_, which fire() uses as the payload size.
354 CommTaskPtr CommTask::set_bytes(double bytes)
356 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
361 * @brief Default constructor.
363 IoTask::IoTask(const std::string& name) : Task(name) {}
365 /** @ingroup plugin_task
366 * @brief Smart Constructor.
368 IoTaskPtr IoTask::init(const std::string& name)
370 return IoTaskPtr(new IoTask(name));
373 /** @ingroup plugin_task
374 * @brief Smart Constructor.
376 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
378 return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
381 /** @ingroup plugin_task
382 * @param disk The disk to set.
383 * @brief Set a new disk.
// disk_ is updated through simcall_answered; the pointer is captured by value.
385 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
387 kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
391 /** @ingroup plugin_task
392 * @param bytes The amount of bytes to set.
// Sets the size of the I/O: the value is stored in amount_, which fire() passes to Io::set_size().
394 IoTaskPtr IoTask::set_bytes(double bytes)
396 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
400 /** @ingroup plugin_task */
// Sets the operation type of the I/O: the value is stored in type_, which fire() passes to Io::set_op_type().
401 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
403 kernel::actor::simcall_answered([this, type] { type_ = type; });
// Signal order: the task's own on_this_start_ first, then the class-wide Task::on_start.
409 on_this_start_(this);
410 Task::on_start(this);
// One queued execution is consumed; std::max keeps the counter from going below zero.
412 queued_execs_ = std::max(queued_execs_ - 1, 0);
// The front entry of tokens_received_ is discarded when there is one.
413 if (tokens_received_.size() > 0)
414 tokens_received_.pop_front();
// The Io is sized with amount_ and uses the operation type stored in type_.
415 s4u::IoPtr io = s4u::Io::init();
417 io->set_size(amount_);
419 io->set_op_type(type_);
// Link the Io back to this task through its ExtendedAttributeActivity, then keep it as the current activity.
421 io->extension_set(new ExtendedAttributeActivity());
422 io->extension<ExtendedAttributeActivity>()->task_ = this;
423 current_activity_ = io;
426 } // namespace simgrid::plugins