1 #include <simgrid/Exception.hpp>
2 #include <simgrid/plugins/task.hpp>
3 #include <simgrid/s4u/Comm.hpp>
4 #include <simgrid/s4u/Exec.hpp>
5 #include <simgrid/s4u/Io.hpp>
6 #include <simgrid/simix.hpp>
8 #include "src/simgrid/module.hpp"
10 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
/** @defgroup plugin_task plugin_task Plugin Task

  @beginrst

This is the task plugin, enabling management of Tasks.
To activate this plugin, first call :cpp:func:`Task::init`.

Tasks are designed to represent dataflows, i.e., graphs of Tasks.
Tasks can only be instantiated using
:cpp:func:`simgrid::plugins::ExecTask::init`, :cpp:func:`simgrid::plugins::CommTask::init`
or :cpp:func:`simgrid::plugins::IoTask::init`.
An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
An IoTask is an I/O Task. Its underlying Activity is an ``s4u::Io``.

  @endrst
 */
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
28 namespace simgrid::plugins {
30 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
32 xbt::signal<void(Task*)> Task::on_start;
33 xbt::signal<void(Task*)> Task::on_end;
35 Task::Task(const std::string& name) : name_(name) {}
38 * @brief Return True if the Task can start a new Activity.
39 * @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
41 bool Task::ready_to_run() const
43 return not working_ && queued_execs_ > 0;
47 * @param source The sender.
48 * @brief Receive a token from another Task.
49 * @note Check upon reception if the Task has received a token from each of its predecessors,
50 * and in this case consumes those tokens and enqueue an execution.
52 void Task::receive(Task* source)
54 XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
55 predecessors_[source]++;
56 bool enough_tokens = true;
57 for (auto const& [key, val] : predecessors_)
59 enough_tokens = false;
63 for (auto& [key, val] : predecessors_)
70 * @brief Task routine when finishing an execution.
71 * @note Set its working status as false.
72 * Add 1 to its count of finished executions.
73 * Call the on_this_end func.
74 * Fire on_end callback.
75 * Send a token to each of its successors.
76 * Start a new execution if possible.
80 xbt_assert(s4u::Actor::is_maestro());
85 if (current_activity_)
86 previous_activity_ = std::move(current_activity_);
87 for (auto const& t : successors_)
93 /** @ingroup plugin_task
94 * @brief Init the Task plugin.
95 * @note Add a completion callback to all Activities to call Task::complete().
99 static bool inited = false;
104 ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
105 simgrid::s4u::Exec::on_completion_cb(
106 [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
107 simgrid::s4u::Comm::on_completion_cb(
108 [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
109 simgrid::s4u::Io::on_completion_cb(
110 [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
113 /** @ingroup plugin_task
114 * @param n The number of executions to enqueue.
115 * @brief Enqueue executions.
116 * @note Immediatly starts an execution if possible.
118 void Task::enqueue_execs(int n)
120 simgrid::kernel::actor::simcall_answered([this, n] {
127 /** @ingroup plugin_task
128 * @param amount The amount to set.
129 * @brief Set the amout of work to do.
130 * @note Amount in flop for ExecTask and in bytes for CommTask.
132 void Task::set_amount(double amount)
134 simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
137 /** @ingroup plugin_task
138 * @param successor The Task to add.
139 * @brief Add a successor to this Task.
140 * @note It also adds this as a predecessor of successor.
142 void Task::add_successor(TaskPtr successor)
144 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
145 successors_.insert(successor_p);
146 successor_p->predecessors_.try_emplace(this, 0);
150 /** @ingroup plugin_task
151 * @param successor The Task to remove.
152 * @brief Remove a successor from this Task.
153 * @note It also remove this from the predecessors of successor.
155 void Task::remove_successor(TaskPtr successor)
157 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
158 successor_p->predecessors_.erase(this);
159 successors_.erase(successor_p);
163 void Task::remove_all_successors()
165 simgrid::kernel::actor::simcall_answered([this] {
166 while (not successors_.empty()) {
167 auto* successor = *(successors_.begin());
168 successor->predecessors_.erase(this);
169 successors_.erase(successor);
174 /** @ingroup plugin_task
175 * @param func The function to set.
176 * @brief Set a function to be called before each execution.
177 * @note The function is called before the underlying Activity starts.
179 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
181 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
184 /** @ingroup plugin_task
185 * @param func The function to set.
186 * @brief Set a function to be called after each execution.
187 * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
189 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
191 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
194 /** @ingroup plugin_task
195 * @brief Return the number of completed executions.
197 int Task::get_count() const
203 * @brief Default constructor.
205 ExecTask::ExecTask(const std::string& name) : Task(name) {}
207 /** @ingroup plugin_task
208 * @brief Smart Constructor.
210 ExecTaskPtr ExecTask::init(const std::string& name)
212 return ExecTaskPtr(new ExecTask(name));
215 /** @ingroup plugin_task
216 * @brief Smart Constructor.
218 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
220 return init(name)->set_flops(flops)->set_host(host);
224 * @brief Do one execution of the Task.
225 * @note Call the on_this_start() func. Set its working status as true.
226 * Init and start the underlying Activity.
228 void ExecTask::fire()
230 on_this_start_(this);
231 Task::on_start(this);
233 queued_execs_ = std::max(queued_execs_ - 1, 0);
234 s4u::ExecPtr exec = s4u::Exec::init();
235 exec->set_name(name_);
236 exec->set_flops_amount(amount_);
237 exec->set_host(host_);
239 exec->extension_set(new ExtendedAttributeActivity());
240 exec->extension<ExtendedAttributeActivity>()->task_ = this;
241 current_activity_ = exec;
244 /** @ingroup plugin_task
245 * @param host The host to set.
246 * @brief Set a new host.
248 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
250 kernel::actor::simcall_answered([this, host] { host_ = host; });
254 /** @ingroup plugin_task
255 * @param flops The amount of flops to set.
257 ExecTaskPtr ExecTask::set_flops(double flops)
259 kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
264 * @brief Default constructor.
266 CommTask::CommTask(const std::string& name) : Task(name) {}
268 /** @ingroup plugin_task
269 * @brief Smart constructor.
271 CommTaskPtr CommTask::init(const std::string& name)
273 return CommTaskPtr(new CommTask(name));
276 /** @ingroup plugin_task
277 * @brief Smart constructor.
279 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
281 return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
285 * @brief Do one execution of the Task.
286 * @note Call the on_this_start() func. Set its working status as true.
287 * Init and start the underlying Activity.
289 void CommTask::fire()
291 on_this_start_(this);
292 Task::on_start(this);
294 queued_execs_ = std::max(queued_execs_ - 1, 0);
295 s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
296 comm->set_name(name_);
297 comm->set_payload_size(amount_);
299 comm->extension_set(new ExtendedAttributeActivity());
300 comm->extension<ExtendedAttributeActivity>()->task_ = this;
301 current_activity_ = comm;
304 /** @ingroup plugin_task
305 * @param source The host to set.
306 * @brief Set a new source host.
308 CommTaskPtr CommTask::set_source(s4u::Host* source)
310 kernel::actor::simcall_answered([this, source] { source_ = source; });
314 /** @ingroup plugin_task
315 * @param destination The host to set.
316 * @brief Set a new destination host.
318 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
320 kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
324 /** @ingroup plugin_task
325 * @param bytes The amount of bytes to set.
327 CommTaskPtr CommTask::set_bytes(double bytes)
329 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
334 * @brief Default constructor.
336 IoTask::IoTask(const std::string& name) : Task(name) {}
338 /** @ingroup plugin_task
339 * @brief Smart Constructor.
341 IoTaskPtr IoTask::init(const std::string& name)
343 return IoTaskPtr(new IoTask(name));
346 /** @ingroup plugin_task
347 * @brief Smart Constructor.
349 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
351 return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
354 /** @ingroup plugin_task
355 * @param disk The disk to set.
356 * @brief Set a new disk.
358 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
360 kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
364 /** @ingroup plugin_task
365 * @param bytes The amount of bytes to set.
367 IoTaskPtr IoTask::set_bytes(double bytes)
369 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
373 /** @ingroup plugin_task */
374 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
376 kernel::actor::simcall_answered([this, type] { type_ = type; });
382 on_this_start_(this);
383 Task::on_start(this);
385 queued_execs_ = std::max(queued_execs_ - 1, 0);
386 s4u::IoPtr io = s4u::Io::init();
388 io->set_size(amount_);
390 io->set_op_type(type_);
392 io->extension_set(new ExtendedAttributeActivity());
393 io->extension<ExtendedAttributeActivity>()->task_ = this;
394 current_activity_ = io;
397 } // namespace simgrid::plugins