Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Say goodbye to last global: surf_host_model
[simgrid.git] / src / simdag / sd_task.cpp
1 /* Copyright (c) 2006-2021. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "simdag_private.hpp"
8 #include "simgrid/kernel/routing/NetPoint.hpp"
9 #include "src/surf/HostImpl.hpp"
10 #include "src/surf/surf_interface.hpp"
11 #include <algorithm>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_task, sd, "Logging specific to SimDag (task)");
14
15 /* Destroys the data memorized by SD_task_schedule. Task state must be SD_SCHEDULED or SD_RUNNABLE. */
16 static void __SD_task_destroy_scheduling_data(SD_task_t task)
17 {
18   if (task->state != SD_SCHEDULED && task->state != SD_RUNNABLE)
19     throw std::invalid_argument(
20         simgrid::xbt::string_printf("Task '%s' must be SD_SCHEDULED or SD_RUNNABLE", SD_task_get_name(task)));
21
22   xbt_free(task->flops_amount);
23   xbt_free(task->bytes_amount);
24   task->bytes_amount = nullptr;
25   task->flops_amount = nullptr;
26 }
27
28 /**
29  * @brief Creates a new task.
30  *
31  * @param name the name of the task (can be @c nullptr)
32  * @param data the user data you want to associate with the task (can be @c nullptr)
33  * @param amount amount of the task
34  * @return the new task
35  * @see SD_task_destroy()
36  */
37 SD_task_t SD_task_create(const char* name, void* data, double amount)
38 {
39   SD_task_t task = xbt_new0(s_SD_task_t, 1);
40   task->kind     = SD_TASK_NOT_TYPED;
41   task->state    = SD_NOT_SCHEDULED;
42   sd_global->initial_tasks.insert(task);
43
44   task->marked       = false;
45   task->start_time   = -1.0;
46   task->finish_time  = -1.0;
47   task->surf_action  = nullptr;
48   task->watch_points = 0;
49
50   task->inputs       = new std::set<SD_task_t>();
51   task->outputs      = new std::set<SD_task_t>();
52   task->predecessors = new std::set<SD_task_t>();
53   task->successors   = new std::set<SD_task_t>();
54
55   task->data       = data;
56   task->name       = xbt_strdup(name);
57   task->amount     = amount;
58   task->allocation = new std::vector<sg_host_t>();
59   task->rate       = -1;
60   return task;
61 }
62
63 static inline SD_task_t SD_task_create_sized(const char* name, void* data, double amount, int count)
64 {
65   SD_task_t task     = SD_task_create(name, data, amount);
66   task->bytes_amount = xbt_new0(double, count* count);
67   task->flops_amount = xbt_new0(double, count);
68   return task;
69 }
70
71 /** @brief create an end-to-end communication task that can then be auto-scheduled
72  *
73  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This allows one to specify the task costs at
74  * creation, and decouple them from the scheduling process where you just specify which resource should deliver the
75  * mandatory power.
76  *
77  * A end-to-end communication must be scheduled on 2 hosts, and the amount specified at creation is sent from hosts[0]
78  * to hosts[1].
79  */
80 SD_task_t SD_task_create_comm_e2e(const char* name, void* data, double amount)
81 {
82   SD_task_t res        = SD_task_create_sized(name, data, amount, 2);
83   res->bytes_amount[2] = amount;
84   res->kind            = SD_TASK_COMM_E2E;
85
86   return res;
87 }
88
89 /** @brief create a sequential computation task that can then be auto-scheduled
90  *
91  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This allows one to specify the task costs at
92  * creation, and decouple them from the scheduling process where you just specify which resource should deliver the
93  * mandatory power.
94  *
95  * A sequential computation must be scheduled on 1 host, and the amount specified at creation to be run on hosts[0].
96  *
97  * @param name the name of the task (can be @c nullptr)
98  * @param data the user data you want to associate with the task (can be @c nullptr)
99  * @param flops_amount amount of compute work to be done by the task
100  * @return the new SD_TASK_COMP_SEQ typed task
101  */
102 SD_task_t SD_task_create_comp_seq(const char* name, void* data, double flops_amount)
103 {
104   SD_task_t res        = SD_task_create_sized(name, data, flops_amount, 1);
105   res->flops_amount[0] = flops_amount;
106   res->kind            = SD_TASK_COMP_SEQ;
107
108   return res;
109 }
110
111 /** @brief create a parallel computation task that can then be auto-scheduled
112  *
113  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This allows one to specify the task costs at
114  * creation, and decouple them from the scheduling process where you just specify which resource should deliver the
115  * mandatory power.
116  *
117  * A parallel computation can be scheduled on any number of host.
118  * The underlying speedup model is Amdahl's law.
119  * To be auto-scheduled, @see SD_task_distribute_comp_amdahl has to be called first.
120  * @param name the name of the task (can be @c nullptr)
121  * @param data the user data you want to associate with the task (can be @c nullptr)
122  * @param flops_amount amount of compute work to be done by the task
123  * @param alpha purely serial fraction of the work to be done (in [0.;1.[)
124  * @return the new task
125  */
126 SD_task_t SD_task_create_comp_par_amdahl(const char* name, void* data, double flops_amount, double alpha)
127 {
128   xbt_assert(alpha < 1. && alpha >= 0., "Invalid parameter: alpha must be in [0.;1.[");
129
130   SD_task_t res = SD_task_create(name, data, flops_amount);
131   res->alpha    = alpha;
132   res->kind     = SD_TASK_COMP_PAR_AMDAHL;
133
134   return res;
135 }
136
137 /** @brief create a complex data redistribution task that can then be  auto-scheduled
138  *
139  * Auto-scheduling mean that the task can be used with SD_task_schedulev().
140  * This allows one to specify the task costs at creation, and decouple them from the scheduling process where you just
141  * specify which resource should communicate.
142  *
143  * A data redistribution can be scheduled on any number of host.
144  * The assumed distribution is a 1D block distribution. Each host owns the same share of the @see amount.
145  * To be auto-scheduled, @see SD_task_distribute_comm_mxn_1d_block has to be  called first.
146  * @param name the name of the task (can be @c nullptr)
147  * @param data the user data you want to associate with the task (can be @c nullptr)
148  * @param amount amount of data to redistribute by the task
149  * @return the new task
150  */
151 SD_task_t SD_task_create_comm_par_mxn_1d_block(const char* name, void* data, double amount)
152 {
153   SD_task_t res = SD_task_create(name, data, amount);
154   res->kind     = SD_TASK_COMM_PAR_MXN_1D_BLOCK;
155
156   return res;
157 }
158
159 /**
160  * @brief Destroys a task.
161  *
162  * The user data (if any) should have been destroyed first.
163  *
164  * @param task the task you want to destroy
165  * @see SD_task_create()
166  */
167 void SD_task_destroy(SD_task_t task)
168 {
169   XBT_DEBUG("Destroying task %s...", SD_task_get_name(task));
170
171   /* First Remove all dependencies associated with the task. */
172   while (not task->predecessors->empty())
173     SD_task_dependency_remove(*(task->predecessors->begin()), task);
174   while (not task->inputs->empty())
175     SD_task_dependency_remove(*(task->inputs->begin()), task);
176   while (not task->successors->empty())
177     SD_task_dependency_remove(task, *(task->successors->begin()));
178   while (not task->outputs->empty())
179     SD_task_dependency_remove(task, *(task->outputs->begin()));
180
181   if (task->state == SD_SCHEDULED || task->state == SD_RUNNABLE)
182     __SD_task_destroy_scheduling_data(task);
183
184   xbt_free(task->name);
185
186   if (task->surf_action != nullptr)
187     task->surf_action->unref();
188
189   delete task->allocation;
190   xbt_free(task->bytes_amount);
191   xbt_free(task->flops_amount);
192   delete task->inputs;
193   delete task->outputs;
194   delete task->predecessors;
195   delete task->successors;
196   xbt_free(task);
197
198   XBT_DEBUG("Task destroyed.");
199 }
200
201 /**
202  * @brief Returns the user data of a task
203  *
204  * @param task a task
205  * @return the user data associated with this task (can be @c nullptr)
206  * @see SD_task_set_data()
207  */
208 void* SD_task_get_data(const_SD_task_t task)
209 {
210   return task->data;
211 }
212
213 /**
214  * @brief Sets the user data of a task
215  *
216  * The new data can be @c nullptr. The old data should have been freed first, if it was not @c nullptr.
217  *
218  * @param task a task
219  * @param data the new data you want to associate with this task
220  * @see SD_task_get_data()
221  */
222 void SD_task_set_data(SD_task_t task, void* data)
223 {
224   task->data = data;
225 }
226
227 /**
228  * @brief Sets the rate of a task
229  *
230  * This will change the network bandwidth a task can use. This rate  cannot be dynamically changed. Once the task has
231  * started, this call is ineffective. This rate depends on both the nominal bandwidth on the route onto which the task
232  * is scheduled (@see SD_task_get_current_bandwidth) and the amount of data to transfer.
233  *
234  * To divide the nominal bandwidth by 2, the rate then has to be :
235  *    rate = bandwidth/(2*amount)
236  *
237  * @param task a @see SD_TASK_COMM_E2E task (end-to-end communication)
238  * @param rate the new rate you want to associate with this task.
239  */
240 void SD_task_set_rate(SD_task_t task, double rate)
241 {
242   xbt_assert(task->kind == SD_TASK_COMM_E2E, "The rate can be modified for end-to-end communications only.");
243   if (task->state < SD_RUNNING) {
244     task->rate = rate;
245   } else {
246     XBT_WARN("Task %p has started. Changing rate is ineffective.", task);
247   }
248 }
249
250 /**
251  * @brief Returns the state of a task
252  *
253  * @param task a task
254  * @return the current @ref e_SD_task_state_t "state" of this task:
255  * #SD_NOT_SCHEDULED, #SD_SCHEDULED, #SD_RUNNABLE, #SD_RUNNING, #SD_DONE or #SD_FAILED
256  * @see e_SD_task_state_t
257  */
258 e_SD_task_state_t SD_task_get_state(const_SD_task_t task)
259 {
260   return task->state;
261 }
262
263 /* Changes the state of a task. Updates the sd_global->watch_point_reached flag.
264  */
265 void SD_task_set_state(SD_task_t task, e_SD_task_state_t new_state)
266 {
267   std::set<SD_task_t>::iterator idx;
268   XBT_DEBUG("Set state of '%s' to %d", task->name, new_state);
269   if ((new_state == SD_NOT_SCHEDULED || new_state == SD_SCHEDULABLE) && task->state == SD_FAILED) {
270     sd_global->completed_tasks.erase(task);
271     sd_global->initial_tasks.insert(task);
272   }
273
274   if (new_state == SD_SCHEDULED && task->state == SD_RUNNABLE) {
275     sd_global->initial_tasks.insert(task);
276     sd_global->runnable_tasks.erase(task);
277   }
278
279   if (new_state == SD_RUNNABLE) {
280     idx = sd_global->initial_tasks.find(task);
281     if (idx != sd_global->initial_tasks.end()) {
282       sd_global->runnable_tasks.insert(*idx);
283       sd_global->initial_tasks.erase(idx);
284     }
285   }
286
287   if (new_state == SD_RUNNING)
288     sd_global->runnable_tasks.erase(task);
289
290   if (new_state == SD_DONE || new_state == SD_FAILED) {
291     sd_global->completed_tasks.insert(task);
292     task->start_time = task->surf_action->get_start_time();
293     if (new_state == SD_DONE) {
294       task->finish_time = task->surf_action->get_finish_time();
295 #if SIMGRID_HAVE_JEDULE
296       jedule_log_sd_event(task);
297 #endif
298     } else
299       task->finish_time = surf_get_clock();
300     task->surf_action->unref();
301     task->surf_action = nullptr;
302     task->allocation->clear();
303   }
304
305   task->state = new_state;
306
307   if (task->watch_points & new_state) {
308     XBT_VERB("Watch point reached with task '%s'!", task->name);
309     sd_global->watch_point_reached = true;
310     SD_task_unwatch(task, new_state); /* remove the watch point */
311   }
312 }
313
314 /**
315  * @brief Returns the name of a task
316  *
317  * @param task a task
318  * @return the name of this task (can be @c nullptr)
319  */
320 const char* SD_task_get_name(const_SD_task_t task)
321 {
322   return task->name;
323 }
324
325 /** @brief Allows to change the name of a task */
326 void SD_task_set_name(SD_task_t task, const char* name)
327 {
328   xbt_free(task->name);
329   task->name = xbt_strdup(name);
330 }
331
332 /** @brief Returns the dynar of the parents of a task
333  *
334  * @param task a task
335  * @return a newly allocated dynar comprising the parents of this task
336  */
337
338 xbt_dynar_t SD_task_get_parents(const_SD_task_t task)
339 {
340   xbt_dynar_t parents = xbt_dynar_new(sizeof(SD_task_t), nullptr);
341
342   for (auto const& it : *task->predecessors)
343     xbt_dynar_push(parents, &it);
344   for (auto const& it : *task->inputs)
345     xbt_dynar_push(parents, &it);
346
347   return parents;
348 }
349
350 /** @brief Returns the dynar of the parents of a task
351  *
352  * @param task a task
353  * @return a newly allocated dynar comprising the parents of this task
354  */
355 xbt_dynar_t SD_task_get_children(const_SD_task_t task)
356 {
357   xbt_dynar_t children = xbt_dynar_new(sizeof(SD_task_t), nullptr);
358
359   for (auto const& it : *task->successors)
360     xbt_dynar_push(children, &it);
361   for (auto const& it : *task->outputs)
362     xbt_dynar_push(children, &it);
363
364   return children;
365 }
366
367 /**
368  * @brief Returns the number of workstations involved in a task
369  *
370  * Only call this on already scheduled tasks!
371  * @param task a task
372  */
373 int SD_task_get_workstation_count(const_SD_task_t task)
374 {
375   return static_cast<int>(task->allocation->size());
376 }
377
378 /**
379  * @brief Returns the list of workstations involved in a task
380  *
381  * Only call this on already scheduled tasks!
382  * @param task a task
383  */
384 sg_host_t* SD_task_get_workstation_list(const_SD_task_t task)
385 {
386   return task->allocation->data();
387 }
388
389 /**
390  * @brief Returns the total amount of work contained in a task
391  *
392  * @param task a task
393  * @return the total amount of work (computation or data transfer) for this task
394  * @see SD_task_get_remaining_amount()
395  */
396 double SD_task_get_amount(const_SD_task_t task)
397 {
398   return task->amount;
399 }
400
401 /** @brief Sets the total amount of work of a task
402  * For sequential typed tasks (COMP_SEQ and COMM_E2E), it also sets the appropriate values in the flops_amount and
403  * bytes_amount arrays respectively. Nothing more than modifying task->amount is done for parallel  typed tasks
404  * (COMP_PAR_AMDAHL and COMM_PAR_MXN_1D_BLOCK) as the distribution of the amount of work is done at scheduling time.
405  *
406  * @param task a task
407  * @param amount the new amount of work to execute
408  */
409 void SD_task_set_amount(SD_task_t task, double amount)
410 {
411   task->amount = amount;
412   if (task->kind == SD_TASK_COMP_SEQ)
413     task->flops_amount[0] = amount;
414   if (task->kind == SD_TASK_COMM_E2E)
415     task->bytes_amount[2] = amount;
416 }
417
418 /**
419  * @brief Returns the alpha parameter of a SD_TASK_COMP_PAR_AMDAHL task
420  *
421  * @param task a parallel task assuming Amdahl's law as speedup model
422  * @return the alpha parameter (serial part of a task in percent) for this task
423  */
424 double SD_task_get_alpha(const_SD_task_t task)
425 {
426   xbt_assert(SD_task_get_kind(task) == SD_TASK_COMP_PAR_AMDAHL, "Alpha parameter is not defined for this kind of task");
427   return task->alpha;
428 }
429
430 /**
431  * @brief Returns the remaining amount work to do till the completion of a task
432  *
433  * @param task a task
434  * @return the remaining amount of work (computation or data transfer) of this task
435  * @see SD_task_get_amount()
436  */
437 double SD_task_get_remaining_amount(const_SD_task_t task)
438 {
439   if (task->surf_action)
440     return task->surf_action->get_remains();
441   else
442     return (task->state == SD_DONE) ? 0 : task->amount;
443 }
444
445 e_SD_task_kind_t SD_task_get_kind(const_SD_task_t task)
446 {
447   return task->kind;
448 }
449
450 /** @brief Displays debugging information about a task */
451 void SD_task_dump(const_SD_task_t task)
452 {
453   XBT_INFO("Displaying task %s", SD_task_get_name(task));
454   if (task->state == SD_RUNNABLE)
455     XBT_INFO("  - state: runnable");
456   else if (task->state < SD_RUNNABLE)
457     XBT_INFO("  - state: %s not runnable", __get_state_name(task->state));
458   else
459     XBT_INFO("  - state: not runnable %s", __get_state_name(task->state));
460
461   if (task->kind != 0) {
462     switch (task->kind) {
463       case SD_TASK_COMM_E2E:
464         XBT_INFO("  - kind: end-to-end communication");
465         break;
466       case SD_TASK_COMP_SEQ:
467         XBT_INFO("  - kind: sequential computation");
468         break;
469       case SD_TASK_COMP_PAR_AMDAHL:
470         XBT_INFO("  - kind: parallel computation following Amdahl's law");
471         break;
472       case SD_TASK_COMM_PAR_MXN_1D_BLOCK:
473         XBT_INFO("  - kind: MxN data redistribution assuming 1D block distribution");
474         break;
475       default:
476         XBT_INFO("  - (unknown kind %d)", task->kind);
477     }
478   }
479
480   XBT_INFO("  - amount: %.0f", SD_task_get_amount(task));
481   if (task->kind == SD_TASK_COMP_PAR_AMDAHL)
482     XBT_INFO("  - alpha: %.2f", task->alpha);
483   XBT_INFO("  - Dependencies to satisfy: %zu", task->inputs->size() + task->predecessors->size());
484   if ((task->inputs->size() + task->predecessors->size()) > 0) {
485     XBT_INFO("  - pre-dependencies:");
486     for (auto const& it : *task->predecessors)
487       XBT_INFO("    %s", it->name);
488
489     for (auto const& it : *task->inputs)
490       XBT_INFO("    %s", it->name);
491   }
492   if ((task->outputs->size() + task->successors->size()) > 0) {
493     XBT_INFO("  - post-dependencies:");
494
495     for (auto const& it : *task->successors)
496       XBT_INFO("    %s", it->name);
497     for (auto const& it : *task->outputs)
498       XBT_INFO("    %s", it->name);
499   }
500 }
501
502 /** @brief Dumps the task in dotty formalism into the FILE* passed as second argument */
503 void SD_task_dotty(const_SD_task_t task, void* out)
504 {
505   auto* fout = static_cast<FILE*>(out);
506   fprintf(fout, "  T%p [label=\"%.20s\"", task, task->name);
507   switch (task->kind) {
508     case SD_TASK_COMM_E2E:
509     case SD_TASK_COMM_PAR_MXN_1D_BLOCK:
510       fprintf(fout, ", shape=box");
511       break;
512     case SD_TASK_COMP_SEQ:
513     case SD_TASK_COMP_PAR_AMDAHL:
514       fprintf(fout, ", shape=circle");
515       break;
516     default:
517       xbt_die("Unknown task type!");
518   }
519   fprintf(fout, "];\n");
520   for (auto const& it : *task->predecessors)
521     fprintf(fout, " T%p -> T%p;\n", it, task);
522   for (auto const& it : *task->inputs)
523     fprintf(fout, " T%p -> T%p;\n", it, task);
524 }
525
526 /**
527  * @brief Adds a dependency between two tasks
528  *
529  * @a dst will depend on @a src, ie @a dst will not start before @a src is finished.
530  * Their @ref e_SD_task_state_t "state" must be #SD_NOT_SCHEDULED, #SD_SCHEDULED or #SD_RUNNABLE.
531  *
532  * @param src the task which must be executed first
533  * @param dst the task you want to make depend on @a src
534  * @see SD_task_dependency_remove()
535  */
536 void SD_task_dependency_add(SD_task_t src, SD_task_t dst)
537 {
538   if (src == dst)
539     throw std::invalid_argument(
540         simgrid::xbt::string_printf("Cannot add a dependency between task '%s' and itself", SD_task_get_name(src)));
541
542   if (src->state == SD_DONE || src->state == SD_FAILED)
543     throw std::invalid_argument(simgrid::xbt::string_printf(
544         "Task '%s' must be SD_NOT_SCHEDULED, SD_SCHEDULABLE, SD_SCHEDULED, SD_RUNNABLE, or SD_RUNNING", src->name));
545
546   if (dst->state == SD_DONE || dst->state == SD_FAILED || dst->state == SD_RUNNING)
547     throw std::invalid_argument(simgrid::xbt::string_printf(
548         "Task '%s' must be SD_NOT_SCHEDULED, SD_SCHEDULABLE, SD_SCHEDULED, or SD_RUNNABLE", dst->name));
549
550   if (dst->inputs->find(src) != dst->inputs->end() || src->outputs->find(dst) != src->outputs->end() ||
551       src->successors->find(dst) != src->successors->end() || dst->predecessors->find(src) != dst->predecessors->end())
552     throw std::invalid_argument(simgrid::xbt::string_printf(
553         "A dependency already exists between task '%s' and task '%s'", src->name, dst->name));
554
555   XBT_DEBUG("SD_task_dependency_add: src = %s, dst = %s", src->name, dst->name);
556
557   if (src->kind == SD_TASK_COMM_E2E || src->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK) {
558     if (dst->kind == SD_TASK_COMP_SEQ || dst->kind == SD_TASK_COMP_PAR_AMDAHL)
559       dst->inputs->insert(src);
560     else
561       dst->predecessors->insert(src);
562     src->successors->insert(dst);
563   } else {
564     if (dst->kind == SD_TASK_COMM_E2E || dst->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK)
565       src->outputs->insert(dst);
566     else
567       src->successors->insert(dst);
568     dst->predecessors->insert(src);
569   }
570
571   /* if the task was runnable, the task goes back to SD_SCHEDULED because of the new dependency*/
572   if (dst->state == SD_RUNNABLE) {
573     XBT_DEBUG("SD_task_dependency_add: %s was runnable and becomes scheduled!", dst->name);
574     SD_task_set_state(dst, SD_SCHEDULED);
575   }
576 }
577
578 /**
579  * @brief Indicates whether there is a dependency between two tasks.
580  *
581  * @param src a task
582  * @param dst a task depending on @a src
583  *
584  * If src is nullptr, checks whether dst has any pre-dependency.
585  * If dst is nullptr, checks whether src has any post-dependency.
586  */
587 int SD_task_dependency_exists(const_SD_task_t src, SD_task_t dst)
588 {
589   xbt_assert(src != nullptr || dst != nullptr, "Invalid parameter: both src and dst are nullptr");
590
591   if (src) {
592     if (dst) {
593       return (src->successors->find(dst) != src->successors->end() || src->outputs->find(dst) != src->outputs->end());
594     } else {
595       return static_cast<int>(src->successors->size() + src->outputs->size());
596     }
597   } else {
598     return static_cast<int>(dst->predecessors->size() + dst->inputs->size());
599   }
600 }
601
602 /**
603  * @brief Remove a dependency between two tasks
604  *
605  * @param src a task
606  * @param dst a task depending on @a src
607  * @see SD_task_dependency_add()
608  */
609 void SD_task_dependency_remove(SD_task_t src, SD_task_t dst)
610 {
611   XBT_DEBUG("SD_task_dependency_remove: src = %s, dst = %s", SD_task_get_name(src), SD_task_get_name(dst));
612
613   if (src->successors->find(dst) == src->successors->end() && src->outputs->find(dst) == src->outputs->end())
614     throw std::invalid_argument(simgrid::xbt::string_printf(
615         "No dependency found between task '%s' and '%s': task '%s' is not a successor of task '%s'", src->name,
616         dst->name, dst->name, src->name));
617
618   if (src->kind == SD_TASK_COMM_E2E || src->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK) {
619     if (dst->kind == SD_TASK_COMP_SEQ || dst->kind == SD_TASK_COMP_PAR_AMDAHL)
620       dst->inputs->erase(src);
621     else
622       dst->predecessors->erase(src);
623     src->successors->erase(dst);
624   } else {
625     if (dst->kind == SD_TASK_COMM_E2E || dst->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK)
626       src->outputs->erase(dst);
627     else
628       src->successors->erase(dst);
629     dst->predecessors->erase(src);
630   }
631
632   /* if the task was scheduled and dependencies are satisfied, we can make it runnable */
633   if (dst->predecessors->empty() && dst->inputs->empty() && dst->state == SD_SCHEDULED)
634     SD_task_set_state(dst, SD_RUNNABLE);
635 }
636
637 /**
638  * @brief Adds a watch point to a task
639  *
640  * SD_simulate() will stop as soon as the @ref e_SD_task_state_t "state" of this task becomes the one given in argument.
641  * The watch point is then automatically removed.
642  *
643  * @param task a task
644  * @param state the @ref e_SD_task_state_t "state" you want to watch (cannot be #SD_NOT_SCHEDULED)
645  * @see SD_task_unwatch()
646  */
647 void SD_task_watch(SD_task_t task, e_SD_task_state_t state)
648 {
649   if (state & SD_NOT_SCHEDULED)
650     throw std::invalid_argument("Cannot add a watch point for state SD_NOT_SCHEDULED");
651
652   task->watch_points = task->watch_points | state;
653 }
654
655 /**
656  * @brief Removes a watch point from a task
657  *
658  * @param task a task
659  * @param state the @ref e_SD_task_state_t "state" you no longer want to watch
660  * @see SD_task_watch()
661  */
662 void SD_task_unwatch(SD_task_t task, e_SD_task_state_t state)
663 {
664   xbt_assert(state != SD_NOT_SCHEDULED, "SimDag error: Cannot have a watch point for state SD_NOT_SCHEDULED");
665   task->watch_points = task->watch_points & ~state;
666 }
667
668 /**
669  * @brief Returns an approximative estimation of the execution time of a task.
670  *
671  * The estimation is very approximative because the value returned is the time the task would take if it was executed
672  * now and if it was the only task.
673  *
674  * @param host_count number of hosts on which the task would be executed
675  * @param host_list the hosts on which the task would be executed
676  * @param flops_amount computation amount for each host(i.e., an array of host_count doubles)
677  * @param bytes_amount communication amount between each pair of hosts (i.e., a matrix of host_count*host_count doubles)
678  * @see SD_schedule()
679  */
680 double SD_task_get_execution_time(const_SD_task_t /*task*/, int host_count, const sg_host_t* host_list,
681                                   const double* flops_amount, const double* bytes_amount)
682 {
683   xbt_assert(host_count > 0, "Invalid parameter");
684   double max_time = 0.0;
685
686   /* the task execution time is the maximum execution time of the parallel tasks */
687   for (int i = 0; i < host_count; i++) {
688     double time = 0.0;
689     if (flops_amount != nullptr)
690       time = flops_amount[i] / host_list[i]->get_speed();
691
692     if (bytes_amount != nullptr)
693       for (int j = 0; j < host_count; j++)
694         if (bytes_amount[i * host_count + j] != 0)
695           time += (sg_host_get_route_latency(host_list[i], host_list[j]) +
696                    bytes_amount[i * host_count + j] / sg_host_get_route_bandwidth(host_list[i], host_list[j]));
697
698     if (time > max_time)
699       max_time = time;
700   }
701   return max_time;
702 }
703
704 static inline void SD_task_do_schedule(SD_task_t task)
705 {
706   if (SD_task_get_state(task) > SD_SCHEDULABLE)
707     throw std::invalid_argument(
708         simgrid::xbt::string_printf("Task '%s' has already been scheduled", SD_task_get_name(task)));
709
710   if (task->predecessors->empty() && task->inputs->empty())
711     SD_task_set_state(task, SD_RUNNABLE);
712   else
713     SD_task_set_state(task, SD_SCHEDULED);
714 }
715
716 /**
717  * @brief Schedules a task
718  *
719  * The task state must be #SD_NOT_SCHEDULED.
720  * Once scheduled, a task is executed as soon as possible in @see SD_simulate, i.e. when its dependencies are satisfied.
721  *
722  * @param task the task you want to schedule
723  * @param host_count number of hosts on which the task will be executed
724  * @param host_list the hosts on which the task will be executed
725  * @param flops_amount computation amount for each hosts (i.e., an array of host_count doubles)
726  * @param bytes_amount communication amount between each pair of hosts (i.e., a matrix of host_count*host_count doubles)
727  * @param rate task execution speed rate
728  * @see SD_task_unschedule()
729  */
730 void SD_task_schedule(SD_task_t task, int host_count, const sg_host_t* host_list, const double* flops_amount,
731                       const double* bytes_amount, double rate)
732 {
733   xbt_assert(host_count > 0, "host_count must be positive");
734
735   task->rate = rate;
736
737   if (flops_amount) {
738     task->flops_amount = static_cast<double*>(xbt_realloc(task->flops_amount, sizeof(double) * host_count));
739     memcpy(task->flops_amount, flops_amount, sizeof(double) * host_count);
740   } else {
741     xbt_free(task->flops_amount);
742     task->flops_amount = nullptr;
743   }
744
745   int communication_nb = host_count * host_count;
746   if (bytes_amount) {
747     task->bytes_amount = static_cast<double*>(xbt_realloc(task->bytes_amount, sizeof(double) * communication_nb));
748     memcpy(task->bytes_amount, bytes_amount, sizeof(double) * communication_nb);
749   } else {
750     xbt_free(task->bytes_amount);
751     task->bytes_amount = nullptr;
752   }
753
754   for (int i = 0; i < host_count; i++)
755     task->allocation->push_back(host_list[i]);
756
757   SD_task_do_schedule(task);
758 }
759
760 /**
761  * @brief Unschedules a task
762  *
763  * The task state must be #SD_SCHEDULED, #SD_RUNNABLE, #SD_RUNNING or #SD_FAILED.
764  * If you call this function, the task state becomes #SD_NOT_SCHEDULED.
765  * Call SD_task_schedule() to schedule it again.
766  *
767  * @param task the task you want to unschedule
768  * @see SD_task_schedule()
769  */
770 void SD_task_unschedule(SD_task_t task)
771 {
772   if (task->state == SD_NOT_SCHEDULED || task->state == SD_SCHEDULABLE)
773     throw std::invalid_argument(simgrid::xbt::string_printf(
774         "Task %s: the state must be SD_SCHEDULED, SD_RUNNABLE, SD_RUNNING or SD_FAILED", task->name));
775
776   if (task->state == SD_SCHEDULED || task->state == SD_RUNNABLE) /* if the task is scheduled or runnable */ {
777     task->allocation->clear();
778     if (task->kind == SD_TASK_COMP_PAR_AMDAHL || task->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK) {
779       /* Don't free scheduling data for typed tasks */
780       __SD_task_destroy_scheduling_data(task);
781     }
782   }
783
784   if (SD_task_get_state(task) == SD_RUNNING)
785     /* the task should become SD_FAILED */
786     task->surf_action->cancel();
787   else {
788     if (task->predecessors->empty() && task->inputs->empty())
789       SD_task_set_state(task, SD_SCHEDULABLE);
790     else
791       SD_task_set_state(task, SD_NOT_SCHEDULED);
792   }
793   task->start_time = -1.0;
794 }
795
796 /* Runs a task. */
797 void SD_task_run(SD_task_t task)
798 {
799   xbt_assert(task->state == SD_RUNNABLE, "Task '%s' is not runnable! Task state: %d", task->name, (int)task->state);
800   xbt_assert(task->allocation != nullptr, "Task '%s': host_list is nullptr!", task->name);
801
802   XBT_VERB("Executing task '%s'", task->name);
803
804   /* Beware! The scheduling data are now used by the surf action directly! no copy was done */
805   // FIXME[donassolo]: verify if all hosts belongs to the same netZone?
806   auto host_model = (*task->allocation).front()->get_netpoint()->get_englobing_zone()->get_host_model();
807   task->surf_action =
808       host_model->execute_parallel(*task->allocation, task->flops_amount, task->bytes_amount, task->rate);
809
810   task->surf_action->set_data(task);
811
812   XBT_DEBUG("surf_action = %p", task->surf_action);
813
814   SD_task_set_state(task, SD_RUNNING);
815   sd_global->return_set.insert(task);
816 }
817
818 /**
819  * @brief Returns the start time of a task
820  *
821  * The task state must be SD_RUNNING, SD_DONE or SD_FAILED.
822  *
823  * @param task: a task
824  * @return the start time of this task
825  */
826 double SD_task_get_start_time(const_SD_task_t task)
827 {
828   if (task->surf_action)
829     return task->surf_action->get_start_time();
830   else
831     return task->start_time;
832 }
833
834 /**
835  * @brief Returns the finish time of a task
836  *
837  * The task state must be SD_RUNNING, SD_DONE or SD_FAILED.
838  * If the state is not completed yet, the returned value is an estimation of the task finish time. This value can
839  * vary until the task is completed.
840  *
841  * @param task: a task
842  * @return the start time of this task
843  */
844 double SD_task_get_finish_time(const_SD_task_t task)
845 {
846   if (task->surf_action) /* should never happen as actions are destroyed right after their completion */
847     return task->surf_action->get_finish_time();
848   else
849     return task->finish_time;
850 }
851
852 void SD_task_distribute_comp_amdahl(SD_task_t task, int count)
853 {
854   xbt_assert(task->kind == SD_TASK_COMP_PAR_AMDAHL,
855              "Task %s is not a SD_TASK_COMP_PAR_AMDAHL typed task."
856              "Cannot use this function.",
857              task->name);
858   task->flops_amount = xbt_new0(double, count);
859   task->bytes_amount = xbt_new0(double, count* count);
860
861   for (int i = 0; i < count; i++) {
862     task->flops_amount[i] = (task->alpha + (1 - task->alpha) / count) * task->amount;
863   }
864 }
865
866 void SD_task_build_MxN_1D_block_matrix(SD_task_t task, int src_nb, int dst_nb)
867 {
868   xbt_assert(task->kind == SD_TASK_COMM_PAR_MXN_1D_BLOCK,
869              "Task %s is not a SD_TASK_COMM_PAR_MXN_1D_BLOCK typed task."
870              "Cannot use this function.",
871              task->name);
872   xbt_free(task->bytes_amount);
873   task->bytes_amount = xbt_new0(double, task->allocation->size() * task->allocation->size());
874
875   for (int i = 0; i < src_nb; i++) {
876     double src_start = i * task->amount / src_nb;
877     double src_end   = src_start + task->amount / src_nb;
878     for (int j = 0; j < dst_nb; j++) {
879       double dst_start = j * task->amount / dst_nb;
880       double dst_end   = dst_start + task->amount / dst_nb;
881       XBT_VERB("(%d->%d): (%.2f, %.2f)-> (%.2f, %.2f)", i, j, src_start, src_end, dst_start, dst_end);
882       task->bytes_amount[i * (src_nb + dst_nb) + src_nb + j] = 0.0;
883       if ((src_end > dst_start) && (dst_end > src_start)) { /* There is something to send */
884         task->bytes_amount[i * (src_nb + dst_nb) + src_nb + j] =
885             std::min(src_end, dst_end) - std::max(src_start, dst_start);
886         XBT_VERB("==> %.2f", task->bytes_amount[i * (src_nb + dst_nb) + src_nb + j]);
887       }
888     }
889   }
890 }
891
892 /** @brief Auto-schedules a task.
893  *
894  * Auto-scheduling mean that the task can be used with SD_task_schedulev(). This allows one to specify the task costs at
895  * creation, and decouple them from the scheduling process where you just specify which resource should deliver the
896  * mandatory power.
897  *
898  * To be auto-schedulable, a task must be a typed computation SD_TASK_COMP_SEQ or SD_TASK_COMP_PAR_AMDAHL.
899  */
900 void SD_task_schedulev(SD_task_t task, int count, const sg_host_t* list)
901 {
902   xbt_assert(task->kind == SD_TASK_COMP_SEQ || task->kind == SD_TASK_COMP_PAR_AMDAHL,
903              "Task %s is not typed. Cannot automatically schedule it.", SD_task_get_name(task));
904
905   for (int i = 0; i < count; i++)
906     task->allocation->push_back(list[i]);
907
908   XBT_VERB("Schedule computation task %s on %zu host(s)", task->name, task->allocation->size());
909
910   if (task->kind == SD_TASK_COMP_SEQ) {
911     if (not task->flops_amount) { /*This task has failed and is rescheduled. Reset the flops_amount*/
912       task->flops_amount    = xbt_new0(double, 1);
913       task->flops_amount[0] = task->amount;
914     }
915     XBT_VERB("It costs %.f flops", task->flops_amount[0]);
916   }
917
918   if (task->kind == SD_TASK_COMP_PAR_AMDAHL) {
919     SD_task_distribute_comp_amdahl(task, count);
920     XBT_VERB("%.f flops will be distributed following Amdahl's Law", task->flops_amount[0]);
921   }
922
923   SD_task_do_schedule(task);
924
925   /* Iterate over all inputs and outputs to say where I am located (and start them if runnable) */
926   for (auto const& input : *task->inputs) {
927     int src_nb = static_cast<int>(input->allocation->size());
928     int dst_nb = count;
929     if (input->allocation->empty())
930       XBT_VERB("Sender side of '%s' not scheduled. Set receiver side to '%s''s allocation", input->name, task->name);
931
932     for (int i = 0; i < count; i++)
933       input->allocation->push_back(task->allocation->at(i));
934
935     if (input->allocation->size() > task->allocation->size()) {
936       if (task->kind == SD_TASK_COMP_PAR_AMDAHL)
937         SD_task_build_MxN_1D_block_matrix(input, src_nb, dst_nb);
938
939       SD_task_do_schedule(input);
940       XBT_VERB("Auto-Schedule Communication task '%s'. Send %.f bytes from %d hosts to %d hosts.", input->name,
941                input->amount, src_nb, dst_nb);
942     }
943   }
944
945   for (auto const& output : *task->outputs) {
946     int src_nb = count;
947     int dst_nb = static_cast<int>(output->allocation->size());
948     if (output->allocation->empty())
949       XBT_VERB("Receiver side of '%s' not scheduled. Set sender side to '%s''s allocation", output->name, task->name);
950
951     for (int i = 0; i < count; i++)
952       output->allocation->insert(output->allocation->begin() + i, task->allocation->at(i));
953
954     if (output->allocation->size() > task->allocation->size()) {
955       if (task->kind == SD_TASK_COMP_PAR_AMDAHL)
956         SD_task_build_MxN_1D_block_matrix(output, src_nb, dst_nb);
957
958       SD_task_do_schedule(output);
959       XBT_VERB("Auto-Schedule Communication task %s. Send %.f bytes from %d hosts to %d hosts.", output->name,
960                output->amount, src_nb, dst_nb);
961     }
962   }
963 }
964
965 /** @brief autoschedule a task on a list of hosts
966  *
967  * This function is similar to SD_task_schedulev(), but takes the list of hosts to schedule onto as separate parameters.
968  * It builds a proper vector of hosts and then call SD_task_schedulev()
969  */
970 void SD_task_schedulel(SD_task_t task, int count, ...)
971 {
972   va_list ap;
973   std::vector<sg_host_t> list(count);
974   va_start(ap, count);
975   for (int i = 0; i < count; i++)
976     list[i] = va_arg(ap, sg_host_t);
977
978   va_end(ap);
979   SD_task_schedulev(task, count, list.data());
980 }