Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
convert the last two simdag examples. Simdag can die
[simgrid.git] / examples / cpp / dag-scheduling / s4u-dag-scheduling.cpp
1 /* Copyright (c) 2009-2021. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /* simple test to schedule a DAX file with the Min-Min algorithm.           */
8 #include "simgrid/s4u.hpp"
9 #include <math.h>
10 #include <string.h>
11
12 #if SIMGRID_HAVE_JEDULE
13 #include "simgrid/jedule/jedule_sd_binding.h"
14 #endif
15
16 XBT_LOG_NEW_DEFAULT_CATEGORY(dag_scheduling, "Logging specific to this example");
17
18 typedef struct _HostAttribute* HostAttribute;
19 struct _HostAttribute {
20   /* Earliest time at which a host is ready to execute a task */
21   double available_at;
22   simgrid::s4u::Exec* last_scheduled_task;
23 };
24
25 static double sg_host_get_available_at(const simgrid::s4u::Host* host)
26 {
27   return static_cast<HostAttribute>(host->get_data())->available_at;
28 }
29
30 static void sg_host_set_available_at(simgrid::s4u::Host* host, double time)
31 {
32   auto* attr         = static_cast<HostAttribute>(host->get_data());
33   attr->available_at = time;
34   host->set_data(attr);
35 }
36
37 static simgrid::s4u::Exec* sg_host_get_last_scheduled_task(const simgrid::s4u::Host* host)
38 {
39   return static_cast<HostAttribute>(host->get_data())->last_scheduled_task;
40 }
41
42 static void sg_host_set_last_scheduled_task(simgrid::s4u::Host* host, simgrid::s4u::ExecPtr task)
43 {
44   auto* attr                = static_cast<HostAttribute>(host->get_data());
45   attr->last_scheduled_task = task.get();
46   host->set_data(attr);
47 }
48
49 static bool dependency_exists(simgrid::s4u::Exec* src, simgrid::s4u::Exec* dst)
50 {
51   const auto& dependencies = src->get_dependencies();
52   const auto& successors   = src->get_successors();
53   return (std::find(successors.begin(), successors.end(), dst) != successors.end() ||
54           dependencies.find(dst) != dependencies.end());
55 }
56
57 static std::vector<simgrid::s4u::Exec*> get_ready_tasks(const std::vector<simgrid::s4u::ActivityPtr> dax)
58 {
59   std::vector<simgrid::s4u::Exec*> ready_tasks;
60   std::map<simgrid::s4u::Exec*, unsigned int> candidate_execs;
61
62   for (auto& a : dax) {
63     // Only loot at activity that have their dependencies solved but are not assigned
64     if (a->dependencies_solved() && not a->is_assigned()) {
65       // if it is an exec, it's ready
66       auto* exec = dynamic_cast<simgrid::s4u::Exec*>(a.get());
67       if (exec != nullptr)
68         ready_tasks.push_back(exec);
69       // if it a comm, we consider its successor as a candidate. If a candidate solves all its dependencies,
70       // i.e., get all its input data, it's ready
71       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(a.get());
72       if (comm != nullptr) {
73         auto* next_exec = static_cast<simgrid::s4u::Exec*>(comm->get_successors().front().get());
74         candidate_execs[next_exec]++;
75         if (next_exec->get_dependencies().size() == candidate_execs[next_exec])
76           ready_tasks.push_back(next_exec);
77       }
78     }
79   }
80   XBT_DEBUG("There are %zu ready tasks", ready_tasks.size());
81   return ready_tasks;
82 }
83
84 static double finish_on_at(const simgrid::s4u::ExecPtr task, const simgrid::s4u::Host* host)
85 {
86   double result;
87
88   const auto& parents = task->get_dependencies();
89
90   if (not parents.empty()) {
91     double data_available = 0.;
92     double last_data_available;
93     /* compute last_data_available */
94     last_data_available = -1.0;
95     for (const auto& parent : parents) {
96       /* normal case */
97       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(parent.get());
98       if (comm != nullptr) {
99         auto source = comm->get_source();
100         XBT_DEBUG("transfer from %s to %s", source->get_cname(), host->get_cname());
101         /* Estimate the redistribution time from this parent */
102         double redist_time;
103         if (comm->get_remaining() <= 1e-6) {
104           redist_time = 0;
105         } else {
106           redist_time = sg_host_get_route_latency(source, host) +
107                         comm->get_remaining() / sg_host_get_route_bandwidth(source, host);
108         }
109         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
110         // time
111         data_available = *(static_cast<double*>(comm->get_data())) + redist_time;
112       }
113
114       auto* exec = dynamic_cast<simgrid::s4u::Exec*>(parent.get());
115       /* no transfer, control dependency */
116       if (exec != nullptr) {
117         data_available = exec->get_finish_time();
118       }
119
120       if (last_data_available < data_available)
121         last_data_available = data_available;
122     }
123
124     result = fmax(sg_host_get_available_at(host), last_data_available) + task->get_remaining() / host->get_speed();
125   } else
126     result = sg_host_get_available_at(host) + task->get_remaining() / host->get_speed();
127
128   return result;
129 }
130
131 static simgrid::s4u::Host* get_best_host(const simgrid::s4u::ExecPtr exec)
132 {
133   std::vector<simgrid::s4u::Host*> hosts = simgrid::s4u::Engine::get_instance()->get_all_hosts();
134   auto best_host                         = hosts.front();
135   double min_EFT                         = finish_on_at(exec, best_host);
136
137   for (const auto& h : hosts) {
138     double EFT = finish_on_at(exec, h);
139     XBT_DEBUG("%s finishes on %s at %f", exec->get_cname(), h->get_cname(), EFT);
140
141     if (EFT < min_EFT) {
142       min_EFT   = EFT;
143       best_host = h;
144     }
145   }
146   return best_host;
147 }
148
149 int main(int argc, char** argv)
150 {
151   double min_finish_time            = -1.0;
152   simgrid::s4u::Exec* selected_task = nullptr;
153   simgrid::s4u::Host* selected_host = nullptr;
154   char* tracefilename               = nullptr;
155
156   simgrid::s4u::Engine e(&argc, argv);
157   std::set<simgrid::s4u::Activity*> vetoed;
158   e.track_vetoed_activities(&vetoed);
159
160   simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
161     // when an Exec completes, we need to set the potential start time of all its ouput comms
162     const auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
163     if (exec == nullptr) // Only Execs are concerned here
164       return;
165     for (const auto& succ : exec->get_successors()) {
166       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
167       if (comm != nullptr) {
168         double* finish_time = new double(exec->get_finish_time());
169         // We use the user data field to store the finish time of the predecessor of the comm, i.e., its potential start
170         // time
171         comm->set_data(finish_time);
172       }
173     }
174   });
175   /* Check our arguments */
176   xbt_assert(argc > 2,
177              "Usage: %s platform_file dax_file [jedule_file]\n"
178              "\tExample: %s simulacrum_7_hosts.xml Montage_25.xml Montage_25.jed",
179              argv[0], argv[0]);
180
181   if (argc == 4)
182     tracefilename = xbt_strdup(argv[3]);
183
184   e.load_platform(argv[1]);
185
186   /*  Allocating the host attribute */
187   unsigned int total_nhosts = e.get_host_count();
188   const auto hosts          = e.get_all_hosts();
189
190   for (unsigned int i = 0; i < total_nhosts; i++)
191     hosts[i]->set_data(xbt_new0(struct _HostAttribute, 1));
192
193   /* load the DAX file */
194   std::vector<simgrid::s4u::ActivityPtr> dax = simgrid::s4u::create_DAG_from_DAX(argv[2]);
195
196   /* Schedule the root first */
197   auto* root = static_cast<simgrid::s4u::Exec*>(dax.front().get());
198   auto host  = get_best_host(root);
199   root->set_host(host);
200   // we can also set the source of all the output comms of the root node
201   for (const auto& succ : root->get_successors()) {
202     auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
203     if (comm != nullptr)
204       comm->set_source(host);
205   }
206
207   e.run();
208
209   while (not vetoed.empty()) {
210     XBT_DEBUG("Start new scheduling round");
211     /* Get the set of ready tasks */
212     auto ready_tasks = get_ready_tasks(dax);
213     vetoed.clear();
214
215     if (ready_tasks.empty()) {
216       ready_tasks.clear();
217       /* there is no ready task, let advance the simulation */
218       e.run();
219       continue;
220     }
221     /* For each ready task:
222      * get the host that minimizes the completion time.
223      * select the task that has the minimum completion time on its best host.
224      */
225     for (auto task : ready_tasks) {
226       XBT_DEBUG("%s is ready", task->get_cname());
227       host               = get_best_host(task);
228       double finish_time = finish_on_at(task, host);
229       if (min_finish_time < 0 || finish_time < min_finish_time) {
230         min_finish_time = finish_time;
231         selected_task   = task;
232         selected_host   = host;
233       }
234     }
235
236     XBT_INFO("Schedule %s on %s", selected_task->get_cname(), selected_host->get_cname());
237     selected_task->set_host(selected_host);
238     // we can also set the destination of all the input comms of the selected task
239     for (const auto& pred : selected_task->get_dependencies()) {
240       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(pred.get());
241       if (comm != nullptr) {
242         comm->set_destination(selected_host);
243         delete static_cast<double*>(comm->get_data());
244       }
245     }
246     // we can also set the source of all the output comms of the selected task
247     for (const auto& succ : selected_task->get_successors()) {
248       auto* comm = dynamic_cast<simgrid::s4u::Comm*>(succ.get());
249       if (comm != nullptr)
250         comm->set_source(selected_host);
251     }
252
253     /*
254      * tasks can be executed concurrently when they can by default.
255      * Yet schedulers take decisions assuming that tasks wait for resource availability to start.
256      * The solution (well crude hack is to keep track of the last task scheduled on a host and add a special type of
257      * dependency if needed to force the sequential execution meant by the scheduler.
258      * If the last scheduled task is already done, has failed or is a predecessor of the current task, no need for a
259      * new dependency
260      */
261
262     auto last_scheduled_task = sg_host_get_last_scheduled_task(selected_host);
263     if (last_scheduled_task && (last_scheduled_task->get_state() != simgrid::s4u::Activity::State::FINISHED) &&
264         (last_scheduled_task->get_state() != simgrid::s4u::Activity::State::FAILED) &&
265         not dependency_exists(sg_host_get_last_scheduled_task(selected_host), selected_task))
266       last_scheduled_task->add_successor(selected_task);
267
268     sg_host_set_last_scheduled_task(selected_host, selected_task);
269     sg_host_set_available_at(selected_host, min_finish_time);
270
271     ready_tasks.clear();
272     /* reset the min_finish_time for the next set of ready tasks */
273     min_finish_time = -1.;
274     e.run();
275   }
276
277   XBT_INFO("Simulation Time: %f", simgrid_get_clock());
278   XBT_INFO("------------------- Produce the trace file---------------------------");
279   XBT_INFO("Producing a jedule output (if active) of the run into %s",
280            tracefilename ? tracefilename : "minmin_test.jed");
281 #if SIMGRID_HAVE_JEDULE
282   jedule_sd_dump(tracefilename);
283 #endif
284   free(tracefilename);
285
286   for (auto h : hosts) {
287     xbt_free(h->get_data());
288     h->set_data(nullptr);
289   }
290
291   return 0;
292 }