2 #include <simgrid/Exception.hpp>
3 #include <simgrid/s4u/Task.hpp>
4 #include <simgrid/s4u/Comm.hpp>
5 #include <simgrid/s4u/Exec.hpp>
6 #include <simgrid/s4u/Io.hpp>
7 #include <simgrid/simix.hpp>
9 #include "src/simgrid/module.hpp"
11 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
16 Tasks are designed to represent dataflows, i.e., graphs of Tasks.
17 Tasks can only be instantiated using one of
18 :cpp:func:`simgrid::s4u::ExecTask::init`, :cpp:func:`simgrid::s4u::CommTask::init` or :cpp:func:`simgrid::s4u::IoTask::init`.
19 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
20 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
An IoTask is an I/O Task. Its underlying Activity is an :ref:`Io <API_s4u_Io>`.
// Declare the "Task" log sub-category (child of "kernel") and make it the default category of this file.
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
26 namespace simgrid::s4u {
// Definitions of the class-wide signals. on_start is emitted by the fire() routines of this file
// through Task::on_start(this).
28 xbt::signal<void(Task*)> Task::on_start;
29 xbt::signal<void(Task*)> Task::on_end;
31 Task::Task(const std::string& name) : name_(name) {}
34 * @brief Return True if the Task can start a new Activity.
35 * @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
37 bool Task::ready_to_run() const
39 return not working_ && queued_execs_ > 0;
43 * @param source The sender.
44 * @brief Receive a token from another Task.
45 * @note Check upon reception if the Task has received a token from each of its predecessors,
46 * and in this case consumes those tokens and enqueue an execution.
// Reception of a token sent by the Task `source` (only part of the routine is visible here).
// NOTE(review): original lines 57, 59-61 and 63-66 of this function are absent from this excerpt.
48 void Task::receive(Task* source)
50 XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
// Post-increment: source_count holds the counter of `source` as it was before this reception.
51 auto source_count = predecessors_[source]++;
// Append an empty entry when the slot about to be written does not exist yet.
52 if (tokens_received_.size() <= queued_execs_ + source_count)
53 tokens_received_.push_back({});
// Record the sender's current token in the slot at index queued_execs_ + source_count.
54 tokens_received_[queued_execs_ + source_count][source] = source->token_;
// Walk over the per-predecessor counters; the test guarding the assignment below is not visible here.
55 bool enough_tokens = true;
56 for (auto const& [key, val] : predecessors_)
58 enough_tokens = false;
// Second pass over the counters, by mutable reference; its body is not visible here.
62 for (auto& [key, val] : predecessors_)
69 * @brief Task routine when finishing an execution.
70 * @note Set its working status as false.
71 * Add 1 to its count of finished executions.
72 * Call the on_this_end func.
73 * Fire on_end callback.
74 * Send a token to each of its successors.
75 * Start a new execution if possible.
// NOTE(review): the signature line of this routine is not visible in this excerpt; the fire()
// completion callbacks call this->complete(), which is presumably this function — confirm.
// Asserts that the caller is the maestro.
79 xbt_assert(s4u::Actor::is_maestro());
// Hand the current activity (if any) over to previous_activity_; current_activity_ is moved-from afterwards.
84 if (current_activity_)
85 previous_activity_ = std::move(current_activity_);
// Loop over the successors; the loop body is not visible in this excerpt.
86 for (auto const& t : successors_)
92 /** @param n The number of executions to enqueue.
93 * @brief Enqueue executions.
94 * @note Immediately starts an execution if possible.
// Enqueue n executions. The work is done inside a simcall_answered() lambda capturing this and n.
// NOTE(review): the lambda body (original lines 99-102) is not visible in this excerpt.
96 void Task::enqueue_execs(int n)
98 simgrid::kernel::actor::simcall_answered([this, n] {
105 /** @param amount The amount to set.
106 * @brief Set the amout of work to do.
107 * @note Amount in flop for ExecTask and in bytes for CommTask.
109 void Task::set_amount(double amount)
111 simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
114 /** @param token The token to set.
115 * @brief Set the token to send to successors.
116 * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
118 void Task::set_token(std::shared_ptr<Token> token)
120 simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
123 /** @return Map of tokens received for the next execution.
124 * @note If there is no queued execution for this task the map might not exist or be partially empty.
126 std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
128 return tokens_received_.front()[t];
131 /** @param successor The Task to add.
132 * @brief Add a successor to this Task.
133 * @note It also adds this as a predecessor of successor.
135 void Task::add_successor(TaskPtr successor)
137 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
138 successors_.insert(successor_p);
139 successor_p->predecessors_.try_emplace(this, 0);
143 /** @param successor The Task to remove.
144 * @brief Remove a successor from this Task.
145 * @note It also remove this from the predecessors of successor.
147 void Task::remove_successor(TaskPtr successor)
149 simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
150 successor_p->predecessors_.erase(this);
151 successors_.erase(successor_p);
155 void Task::remove_all_successors()
157 simgrid::kernel::actor::simcall_answered([this] {
158 while (not successors_.empty()) {
159 auto* successor = *(successors_.begin());
160 successor->predecessors_.erase(this);
161 successors_.erase(successor);
166 /** @ingroup plugin_task
167 * @param func The function to set.
168 * @brief Set a function to be called before each execution.
169 * @note The function is called before the underlying Activity starts.
171 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
173 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
176 /** @ingroup plugin_task
177 * @param func The function to set.
178 * @brief Set a function to be called after each execution.
179 * @note The function is called after the underlying Activity ends, but before sending tokens to successors.
181 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
183 simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
186 /** @ingroup plugin_task
187 * @brief Return the number of completed executions.
// NOTE(review): the body of this accessor (original line 191) is not visible in this excerpt.
189 int Task::get_count() const
195 * @brief Default constructor.
197 ExecTask::ExecTask(const std::string& name) : Task(name) {}
199 /** @ingroup plugin_task
200 * @brief Smart Constructor.
202 ExecTaskPtr ExecTask::init(const std::string& name)
204 return ExecTaskPtr(new ExecTask(name));
207 /** @ingroup plugin_task
208 * @brief Smart Constructor.
210 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
212 return init(name)->set_flops(flops)->set_host(host);
216 * @brief Do one execution of the Task.
217 * @note Call the on_this_start() func. Set its working status as true.
218 * Init and start the underlying Activity.
// Start one execution of this ExecTask (only part of the routine is visible here).
// NOTE(review): original lines 224 and 232 of this function are absent from this excerpt.
220 void ExecTask::fire()
// Notify the per-task start callbacks first, then the class-wide on_start signal.
222 on_this_start_(this);
223 Task::on_start(this);
// Consume one queued execution, never going below zero.
225 queued_execs_ = std::max(queued_execs_ - 1, 0);
// Discard the front entry of tokens_received_ when there is one.
226 if (tokens_received_.size() > 0)
227 tokens_received_.pop_front();
// Create the underlying Exec and configure it with this task's name, amount and host.
228 s4u::ExecPtr exec = s4u::Exec::init();
229 exec->set_name(name_);
230 exec->set_flops_amount(amount_);
231 exec->set_host(host_);
// When the Exec completes, run this task's complete() routine.
233 exec->on_this_completion_cb([this](Exec const& exec) { this->complete(); });
// Remember the Exec as the activity currently attached to this task.
234 current_activity_ = exec;
237 /** @ingroup plugin_task
238 * @param host The host to set.
239 * @brief Set a new host.
// Store `host` in host_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 244, presumably the return) is not visible here.
241 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
243 kernel::actor::simcall_answered([this, host] { host_ = host; });
247 /** @ingroup plugin_task
248 * @param flops The amount of flops to set.
// Store `flops` in amount_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 253, presumably the return) is not visible here.
250 ExecTaskPtr ExecTask::set_flops(double flops)
252 kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
257 * @brief Default constructor.
259 CommTask::CommTask(const std::string& name) : Task(name) {}
261 /** @ingroup plugin_task
262 * @brief Smart constructor.
264 CommTaskPtr CommTask::init(const std::string& name)
266 return CommTaskPtr(new CommTask(name));
269 /** @ingroup plugin_task
270 * @brief Smart constructor.
272 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
274 return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
278 * @brief Do one execution of the Task.
279 * @note Call the on_this_start() func. Set its working status as true.
280 * Init and start the underlying Activity.
// Start one execution of this CommTask (only part of the routine is visible here).
// NOTE(review): original lines 286 and 293 of this function are absent from this excerpt.
282 void CommTask::fire()
// Notify the per-task start callbacks first, then the class-wide on_start signal.
284 on_this_start_(this);
285 Task::on_start(this);
// Consume one queued execution, never going below zero.
287 queued_execs_ = std::max(queued_execs_ - 1, 0);
// Discard the front entry of tokens_received_ when there is one.
288 if (tokens_received_.size() > 0)
289 tokens_received_.pop_front();
// Create the underlying Comm between source_ and destination_, then set its name and payload size.
290 s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
291 comm->set_name(name_);
292 comm->set_payload_size(amount_);
// When the Comm completes, run this task's complete() routine.
294 comm->on_this_completion_cb([this](Comm const& comm) { this->complete(); });
// Remember the Comm as the activity currently attached to this task.
295 current_activity_ = comm;
298 /** @ingroup plugin_task
299 * @param source The host to set.
300 * @brief Set a new source host.
// Store `source` in source_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 305, presumably the return) is not visible here.
302 CommTaskPtr CommTask::set_source(s4u::Host* source)
304 kernel::actor::simcall_answered([this, source] { source_ = source; });
308 /** @ingroup plugin_task
309 * @param destination The host to set.
310 * @brief Set a new destination host.
// Store `destination` in destination_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 315, presumably the return) is not visible here.
312 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
314 kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
318 /** @ingroup plugin_task
319 * @param bytes The amount of bytes to set.
// Store `bytes` in amount_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 324, presumably the return) is not visible here.
321 CommTaskPtr CommTask::set_bytes(double bytes)
323 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
328 * @brief Default constructor.
330 IoTask::IoTask(const std::string& name) : Task(name) {}
332 /** @ingroup plugin_task
333 * @brief Smart Constructor.
335 IoTaskPtr IoTask::init(const std::string& name)
337 return IoTaskPtr(new IoTask(name));
340 /** @ingroup plugin_task
341 * @brief Smart Constructor.
343 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
345 return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
348 /** @ingroup plugin_task
349 * @param disk The disk to set.
350 * @brief Set a new disk.
// Store `disk` in disk_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 355, presumably the return) is not visible here.
352 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
354 kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
358 /** @ingroup plugin_task
359 * @param bytes The amount of bytes to set.
// Store `bytes` in amount_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 364, presumably the return) is not visible here.
361 IoTaskPtr IoTask::set_bytes(double bytes)
363 kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
367 /** @ingroup plugin_task */
// Store `type` in type_ from within a simcall.
// NOTE(review): the statement following the simcall (original line 371, presumably the return) is not visible here.
368 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
370 kernel::actor::simcall_answered([this, type] { type_ = type; });
// NOTE(review): the signature line of this routine is not visible in this excerpt; it mirrors
// ExecTask::fire() and CommTask::fire() and is presumably IoTask::fire() — confirm.
// Original lines 378, 383, 385 and 387 are also absent from this excerpt.
// Notify the per-task start callbacks first, then the class-wide on_start signal.
376 on_this_start_(this);
377 Task::on_start(this);
// Consume one queued execution, never going below zero.
379 queued_execs_ = std::max(queued_execs_ - 1, 0);
// Discard the front entry of tokens_received_ when there is one.
380 if (tokens_received_.size() > 0)
381 tokens_received_.pop_front();
// Create the underlying Io and configure its size and operation type.
382 s4u::IoPtr io = s4u::Io::init();
384 io->set_size(amount_);
386 io->set_op_type(type_);
// When the Io completes, run this task's complete() routine.
388 io->on_this_completion_cb([this](Io const& io) { this->complete(); });
// Remember the Io as the activity currently attached to this task.
389 current_activity_ = io;
392 } // namespace simgrid::s4u