Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
dca0c9013d3ebb4cc02321a032447eb96b31b563
[simgrid.git] / src / plugins / task.cpp
1 #include <simgrid/Exception.hpp>
2 #include <simgrid/plugins/task.hpp>
3 #include <simgrid/s4u/Comm.hpp>
4 #include <simgrid/s4u/Exec.hpp>
5 #include <simgrid/s4u/Io.hpp>
6 #include <simgrid/simix.hpp>
7
8 #include "src/simgrid/module.hpp"
9
10 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
11 /** @defgroup plugin_task plugin_task Plugin Task
12
13   @beginrst
14
15 This is the task plugin, enabling management of Tasks.
16 To activate this plugin, first call :cpp:func:`Task::init`.
17
18 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
19 Tasks can only be instancied using either
20 :cpp:func:`simgrid::plugins::ExecTask::init` or :cpp:func:`simgrid::plugins::CommTask::init`
21 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
22 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
23
24   @endrst
25  */
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
27
28 namespace simgrid::plugins {
29
30 xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
31
32 xbt::signal<void(Task*)> Task::on_start;
33 xbt::signal<void(Task*)> Task::on_end;
34
35 Task::Task(const std::string& name) : name_(name) {}
36
37 /**
38  *  @brief Return True if the Task can start a new Activity.
39  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
40  */
41 bool Task::ready_to_run() const
42 {
43   return not working_ && queued_execs_ > 0;
44 }
45
46 /**
47  *  @param source The sender.
48  *  @brief Receive a token from another Task.
49  *  @note Check upon reception if the Task has received a token from each of its predecessors,
50  * and in this case consumes those tokens and enqueue an execution.
51  */
52 void Task::receive(Task* source)
53 {
54   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
55   predecessors_[source]++;
56   bool enough_tokens = true;
57   for (auto const& [key, val] : predecessors_)
58     if (val < 1) {
59       enough_tokens = false;
60       break;
61     }
62   if (enough_tokens) {
63     for (auto& [key, val] : predecessors_)
64       val--;
65     enqueue_execs(1);
66   }
67 }
68
69 /**
70  *  @brief Task routine when finishing an execution.
71  *  @note Set its working status as false.
72  * Add 1 to its count of finished executions.
73  * Call the on_this_end func.
74  * Fire on_end callback.
75  * Send a token to each of its successors.
76  * Start a new execution if possible.
77  */
78 void Task::complete()
79 {
80   xbt_assert(s4u::Actor::is_maestro());
81   working_ = false;
82   count_++;
83   on_this_end_(this);
84   Task::on_end(this);
85   if (current_activity_)
86     previous_activity_ = std::move(current_activity_);
87   for (auto const& t : successors_)
88     t->receive(this);
89   if (ready_to_run())
90     fire();
91 }
92
93 /** @ingroup plugin_task
94  *  @brief Init the Task plugin.
95  *  @note Add a completion callback to all Activities to call Task::complete().
96  */
97 void Task::init()
98 {
99   static bool inited = false;
100   if (inited)
101     return;
102
103   inited                                  = true;
104   ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
105   simgrid::s4u::Exec::on_completion_cb(
106       [](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
107   simgrid::s4u::Comm::on_completion_cb(
108       [](simgrid::s4u::Comm const& comm) { comm.extension<ExtendedAttributeActivity>()->task_->complete(); });
109   simgrid::s4u::Io::on_completion_cb(
110       [](simgrid::s4u::Io const& io) { io.extension<ExtendedAttributeActivity>()->task_->complete(); });
111 }
112
113 /** @ingroup plugin_task
114  *  @param n The number of executions to enqueue.
115  *  @brief Enqueue executions.
116  *  @note Immediatly starts an execution if possible.
117  */
118 void Task::enqueue_execs(int n)
119 {
120   simgrid::kernel::actor::simcall_answered([this, n] {
121     queued_execs_ += n;
122     if (ready_to_run())
123       fire();
124   });
125 }
126
127 /** @ingroup plugin_task
128  *  @param amount The amount to set.
129  *  @brief Set the amout of work to do.
130  *  @note Amount in flop for ExecTask and in bytes for CommTask.
131  */
132 void Task::set_amount(double amount)
133 {
134   simgrid::kernel::actor::simcall_answered([this, amount] { amount_ = amount; });
135 }
136
137 /** @ingroup plugin_task
138  *  @param successor The Task to add.
139  *  @brief Add a successor to this Task.
140  *  @note It also adds this as a predecessor of successor.
141  */
142 void Task::add_successor(TaskPtr successor)
143 {
144   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
145     successors_.insert(successor_p);
146     successor_p->predecessors_.try_emplace(this, 0);
147   });
148 }
149
150 /** @ingroup plugin_task
151  *  @param successor The Task to remove.
152  *  @brief Remove a successor from this Task.
153  *  @note It also remove this from the predecessors of successor.
154  */
155 void Task::remove_successor(TaskPtr successor)
156 {
157   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
158     successor_p->predecessors_.erase(this);
159     successors_.erase(successor_p);
160   });
161 }
162
163 void Task::remove_all_successors()
164 {
165   simgrid::kernel::actor::simcall_answered([this] {
166     while (not successors_.empty()) {
167       auto* successor = *(successors_.begin());
168       successor->predecessors_.erase(this);
169       successors_.erase(successor);
170     }
171   });
172 }
173
174 /** @ingroup plugin_task
175  *  @param func The function to set.
176  *  @brief Set a function to be called before each execution.
177  *  @note The function is called before the underlying Activity starts.
178  */
179 void Task::on_this_start_cb(const std::function<void(Task*)>& func)
180 {
181   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
182 }
183
184 /** @ingroup plugin_task
185  *  @param func The function to set.
186  *  @brief Set a function to be called after each execution.
187  *  @note The function is called after the underlying Activity ends, but before sending tokens to successors.
188  */
189 void Task::on_this_end_cb(const std::function<void(Task*)>& func)
190 {
191   simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
192 }
193
194 /** @ingroup plugin_task
195  *  @brief Return the number of completed executions.
196  */
197 int Task::get_count() const
198 {
199   return count_;
200 }
201
202 /**
203  *  @brief Default constructor.
204  */
205 ExecTask::ExecTask(const std::string& name) : Task(name) {}
206
207 /** @ingroup plugin_task
208  *  @brief Smart Constructor.
209  */
210 ExecTaskPtr ExecTask::init(const std::string& name)
211 {
212   return ExecTaskPtr(new ExecTask(name));
213 }
214
215 /** @ingroup plugin_task
216  *  @brief Smart Constructor.
217  */
218 ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* host)
219 {
220   return init(name)->set_flops(flops)->set_host(host);
221 }
222
223 /**
224  *  @brief Do one execution of the Task.
225  *  @note Call the on_this_start() func. Set its working status as true.
226  *  Init and start the underlying Activity.
227  */
228 void ExecTask::fire()
229 {
230   on_this_start_(this);
231   Task::on_start(this);
232   working_          = true;
233   queued_execs_     = std::max(queued_execs_ - 1, 0);
234   s4u::ExecPtr exec = s4u::Exec::init();
235   exec->set_name(name_);
236   exec->set_flops_amount(amount_);
237   exec->set_host(host_);
238   exec->start();
239   exec->extension_set(new ExtendedAttributeActivity());
240   exec->extension<ExtendedAttributeActivity>()->task_ = this;
241   current_activity_                                   = exec;
242 }
243
244 /** @ingroup plugin_task
245  *  @param host The host to set.
246  *  @brief Set a new host.
247  */
248 ExecTaskPtr ExecTask::set_host(s4u::Host* host)
249 {
250   kernel::actor::simcall_answered([this, host] { host_ = host; });
251   return this;
252 }
253
254 /** @ingroup plugin_task
255  *  @param flops The amount of flops to set.
256  */
257 ExecTaskPtr ExecTask::set_flops(double flops)
258 {
259   kernel::actor::simcall_answered([this, flops] { amount_ = flops; });
260   return this;
261 }
262
263 /**
264  *  @brief Default constructor.
265  */
266 CommTask::CommTask(const std::string& name) : Task(name) {}
267
268 /** @ingroup plugin_task
269  *  @brief Smart constructor.
270  */
271 CommTaskPtr CommTask::init(const std::string& name)
272 {
273   return CommTaskPtr(new CommTask(name));
274 }
275
276 /** @ingroup plugin_task
277  *  @brief Smart constructor.
278  */
279 CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* source, s4u::Host* destination)
280 {
281   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
282 }
283
284 /**
285  *  @brief Do one execution of the Task.
286  *  @note Call the on_this_start() func. Set its working status as true.
287  *  Init and start the underlying Activity.
288  */
289 void CommTask::fire()
290 {
291   on_this_start_(this);
292   Task::on_start(this);
293   working_          = true;
294   queued_execs_     = std::max(queued_execs_ - 1, 0);
295   s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
296   comm->set_name(name_);
297   comm->set_payload_size(amount_);
298   comm->start();
299   comm->extension_set(new ExtendedAttributeActivity());
300   comm->extension<ExtendedAttributeActivity>()->task_ = this;
301   current_activity_                                   = comm;
302 }
303
304 /** @ingroup plugin_task
305  *  @param source The host to set.
306  *  @brief Set a new source host.
307  */
308 CommTaskPtr CommTask::set_source(s4u::Host* source)
309 {
310   kernel::actor::simcall_answered([this, source] { source_ = source; });
311   return this;
312 }
313
314 /** @ingroup plugin_task
315  *  @param destination The host to set.
316  *  @brief Set a new destination host.
317  */
318 CommTaskPtr CommTask::set_destination(s4u::Host* destination)
319 {
320   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
321   return this;
322 }
323
324 /** @ingroup plugin_task
325  *  @param bytes The amount of bytes to set.
326  */
327 CommTaskPtr CommTask::set_bytes(double bytes)
328 {
329   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
330   return this;
331 }
332
333 /**
334  *  @brief Default constructor.
335  */
336 IoTask::IoTask(const std::string& name) : Task(name) {}
337
338 /** @ingroup plugin_task
339  *  @brief Smart Constructor.
340  */
341 IoTaskPtr IoTask::init(const std::string& name)
342 {
343   return IoTaskPtr(new IoTask(name));
344 }
345
346 /** @ingroup plugin_task
347  *  @brief Smart Constructor.
348  */
349 IoTaskPtr IoTask::init(const std::string& name, double bytes, s4u::Disk* disk, s4u::Io::OpType type)
350 {
351   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
352 }
353
354 /** @ingroup plugin_task
355  *  @param disk The disk to set.
356  *  @brief Set a new disk.
357  */
358 IoTaskPtr IoTask::set_disk(s4u::Disk* disk)
359 {
360   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
361   return this;
362 }
363
364 /** @ingroup plugin_task
365  *  @param bytes The amount of bytes to set.
366  */
367 IoTaskPtr IoTask::set_bytes(double bytes)
368 {
369   kernel::actor::simcall_answered([this, bytes] { amount_ = bytes; });
370   return this;
371 }
372
373 /** @ingroup plugin_task */
374 IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type)
375 {
376   kernel::actor::simcall_answered([this, type] { type_ = type; });
377   return this;
378 }
379
380 void IoTask::fire()
381 {
382   on_this_start_(this);
383   Task::on_start(this);
384   working_      = true;
385   queued_execs_ = std::max(queued_execs_ - 1, 0);
386   s4u::IoPtr io = s4u::Io::init();
387   io->set_name(name_);
388   io->set_size(amount_);
389   io->set_disk(disk_);
390   io->set_op_type(type_);
391   io->start();
392   io->extension_set(new ExtendedAttributeActivity());
393   io->extension<ExtendedAttributeActivity>()->task_ = this;
394   current_activity_                                 = io;
395 }
396
397 } // namespace simgrid::plugins