Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
bafe5f86d8c8a0c2ebe43cb7fd16f3ddfd69cbf3
[simgrid.git] / src / simdag / sd_daxloader.cpp
1 /* Copyright (c) 2009-2016. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "src/simdag/simdag_private.h"
8 #include "simgrid/simdag.h"
9 #include "xbt/misc.h"
10 #include "xbt/log.h"
11 #include "xbt/file.h" /* xbt_basename() */
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_daxparse, sd, "Parsing DAX files");
14
15 extern "C" {
16         #undef CLEANUP
17         #include "dax_dtd.h"
18         #include "dax_dtd.c"
19 }
20
21 bool children_are_marked(SD_task_t task);
22 bool parents_are_marked(SD_task_t task);
23
24 /* Parsing helpers */
25
26 static double dax_parse_double(const char *string)
27 {
28   int ret = 0;
29   double value;
30
31   ret = sscanf(string, "%lg", &value);
32   xbt_assert (ret == 1, "Parse error on line %d: %s is not a double",
33               dax_lineno, string);
34   return value;
35 }
36
37 /* Ensure that transfer tasks have unique names even though a file is used
38  * several times */
39
40 void uniq_transfer_task_name(SD_task_t task)
41 {
42   SD_task_t child, parent;
43   xbt_dynar_t children, parents;
44   char *new_name;
45
46   children = SD_task_get_children(task);
47   parents = SD_task_get_parents(task);
48
49   xbt_dynar_get_cpy(children, 0, &child);
50   xbt_dynar_get_cpy(parents, 0, &parent);
51
52   new_name = bprintf("%s_%s_%s",
53                      SD_task_get_name(parent),
54                      SD_task_get_name(task), SD_task_get_name(child));
55
56   SD_task_set_name(task, new_name);
57
58   xbt_dynar_free_container(&children);
59   xbt_dynar_free_container(&parents);
60   free(new_name);
61 }
62
63 bool children_are_marked(SD_task_t task){
64   SD_task_t child_task = NULL;
65   bool all_marked = true;
66   SD_dependency_t depafter = NULL;
67   unsigned int count;
68   xbt_dynar_foreach(task->tasks_after,count,depafter){
69     child_task = depafter->dst;
70     //test marked
71     if(child_task->marked == 0) {
72       all_marked = false;
73       break;
74     }
75     child_task = NULL;
76   }
77   return all_marked;
78 }
79
80 bool parents_are_marked(SD_task_t task){
81   SD_task_t parent_task = NULL;
82   bool all_marked = true;
83   SD_dependency_t depbefore = NULL;
84   unsigned int count;
85   xbt_dynar_foreach(task->tasks_before,count,depbefore){
86     parent_task = depbefore->src;
87     //test marked
88     if(parent_task->marked == 0) {
89       all_marked = false;
90       break;
91     }
92     parent_task = NULL;
93   }
94   return all_marked;
95 }
96
97 bool acyclic_graph_detail(xbt_dynar_t dag){
98   unsigned int count=0, count_current=0;
99   bool all_marked = true;
100   SD_task_t task = NULL, parent_task = NULL, child_task = NULL;
101   SD_dependency_t depbefore = NULL, depafter = NULL;
102   xbt_dynar_t next = NULL, current = xbt_dynar_new(sizeof(SD_task_t),NULL);
103
104   xbt_dynar_foreach(dag,count,task){
105     if(task->kind == SD_TASK_COMM_E2E) continue;
106     task->marked = 0;
107     if(xbt_dynar_is_empty(task->tasks_after)){
108       xbt_dynar_push(current, &task);
109     }
110   }
111   task = NULL;
112   count = 0;
113   //test if something has to be done for the next iteration
114   while(!xbt_dynar_is_empty(current)){
115     next = xbt_dynar_new(sizeof(SD_task_t),NULL);
116     //test if the current iteration is done
117     count_current=0;
118     xbt_dynar_foreach(current,count_current,task){
119       if (task == NULL) continue;
120       count = 0;
121       //push task in next
122       task->marked = 1;
123       count = 0;
124       xbt_dynar_foreach(task->tasks_before,count,depbefore){
125         parent_task = depbefore->src;
126         if(parent_task->kind == SD_TASK_COMM_E2E){
127           unsigned int j=0;
128           parent_task->marked = 1;
129           SD_task_t parent_task_2 = NULL;
130           xbt_dynar_foreach(parent_task->tasks_before,j,depbefore){
131             parent_task_2 = depbefore->src;
132             if(children_are_marked(parent_task_2))
133               xbt_dynar_push(next, &parent_task_2);
134           }
135         } else{
136           if(children_are_marked(parent_task))
137             xbt_dynar_push(next, &parent_task);
138         }
139         parent_task = NULL;
140       }
141       task = NULL;
142       count = 0;
143     }
144     xbt_dynar_free(&current);
145     current = next;
146     next = NULL;
147   }
148   xbt_dynar_free(&current);
149   current = NULL;
150   all_marked = true;
151   xbt_dynar_foreach(dag,count,task){
152     if(task->kind == SD_TASK_COMM_E2E) continue;
153     //test if all tasks are marked
154     if(task->marked == 0){
155       XBT_WARN("the task %s is not marked",task->name);
156       all_marked = false;
157       break;
158     }
159   }
160   task = NULL;
161   if(!all_marked){
162     XBT_VERB("there is at least one cycle in your task graph");
163
164     current = xbt_dynar_new(sizeof(SD_task_t),NULL);
165     xbt_dynar_foreach(dag,count,task){
166       if(task->kind == SD_TASK_COMM_E2E) continue;
167       if(xbt_dynar_is_empty(task->tasks_before)){
168         xbt_dynar_push(current, &task);
169       }
170     }
171
172     count = 0;
173     task = NULL;
174     xbt_dynar_foreach(dag,count,task){
175       if(task->kind == SD_TASK_COMM_E2E) continue;
176       if(xbt_dynar_is_empty(task->tasks_before)){
177         task->marked = 1;
178         xbt_dynar_push(current, &task);
179       }
180     }
181     task = NULL;
182     count = 0;
183     //test if something has to be done for the next iteration
184     while(!xbt_dynar_is_empty(current)){
185       next = xbt_dynar_new(sizeof(SD_task_t),NULL);
186       //test if the current iteration is done
187       count_current=0;
188       xbt_dynar_foreach(current,count_current,task){
189         if (task == NULL) continue;
190         count = 0;
191         //push task in next
192         task->marked = 1;
193         count = 0;
194         xbt_dynar_foreach(task->tasks_after,count,depafter){
195           child_task = depbefore->dst;
196           if(child_task->kind == SD_TASK_COMM_E2E){
197             unsigned int j=0;
198             child_task->marked = 1;
199             SD_task_t child_task_2 = NULL;
200             xbt_dynar_foreach(child_task->tasks_after,j,depafter){
201               child_task_2 = depbefore->dst;
202               if(parents_are_marked(child_task_2))
203                 xbt_dynar_push(next, &child_task_2);
204             }
205           } else{
206             if(parents_are_marked(child_task))
207               xbt_dynar_push(next, &child_task);
208           }
209           child_task = NULL;
210         }
211         task = NULL;
212         count = 0;
213       }
214       xbt_dynar_free(&current);
215       current = next;
216       next = NULL;
217     }
218     xbt_dynar_free(&current);
219     current = NULL;
220     all_marked = true;
221     xbt_dynar_foreach(dag,count,task){
222       if(task->kind == SD_TASK_COMM_E2E) continue;
223       //test if all tasks are marked
224       if(task->marked == 0){
225         XBT_WARN("the task %s is in a cycle",task->name);
226         all_marked = false;
227       }
228     }
229   }
230   return all_marked;
231 }
232
233
234
235 static YY_BUFFER_STATE input_buffer;
236
237 static xbt_dynar_t result;
238 static xbt_dict_t jobs;
239 static xbt_dict_t files;
240 static SD_task_t current_job;
241 static SD_task_t root_task, end_task;
242
243 static void dax_task_free(void *task)
244 {
245   SD_task_destroy((SD_task_t)task);
246 }
247
248 /** @brief loads a DAX file describing a DAG
249  * 
250  * See https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator
251  * for more details.
252  */
253 xbt_dynar_t SD_daxload(const char *filename)
254 {
255   xbt_dict_cursor_t cursor;
256   SD_task_t file;
257   char *name;
258   FILE *in_file = fopen(filename, "r");
259   xbt_assert(in_file, "Unable to open \"%s\"\n", filename);
260   input_buffer = dax__create_buffer(in_file, 10);
261   dax__switch_to_buffer(input_buffer);
262   dax_lineno = 1;
263
264   result = xbt_dynar_new(sizeof(SD_task_t), dax_task_free);
265   files = xbt_dict_new_homogeneous(&dax_task_free);
266   jobs = xbt_dict_new_homogeneous(NULL);
267   root_task = SD_task_create_comp_seq("root", NULL, 0);
268   /* by design the root task is always SCHEDULABLE */
269   SD_task_set_state(root_task, SD_SCHEDULABLE);
270
271   xbt_dynar_push(result, &root_task);
272   end_task = SD_task_create_comp_seq("end", NULL, 0);
273
274   int res = dax_lex();
275   if (res != 0)
276     xbt_die("Parse error in %s: %s", filename, dax__parse_err_msg());
277   dax__delete_buffer(input_buffer);
278   fclose(in_file);
279   dax_lex_destroy();
280   xbt_dict_free(&jobs);
281
282   /* And now, post-process the files.
283    * We want a file task per pair of computation tasks exchanging the file. Duplicate on need
284    * Files not produced in the system are said to be produced by root task (top of DAG).
285    * Files not consumed in the system are said to be consumed by end task (bottom of DAG).
286    */
287
288   xbt_dict_foreach(files, cursor, name, file) {
289     unsigned int cpt1, cpt2;
290     SD_task_t newfile;
291     SD_dependency_t depbefore, depafter;
292     if (xbt_dynar_is_empty(file->tasks_before)) {
293       xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
294         newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
295         SD_task_dependency_add(NULL, NULL, root_task, newfile);
296         SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
297         xbt_dynar_push(result, &newfile);
298       }
299     } else if (xbt_dynar_is_empty(file->tasks_after)) {
300       xbt_dynar_foreach(file->tasks_before, cpt2, depbefore) {
301         newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
302         SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
303         SD_task_dependency_add(NULL, NULL, newfile, end_task);
304         xbt_dynar_push(result, &newfile);
305       }
306     } else {
307       xbt_dynar_foreach(file->tasks_before, cpt1, depbefore) {
308         xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
309           if (depbefore->src == depafter->dst) {
310             XBT_WARN
311                 ("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.",
312                  file->name, depbefore->src->name);
313           }
314           newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
315           SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
316           SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
317           xbt_dynar_push(result, &newfile);
318         }
319       }
320     }
321   }
322
323   /* Push end task last */
324   xbt_dynar_push(result, &end_task);
325
326   /* Free previous copy of the files */
327   xbt_dict_free(&files);
328   unsigned int cpt;
329   xbt_dynar_foreach(result, cpt, file) {
330     if (SD_task_get_kind(file) == SD_TASK_COMM_E2E) {
331       uniq_transfer_task_name(file);
332     } else if (SD_task_get_kind(file) == SD_TASK_COMP_SEQ){
333       /* If some tasks do not take files as input, connect them to the root, if
334        * they don't produce files, connect them to the end node.
335        */
336       if ((file != root_task) && xbt_dynar_is_empty(file->tasks_before)) {
337         SD_task_dependency_add(NULL, NULL, root_task, file);
338       }
339       if ((file != end_task) && xbt_dynar_is_empty(file->tasks_after)) {
340         SD_task_dependency_add(NULL, NULL, file, end_task);
341       }
342     }
343   }
344
345   if (!acyclic_graph_detail(result)){
346     XBT_ERROR("The DAX described in %s is not a DAG. It contains a cycle.", 
347               xbt_basename(filename));
348     xbt_dynar_foreach(result, cpt, file)
349       SD_task_destroy(file);
350      xbt_dynar_free_container(&result);
351     return NULL;
352   } else {
353     return result;
354   }
355 }
356
357 void STag_dax__adag(void)
358 {
359   XBT_ATTRIB_UNUSED double version;
360   version = dax_parse_double(A_dax__adag_version);
361
362   xbt_assert(version == 2.1,
363               "Expected version 2.1 in <adag> tag, got %f. Fix the parser or your file",
364               version);
365 }
366
367 void STag_dax__job(void)
368 {
369   double runtime = dax_parse_double(A_dax__job_runtime);
370   char *name = bprintf("%s@%s", A_dax__job_id, A_dax__job_name);
371   runtime *= 4200000000.;       /* Assume that timings were done on a 4.2GFlops machine. I mean, why not? */
372 //  XBT_INFO("See <job id=%s runtime=%s %.0f>",A_dax__job_id,A_dax__job_runtime,runtime);
373   current_job = SD_task_create_comp_seq(name, NULL, runtime);
374   xbt_dict_set(jobs, A_dax__job_id, current_job, NULL);
375   free(name);
376   xbt_dynar_push(result, &current_job);
377 }
378
379 void STag_dax__uses(void)
380 {
381   SD_task_t file;
382   double size = dax_parse_double(A_dax__uses_size);
383   int is_input = (A_dax__uses_link == A_dax__uses_link_input);
384
385 //  XBT_INFO("See <uses file=%s %s>",A_dax__uses_file,(is_input?"in":"out"));
386   file = (SD_task_t)xbt_dict_get_or_null(files, A_dax__uses_file);
387   if (file == NULL) {
388     file = SD_task_create_comm_e2e(A_dax__uses_file, NULL, size);
389     xbt_dict_set(files, A_dax__uses_file, file, NULL);
390   } else {
391     if (SD_task_get_amount(file) != size) {
392       XBT_WARN("Ignoring file %s size redefinition from %.0f to %.0f",
393             A_dax__uses_file, SD_task_get_amount(file), size);
394     }
395   }
396   if (is_input) {
397     SD_task_dependency_add(NULL, NULL, file, current_job);
398   } else {
399     SD_task_dependency_add(NULL, NULL, current_job, file);
400     if (xbt_dynar_length(file->tasks_before) > 1) {
401       XBT_WARN("File %s created at more than one location...", file->name);
402     }
403   }
404 }
405
406 static SD_task_t current_child;
407 void STag_dax__child(void)
408 {
409   current_child = (SD_task_t)xbt_dict_get_or_null(jobs, A_dax__child_ref);
410   xbt_assert(current_child != NULL,"Parse error on line %d:"
411              "Asked to add dependencies to the non-existent %s task",
412              dax_lineno, A_dax__child_ref);
413 }
414
415 void ETag_dax__child(void)
416 {
417   current_child = NULL;
418 }
419
420 void STag_dax__parent(void)
421 {
422   SD_task_t parent = (SD_task_t)xbt_dict_get_or_null(jobs, A_dax__parent_ref);
423   xbt_assert(parent != NULL, "Parse error on line %d: "
424              "Asked to add a dependency from %s to %s, but %s does not exist",
425              dax_lineno, current_child->name, A_dax__parent_ref,
426              A_dax__parent_ref);
427   SD_task_dependency_add(NULL, NULL, parent, current_child);
428   XBT_DEBUG("Control-flow dependency from %s to %s", current_child->name,
429          parent->name);
430 }
431
432 void ETag_dax__adag(void)
433 {
434 //  XBT_INFO("See </adag>");
435 }
436
437 void ETag_dax__job(void)
438 {
439   current_job = NULL;
440 //  XBT_INFO("See </job>");
441 }
442
443 void ETag_dax__parent(void)
444 {
445 //  XBT_INFO("See </parent>");
446 }
447
448 void ETag_dax__uses(void)
449 {
450 //  XBT_INFO("See </uses>");
451 }