Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add possibility to dispatch tasks (work in progress)
[simgrid.git] / src / s4u / s4u_Task.cpp
1 #include <memory>
2 #include <simgrid/Exception.hpp>
3 #include <simgrid/s4u/Activity.hpp>
4 #include <simgrid/s4u/Comm.hpp>
5 #include <simgrid/s4u/Disk.hpp>
6 #include <simgrid/s4u/Exec.hpp>
7 #include <simgrid/s4u/Io.hpp>
8 #include <simgrid/s4u/Task.hpp>
9 #include <simgrid/simix.hpp>
10 #include <string>
11 #include <xbt/asserts.h>
12
13 #include "src/simgrid/module.hpp"
14
15 SIMGRID_REGISTER_PLUGIN(task, "Battery management", nullptr)
16 /**
17   @beginrst
18
19
20 Tasks are designed to represent dataflows, i.e, graphs of Tasks.
21 Tasks can only be instancied using either
22 :cpp:func:`simgrid::s4u::ExecTask::init` or :cpp:func:`simgrid::s4u::CommTask::init`
23 An ExecTask is an Execution Task. Its underlying Activity is an :ref:`Exec <API_s4u_Exec>`.
24 A CommTask is a Communication Task. Its underlying Activity is a :ref:`Comm <API_s4u_Comm>`.
25
26   @endrst
27  */
28 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plugin");
29
30 namespace simgrid::s4u {
31
32 Task::Task(const std::string& name) : name_(name) {}
33
34 /**
35  *  @brief Return True if the Task can start a new Activity.
36  *  @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
37  */
38 bool Task::ready_to_run(std::string instance)
39 {
40   return running_instances_[instance] < parallelism_degree_[instance] && queued_firings_[instance] > 0;
41 }
42
43 /**
44  *  @param source The sender.
45  *  @brief Receive a token from another Task.
46  *  @note Check upon reception if the Task has received a token from each of its predecessors,
47  * and in this case consumes those tokens and enqueue an execution.
48  */
49 void Task::receive(Task* source)
50 {
51   XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
52   auto source_count = predecessors_[source];
53   predecessors_[source]++;
54   if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count)
55     tokens_received_.emplace_back();
56   tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_;
57   bool enough_tokens                                                     = true;
58   for (auto const& [key, val] : predecessors_)
59     if (val < 1) {
60       enough_tokens = false;
61       break;
62     }
63   if (enough_tokens) {
64     for (auto& [key, val] : predecessors_)
65       val--;
66     enqueue_firings(1);
67   }
68 }
69
70 /**
71  *  @brief Task routine when finishing an execution.
72  *  @note Set its working status as false.
73  * Add 1 to its count of finished executions.
74  * Call the on_this_end func.
75  * Fire on_end callback.
76  * Send a token to each of its successors.
77  * Start a new execution if possible.
78  */
79 void Task::complete(std::string instance)
80 {
81   xbt_assert(Actor::is_maestro());
82   running_instances_[instance] = running_instances_[instance] - 1;
83   count_[instance]             = count_[instance] + 1;
84   if (instance == "collector") {
85     on_this_completion(this);
86     on_completion(this);
87     for (auto const& t : successors_)
88       t->receive(this);
89   } else if (instance == "dispatcher") {
90     auto next_instance = load_balancing_function_();
91     xbt_assert(next_instance != "dispatcher" and next_instance != "collector", "Invalid instance selected: %s",
92                next_instance.c_str());
93     queued_firings_[next_instance] = queued_firings_.at(next_instance) + 1;
94     while (ready_to_run(next_instance))
95       fire(next_instance);
96   } else {
97     queued_firings_["collector"] = queued_firings_["collector"] + 1;
98     while (ready_to_run("collector"))
99       fire("collector");
100   }
101   if (ready_to_run(instance))
102     fire(instance);
103 }
104
105 /** @param n The new parallelism degree of the Task.
106  *  @brief Set the parallelism degree of the Task to inscrease or decrease horizontal scaling.
107  *  @note When increasing the degree the function starts new instances if there is queued firings.
108  *        When decreasing the degree the function does NOT stop running instances.
109  */
110 void Task::set_parallelism_degree(int n, std::string instance)
111 {
112   xbt_assert(n > 0, "Parallelism degree must be above 0.");
113   simgrid::kernel::actor::simcall_answered([this, n, &instance] {
114     if (instance == "all") {
115       for (auto& [key, value] : parallelism_degree_) {
116         parallelism_degree_[key] = n;
117         while (ready_to_run(key))
118           fire(key);
119       }
120     } else {
121       parallelism_degree_[instance] = n;
122       while (ready_to_run(instance))
123         fire(instance);
124     }
125   });
126 }
127
128 void Task::set_internal_bytes(int bytes, std::string instance)
129 {
130   simgrid::kernel::actor::simcall_answered([this, bytes, &instance] { internal_bytes_to_send_[instance] = bytes; });
131 }
132
133 void Task::set_load_balancing_function(std::function<std::string()> func)
134 {
135   simgrid::kernel::actor::simcall_answered([this, func] { load_balancing_function_ = func; });
136 }
137
138 /** @param n The number of firings to enqueue.
139  *  @brief Enqueue firing.
140  *  @note Immediatly fire an activity if possible.
141  */
142 void Task::enqueue_firings(int n)
143 {
144   simgrid::kernel::actor::simcall_answered([this, n] {
145     queued_firings_["dispatcher"] += n;
146     while (ready_to_run("dispatcher"))
147       fire("dispatcher");
148   });
149 }
150
151 /** @param name The new name to set.
152  *  @brief Set the name of the Task.
153  */
154 void Task::set_name(std::string name)
155 {
156   name_ = name;
157 }
158
159 /** @param amount The amount to set.
160  *  @brief Set the amout of work to do.
161  *  @note Amount in flop for ExecTask and in bytes for CommTask.
162  */
163 void Task::set_amount(double amount, std::string instance)
164 {
165   simgrid::kernel::actor::simcall_answered([this, amount, &instance] { amount_[instance] = amount; });
166 }
167
168 /** @param token The token to set.
169  *  @brief Set the token to send to successors.
170  *  @note The token is passed to each successor after the task end, i.e., after the on_end callback.
171  */
172 void Task::set_token(std::shared_ptr<Token> token)
173 {
174   simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
175 }
176
177 void Task::fire(std::string instance)
178 {
179   if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
180     current_activities_[instance].pop_front();
181   }
182   if (instance == "dispatcher") {
183     on_this_start(this);
184     on_start(this);
185   }
186   running_instances_[instance]++;
187   queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
188   if (not tokens_received_.empty())
189     tokens_received_.pop_front();
190 }
191
192 /** @param successor The Task to add.
193  *  @brief Add a successor to this Task.
194  *  @note It also adds this as a predecessor of successor.
195  */
196 void Task::add_successor(TaskPtr successor)
197 {
198   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
199     successors_.insert(successor_p);
200     successor_p->predecessors_.try_emplace(this, 0);
201   });
202 }
203
204 /** @param successor The Task to remove.
205  *  @brief Remove a successor from this Task.
206  *  @note It also remove this from the predecessors of successor.
207  */
208 void Task::remove_successor(TaskPtr successor)
209 {
210   simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
211     successor_p->predecessors_.erase(this);
212     successors_.erase(successor_p);
213   });
214 }
215
216 /**
217  *  @brief TODO
218  */
219 void Task::remove_all_successors()
220 {
221   simgrid::kernel::actor::simcall_answered([this] {
222     while (not successors_.empty()) {
223       auto* successor = *(successors_.begin());
224       successor->predecessors_.erase(this);
225       successors_.erase(successor);
226     }
227   });
228 }
229
230 /**
231  *  @brief TODO
232  */
233 void Task::add_instances(int n)
234 {
235   xbt_assert(n >= 0, "Cannot add a negative number of instances (provided: %d)", n);
236   int instance_count = (int)amount_.size() - 2;
237   for (int i = instance_count; i < n + instance_count; i++) {
238     amount_["instance_" + std::to_string(i)]                 = amount_.at("instance_0");
239     queued_firings_["instance_" + std::to_string(i)]         = 0;
240     running_instances_["instance_" + std::to_string(i)]      = 0;
241     count_["instance_" + std::to_string(i)]                  = 0;
242     parallelism_degree_["instance_" + std::to_string(i)]     = parallelism_degree_.at("instance_0");
243     current_activities_["instance_" + std::to_string(i)]     = {};
244     internal_bytes_to_send_["instance_" + std::to_string(i)] = internal_bytes_to_send_.at("instance_0");
245     ;
246   }
247 }
248
249 /**
250  *  @brief TODO
251  */
252 void Task::remove_instances(int n)
253 {
254   int instance_count = (int)amount_.size() - 2;
255   xbt_assert(n >= 0, "Cannot remove a negative number of instances (provided: %d)", n);
256   xbt_assert(instance_count - n > 0, "The number of instances must be above 0 (instances: %d, provided: %d)",
257              instance_count, n);
258   for (int i = instance_count - 1; i >= instance_count - n; i--)
259     xbt_assert(running_instances_.at("instance_" + std::to_string(i)) == 0,
260                "Cannot remove a running instance (instances: %d)", i);
261   for (int i = instance_count - 1; i >= instance_count - n; i--) {
262     amount_.erase("instance_" + std::to_string(i));
263     queued_firings_.erase("instance_" + std::to_string(i));
264     running_instances_.erase("instance_" + std::to_string(i));
265     count_.erase("instance_" + std::to_string(i));
266     parallelism_degree_.erase("instance_" + std::to_string(i));
267     current_activities_.erase("instance_" + std::to_string(i));
268   }
269 }
270
271 /**
272  *  @brief Default constructor.
273  */
274 ExecTask::ExecTask(const std::string& name) : Task(name)
275 {
276   set_load_balancing_function([]() { return "instance_0"; });
277 }
278
279 /**
280  *  @brief Smart Constructor.
281  */
282 ExecTaskPtr ExecTask::init(const std::string& name)
283 {
284   return ExecTaskPtr(new ExecTask(name));
285 }
286
287 /**
288  *  @brief Smart Constructor.
289  */
290 ExecTaskPtr ExecTask::init(const std::string& name, double flops, Host* host)
291 {
292   return init(name)->set_flops(flops)->set_host(host);
293 }
294
295 /**
296  *  @brief Do one execution of the Task.
297  *  @note Call the on_this_start() func.
298  *  Init and start the underlying Activity.
299  */
300 void ExecTask::fire(std::string instance)
301 {
302   Task::fire(instance);
303   if (instance == "dispatcher" or instance == "collector") {
304     auto exec = Exec::init()
305                     ->set_name(get_name() + "_" + instance)
306                     ->set_flops_amount(get_amount(instance))
307                     ->set_host(host_[instance]);
308     exec->start();
309     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
310     store_activity(exec, instance);
311   } else {
312     auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_[instance]);
313     if (host_["dispatcher"] == host_[instance]) {
314       exec->start();
315       store_activity(exec, instance);
316     } else {
317       auto comm = Comm::sendto_init(host_["dispatcher"], host_[instance])
318                       ->set_name(get_name() + "_dispatcher_to_" + instance)
319                       ->set_payload_size(get_internal_bytes("dispatcher"));
320       comm->add_successor(exec);
321       comm->start();
322       store_activity(comm, instance);
323     }
324     if (host_[instance] == host_["collector"]) {
325       exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
326       if (host_["dispatcher"] != host_[instance])
327         store_activity(exec, instance);
328     } else {
329       auto comm = Comm::sendto_init(host_[instance], host_["collector"])
330                       ->set_name(get_name() + instance + "_to_collector")
331                       ->set_payload_size(get_internal_bytes(instance));
332       exec->add_successor(comm);
333       comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
334       comm.detach();
335     }
336   }
337 }
338
339 /**
340  *  @param host The host to set.
341  *  @brief Set a new host.
342  */
343 ExecTaskPtr ExecTask::set_host(Host* host, std::string instance)
344 {
345   kernel::actor::simcall_answered([this, host, &instance] {
346     if (instance == "all")
347       for (auto& [key, value] : host_)
348         host_[key] = host;
349     else
350       host_[instance] = host;
351   });
352   return this;
353 }
354
355 /**
356  *  @param flops The amount of flops to set.
357  */
358 ExecTaskPtr ExecTask::set_flops(double flops, std::string instance)
359 {
360   kernel::actor::simcall_answered([this, flops, &instance] { set_amount(flops, instance); });
361   return this;
362 }
363
364 /**
365  *  @brief TODO
366  */
367 void ExecTask::add_instances(int n)
368 {
369   Task::add_instances(n);
370   int instance_count = (int)host_.size() - 2;
371   for (int i = instance_count; i < n + instance_count; i++)
372     host_["instance_" + std::to_string(i)] = host_.at("instance_0");
373 }
374
375 /**
376  *  @brief TODO
377  */
378 void ExecTask::remove_instances(int n)
379 {
380   Task::remove_instances(n);
381   int instance_count = (int)host_.size() - 2;
382   for (int i = instance_count - 1; i >= instance_count - n; i--)
383     host_.erase("instance_" + std::to_string(i));
384 }
385
386 /**
387  *  @brief Default constructor.
388  */
389 CommTask::CommTask(const std::string& name) : Task(name)
390 {
391   set_load_balancing_function([]() { return "instance_0"; });
392 }
393
394 /**
395  *  @brief Smart constructor.
396  */
397 CommTaskPtr CommTask::init(const std::string& name)
398 {
399   return CommTaskPtr(new CommTask(name));
400 }
401
402 /**
403  *  @brief Smart constructor.
404  */
405 CommTaskPtr CommTask::init(const std::string& name, double bytes, Host* source, Host* destination)
406 {
407   return init(name)->set_bytes(bytes)->set_source(source)->set_destination(destination);
408 }
409
410 /**
411  *  @brief Do one execution of the Task.
412  *  @note Call the on_this_start() func.
413  *  Init and start the underlying Activity.
414  */
415 void CommTask::fire(std::string instance)
416 {
417   Task::fire(instance);
418   if (instance == "dispatcher" or instance == "collector") {
419     auto exec = Exec::init()
420                     ->set_name(get_name() + "_" + instance)
421                     ->set_flops_amount(get_amount(instance))
422                     ->set_host(instance == "dispatcher" ? source_ : destination_);
423     exec->start();
424     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
425     store_activity(exec, instance);
426   } else {
427     auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
428     comm->start();
429     comm->on_this_completion_cb([this, instance](Comm const&) { complete(instance); });
430     store_activity(comm, instance);
431   }
432 }
433
434 /**
435  *  @param source The host to set.
436  *  @brief Set a new source host.
437  */
438 CommTaskPtr CommTask::set_source(Host* source)
439 {
440   kernel::actor::simcall_answered([this, source] { source_ = source; });
441   return this;
442 }
443
444 /**
445  *  @param destination The host to set.
446  *  @brief Set a new destination host.
447  */
448 CommTaskPtr CommTask::set_destination(Host* destination)
449 {
450   kernel::actor::simcall_answered([this, destination] { destination_ = destination; });
451   return this;
452 }
453
454 /**
455  *  @param bytes The amount of bytes to set.
456  */
457 CommTaskPtr CommTask::set_bytes(double bytes)
458 {
459   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
460   return this;
461 }
462
463 /**
464  *  @brief Default constructor.
465  */
466 IoTask::IoTask(const std::string& name) : Task(name)
467 {
468   set_load_balancing_function([]() { return "instance_0"; });
469 }
470
471 /**
472  *  @brief Smart Constructor.
473  */
474 IoTaskPtr IoTask::init(const std::string& name)
475 {
476   return IoTaskPtr(new IoTask(name));
477 }
478
479 /**
480  *  @brief Smart Constructor.
481  */
482 IoTaskPtr IoTask::init(const std::string& name, double bytes, Disk* disk, Io::OpType type)
483 {
484   return init(name)->set_bytes(bytes)->set_disk(disk)->set_op_type(type);
485 }
486
487 /**
488  *  @param disk The disk to set.
489  *  @brief Set a new disk.
490  */
491 IoTaskPtr IoTask::set_disk(Disk* disk)
492 {
493   kernel::actor::simcall_answered([this, disk] { disk_ = disk; });
494   return this;
495 }
496
497 /**
498  *  @param bytes The amount of bytes to set.
499  */
500 IoTaskPtr IoTask::set_bytes(double bytes)
501 {
502   kernel::actor::simcall_answered([this, bytes] { set_amount(bytes); });
503   return this;
504 }
505
506 /**  */
507 IoTaskPtr IoTask::set_op_type(Io::OpType type)
508 {
509   kernel::actor::simcall_answered([this, type] { type_ = type; });
510   return this;
511 }
512
513 void IoTask::fire(std::string instance)
514 {
515   Task::fire(instance);
516   if (instance == "dispatcher" or instance == "collector") {
517     auto exec = Exec::init()
518                     ->set_name(get_name() + "_" + instance)
519                     ->set_flops_amount(get_amount(instance))
520                     ->set_host(disk_->get_host());
521     exec->start();
522     exec->on_this_completion_cb([this, instance](Exec const&) { complete(instance); });
523     store_activity(exec, instance);
524   } else {
525     auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
526     io->start();
527     io->on_this_completion_cb([this, instance](Io const&) { complete(instance); });
528     store_activity(io, instance);
529   }
530 }
531
532 } // namespace simgrid::s4u