Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
835b307144545b4d15712610e821df8717d20af0
[simgrid.git] / src / plugins / task.cpp
1 #include <memory>
2 #include <simgrid/Exception.hpp>
3 #include <simgrid/plugins/task.hpp>
4 #include <simgrid/s4u/Comm.hpp>
5 #include <simgrid/s4u/Exec.hpp>
6 #include <simgrid/s4u/Io.hpp>
7 #include <simgrid/simix.hpp>
8
9 #include "src/simgrid/module.hpp"
10
11 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
12 /** @defgroup plugin_task plugin_task Plugin Task
13
14   @beginrst
15
16 This is the task plugin, enabling management of Tasks.
17 To activate this plugin, first call :cpp:func:`Task::init`.
18
19 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
20 Tasks can only be instancied using either
21 :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
22 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
23 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
24
25   @endrst
26  */
27 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
28
29 namespace simgrid::plugins {
30
31 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
32
33 xbt::signal<void(Task*)> Task::on_start;
34 xbt::signal<void(Task*)> Task::on_end;
35
36 Task::Task(const std::string& name) : name_(name) {}
37
38 /**
39  *  @brief Return True if the Task can start a new Activity.
40  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
41  */
42 bool Task::ready_to_run() const
43 {
44   return not working_ && queued_execs_ > 0;
45 }
46
47 /**
48  *  @param source The sender.
49  *  @brief Receive a token from another Task.
50  *  @note Check upon reception if the Task has received a token from each of its predecessors,
51  * and in this case consumes those tokens and enqueue an execution.
52  */
53 void Task::receive(Task* source)
54 {
55   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
56   auto source_count = predecessors_[source]++;
57   if (tokens_received_.size() <= queued_execs_ + source_count)
58     tokens_received_.push_back({});
59   tokens_received_[queued_execs_ + source_count][source] = source->token_;
60   bool enough_tokens = true;
61   for (auto const& [key, val] : predecessors_)
62     if (val < 1) {
63       enough_tokens = false;
64       break;
65     }
66   if (enough_tokens) {
67     for (auto& [key, val] : predecessors_)
68       val--;
69     enqueue_execs(1);
70   }
71 }
72
73 /**
74  *  @brief Task routine when finishing an execution.
75  *  @note Set its working status as false.
76  * Add 1 to its count of finished executions.
77  * Call the on_this_end func.
78  * Fire on_end callback.
79  * Send a token to each of its successors.
80  * Start a new execution if possible.
81  */
82 void Task::complete()
83 {
84   xbt_assert(s4u::Actor::is_maestro());
85   working_ = false;
86   count_++;
87   on_this_end_(this);
88   Task::on_end(this);
89   if (current_activity_)
90     previous_activity_ = std::move(current_activity_);
91   for (auto const& t : successors_)
92     t->receive(this);
93   if (ready_to_run())
94     fire();
95 }
96
97 /** @ingroup plugin_task
98  *  @brief Init the Task plugin.
99  *  @note Add a completion callback to all Activities to call Task::complete().
100  */
101 void Task::init()
102 {
103   static bool inited = false;
104   if (inited)
105     return;
106
107   inited                                  = true;
108   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
109   simgrid::s4u::Exec::on_completion_cb(
110       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
111   simgrid::s4u::Comm::on_completion_cb(
112       [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
113   simgrid::s4u::Io::on_completion_cb(
114       [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
115 }
116
117 /** @ingroup plugin_task
118  *  @param n The number of executions to enqueue.
119  *  @brief Enqueue executions.
120  *  @note Immediatly starts an execution if possible.
121  */
122 void Task::enqueue_execs(int n)
123 {
124   simgrid::kernel::actor::simcall_answered([this, n] {
125     queued_execs_ += n;
126     if (ready_to_run())
127       fire();
128   });
129 }
130
131 /** @ingroup plugin_task
132  *  @param amount The amount to set.
133  *  @brief Set the amout of work to do.
134  *  @note Amount in flop for ExecTask and in bytes for CommTask.
135  */
136 void Task::set_amount(double amount)
137 {
138   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
139 }
140
141 /** @ingroup plugin_task
142  *  @param token The token to set.
143  *  @brief Set the token to send to successors.
144  *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
145  */
146 void Task::set_token(std::shared_ptr<Token> token)
147 {
148   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
149 }
150
151 /** @ingroup plugin_task
152  *  @return Map of tokens received for the next execution.
153  *  @note If there is no queued execution for this task the map might not exist or be partially empty.
154  */
155 std::shared_ptr<Token> Task::get_next_token_from(TaskPtr t)
156 {
157   return tokens_received_.front()[t];
158 }
159
160 /** @ingroup plugin_task
161  *  @param successor The Task to add.
162  *  @brief Add a successor to this Task.
163  *  @note It also adds this as a predecessor of successor.
164  */
165 void Task::add_successor(TaskPtr successor)
166 {
167   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
168     successors_.insert(successor_p);
169     successor_p->predecessors_.try_emplace(this, 0);
170   });
171 }
172
173 /** @ingroup plugin_task
174  *  @param successor The Task to remove.
175  *  @brief Remove a successor from this Task.
176  *  @note It also remove this from the predecessors of successor.
177  */
178 void Task::remove_successor(TaskPtr successor)
179 {
180   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
181     successor_p->predecessors_.erase(this);
182     successors_.erase(successor_p);
183   });
184 }
185
186 void Task::remove_all_successors()
187 {
188   simgrid::kernel::actor::simcall_answered([this] {
189     while (not successors_.empty()) {
190       auto* successor = *(successors_.begin());
191       successor->predecessors_.erase(this);
192       successors_.erase(successor);
193     }
194   });
195 }
196
197 /** @ingroup plugin_task
198  *  @param func The function to set.
199  *  @brief Set a function to be called before each execution.
200  *  @note The function is called before the underlying Activity starts.
201  */
202 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
203 {
204   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
205 }
206
207 /** @ingroup plugin_task
208  *  @param func The function to set.
209  *  @brief Set a function to be called after each execution.
210  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
211  */
212 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
213 {
214   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
215 }
216
217 /** @ingroup plugin_task
218  *  @brief Return the number of completed executions.
219  */
220 int Task::get_count() const
221 {
222   return count_;
223 }
224
225 /**
226  *  @brief Default constructor.
227  */
228 ExecTask::ExecTask(const std::string& name) : Task(name) {}
229
230 /** @ingroup plugin_task
231  *  @brief Smart Constructor.
232  */
233 ExecTaskPtr ExecTask::init(const std::string& name)
234 {
235   return ExecTaskPtr(new ExecTask(name));
236 }
237
238 /** @ingroup plugin_task
239  *  @brief Smart Constructor.
240  */
241 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
242 {
243   return init(name)->set_flops(flops)->set_host(host);
244 }
245
246 /**
247  *  @brief Do one execution of the Task.
248  *  @note Call the on_this_start() func. Set its working status as true.
249  *  Init and start the underlying Activity.
250  */
251 void ExecTask::fire()
252 {
253   on_this_start_(this);
254   Task::on_start(this);
255   working_          = true;
256   queued_execs_     = std::max(queued_execs_ - 1, 0);
257   if (tokens_received_.size() > 0)
258       tokens_received_.pop_front();
259   s4u::ExecPtr exec = s4u::Exec::init();
260   exec->set_name(name_);
261   exec->set_flops_amount(amount_);
262   exec->set_host(host_);
263   exec->start();
264   exec->extension_set(new ExtendedAttributeActivity());
265   exec->extension<ExtendedAttributeActivity>()->task_ = this;
266   current_activity_                                   = exec;
267 }
268
269 /** @ingroup plugin_task
270  *  @param host The host to set.
271  *  @brief Set a new host.
272  */
273 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
274 {
275   kernel::actor::simcall_answered([this, host] { host_ = host; });
276   return this;
277 }
278
279 /** @ingroup plugin_task
280  *  @param flops The amount of flops to set.
281  */
282 ExecTaskPtr ExecTask::set_flops(double flops)
283 {
284   kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
285   return this;
286 }
287
288 /**
289  *  @brief Default constructor.
290  */
291 CommTask::CommTask(const std::string& name) : Task(name) {}
292
293 /** @ingroup plugin_task
294  *  @brief Smart constructor.
295  */
296 CommTaskPtr CommTask::init(const std::string& name)
297 {
298   return CommTaskPtr(new CommTask(name));
299 }
300
301 /** @ingroup plugin_task
302  *  @brief Smart constructor.
303  */
304 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
305 {
306   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
307 }
308
309 /**
310  *  @brief Do one execution of the Task.
311  *  @note Call the on_this_start() func. Set its working status as true.
312  *  Init and start the underlying Activity.
313  */
314 void CommTask::fire()
315 {
316   on_this_start_(this);
317   Task::on_start(this);
318   working_          = true;
319   queued_execs_     = std::max(queued_execs_ - 1, 0);
320   if (tokens_received_.size() > 0)
321       tokens_received_.pop_front();
322   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
323   comm->set_name(name_);
324   comm->set_payload_size(amount_);
325   comm->start();
326   comm->extension_set(new ExtendedAttributeActivity());
327   comm->extension<ExtendedAttributeActivity>()->task_ = this;
328   current_activity_                                   = comm;
329 }
330
331 /** @ingroup plugin_task
332  *  @param source The host to set.
333  *  @brief Set a new source host.
334  */
335 CommTaskPtr CommTask::set_source(s4u::Host* source)
336 {
337   kernel::actor::simcall_answered([this, source] { source_ = source; });
338   return this;
339 }
340
341 /** @ingroup plugin_task
342  *  @param destination The host to set.
343  *  @brief Set a new destination host.
344  */
345 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
346 {
347   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
348   return this;
349 }
350
351 /** @ingroup plugin_task
352  *  @param bytes The amount of bytes to set.
353  */
354 CommTaskPtr CommTask::set_bytes(double bytes)
355 {
356   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
357   return this;
358 }
359
360 /**
361  *  @brief Default constructor.
362  */
363 IoTask::IoTask(const std::string& name) : Task(name) {}
364
365 /** @ingroup plugin_task
366  *  @brief Smart Constructor.
367  */
368 IoTaskPtr IoTask::init(const std::string& name)
369 {
370   return IoTaskPtr(new IoTask(name));
371 }
372
373 /** @ingroup plugin_task
374  *  @brief Smart Constructor.
375  */
376 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
377 {
378   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
379 }
380
381 /** @ingroup plugin_task
382  *  @param disk The disk to set.
383  *  @brief Set a new disk.
384  */
385 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
386 {
387   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
388   return this;
389 }
390
391 /** @ingroup plugin_task
392  *  @param bytes The amount of bytes to set.
393  */
394 IoTaskPtr IoTask::set_bytes(double bytes)
395 {
396   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
397   return this;
398 }
399
400 /** @ingroup plugin_task */
401 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
402 {
403   kernel::actor::simcall_answered([this, type] { type_ = type; });
404   return this;
405 }
406
407 void IoTask::fire()
408 {
409   on_this_start_(this);
410   Task::on_start(this);
411   working_      = true;
412   queued_execs_ = std::max(queued_execs_ - 1, 0);
413   if (tokens_received_.size() > 0)
414       tokens_received_.pop_front();
415   s4u::IoPtr io = s4u::Io::init();
416   io->set_name(name_);
417   io->set_size(amount_);
418   io->set_disk(disk_);
419   io->set_op_type(type_);
420   io->start();
421   io->extension_set(new ExtendedAttributeActivity());
422   io->extension<ExtendedAttributeActivity>()->task_ = this;
423   current_activity_                                 = io;
424 }
425
426 } // namespace simgrid::plugins