Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix missing proto for PID functions
[simgrid.git] / src / simdag / sd_daxloader.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "simdag/simdag.h"
9 #include "xbt/misc.h"
10 #include "xbt/log.h"
11 #include <libgen.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_daxparse, sd, "Parsing DAX files");
14
15 #undef CLEANUP
16 #include "dax_dtd.h"
17 #include "dax_dtd.c"
18
19 bool children_are_marked(SD_task_t task);
20 bool parents_are_marked(SD_task_t task);
21
22 /* Parsing helpers */
23 static void dax_parse_error(char *msg)
24 {
25   fprintf(stderr, "Parse error on line %d: %s\n", dax_lineno, msg);
26   xbt_abort();
27 }
28
29 static double dax_parse_double(const char *string)
30 {
31   int ret = 0;
32   double value;
33
34   ret = sscanf(string, "%lg", &value);
35   if (ret != 1)
36     dax_parse_error(bprintf("%s is not a double", string));
37   return value;
38 }
39
40 static int dax_parse_int(const char *string)
41 {
42   int ret = 0;
43   int value;
44
45   ret = sscanf(string, "%d", &value);
46   if (ret != 1)
47     dax_parse_error(bprintf("%s is not an integer", string));
48   return value;
49 }
50
51 /* Ensure that transfer tasks have unique names even though a file is used
52  * several times */
53
54 void uniq_transfer_task_name(SD_task_t task)
55 {
56   SD_task_t child, parent;
57   xbt_dynar_t children, parents;
58   char *new_name;
59
60   children = SD_task_get_children(task);
61   parents = SD_task_get_parents(task);
62
63   xbt_dynar_get_cpy(children, 0, &child);
64   xbt_dynar_get_cpy(parents, 0, &parent);
65
66   new_name = bprintf("%s_%s_%s",
67                      SD_task_get_name(parent),
68                      SD_task_get_name(task), SD_task_get_name(child));
69
70   SD_task_set_name(task, new_name);
71
72   xbt_dynar_free_container(&children);
73   xbt_dynar_free_container(&parents);
74   free(new_name);
75 }
76
77 bool children_are_marked(SD_task_t task){
78   SD_task_t child_task = NULL;
79   bool all_marked = true;
80   SD_dependency_t depafter = NULL;
81   unsigned int count;
82   xbt_dynar_foreach(task->tasks_after,count,depafter){
83     child_task = depafter->dst;
84     //test marked
85     if(child_task->marked == 0) {
86       all_marked = false;
87       break;
88     }
89     child_task = NULL;
90   }
91   return all_marked;
92 }
93
94 bool parents_are_marked(SD_task_t task){
95   SD_task_t parent_task = NULL;
96   bool all_marked = true;
97   SD_dependency_t depbefore = NULL;
98   unsigned int count;
99   xbt_dynar_foreach(task->tasks_before,count,depbefore){
100     parent_task = depbefore->src;
101     //test marked
102     if(parent_task->marked == 0) {
103       all_marked = false;
104       break;
105     }
106     parent_task = NULL;
107   }
108   return all_marked;
109 }
110
111 bool acyclic_graph_detail(xbt_dynar_t dag){
112   unsigned int count=0, count_current=0;
113   bool all_marked = true;
114   SD_task_t task = NULL, parent_task = NULL, child_task = NULL;
115   SD_dependency_t depbefore = NULL, depafter = NULL;
116   xbt_dynar_t next = NULL, current = xbt_dynar_new(sizeof(SD_task_t),NULL);
117
118   xbt_dynar_foreach(dag,count,task){
119     if(task->kind == SD_TASK_COMM_E2E) continue;
120     task->marked = 0;
121     if(xbt_dynar_is_empty(task->tasks_after)){
122       xbt_dynar_push(current, &task);
123     }
124   }
125   task = NULL;
126   count = 0;
127   //test if something has to be done for the next iteration
128   while(!xbt_dynar_is_empty(current)){
129     next = xbt_dynar_new(sizeof(SD_task_t),NULL);
130     //test if the current iteration is done
131     count_current=0;
132     xbt_dynar_foreach(current,count_current,task){
133       if (task == NULL) continue;
134       count = 0;
135       //push task in next
136       task->marked = 1;
137       count = 0;
138       xbt_dynar_foreach(task->tasks_before,count,depbefore){
139         parent_task = depbefore->src;
140         if(parent_task->kind == SD_TASK_COMM_E2E){
141           unsigned int j=0;
142           parent_task->marked = 1;
143           SD_task_t parent_task_2 = NULL;
144           xbt_dynar_foreach(parent_task->tasks_before,j,depbefore){
145             parent_task_2 = depbefore->src;
146             if(children_are_marked(parent_task_2))
147               xbt_dynar_push(next, &parent_task_2);
148           }
149         } else{
150           if(children_are_marked(parent_task))
151             xbt_dynar_push(next, &parent_task);
152         }
153         parent_task = NULL;
154       }
155       task = NULL;
156       count = 0;
157     }
158     xbt_dynar_free(&current);
159     current = next;
160     next = NULL;
161   }
162   xbt_dynar_free(&current);
163   current = NULL;
164   all_marked = true;
165   xbt_dynar_foreach(dag,count,task){
166     if(task->kind == SD_TASK_COMM_E2E) continue;
167     //test if all tasks are marked
168     if(task->marked == 0){
169       XBT_WARN("the task %s is not marked",task->name);
170       all_marked = false;
171       break;
172     }
173   }
174   task = NULL;
175   if(!all_marked){
176     XBT_VERB("there is at least one cycle in your task graph");
177
178     current = xbt_dynar_new(sizeof(SD_task_t),NULL);
179     xbt_dynar_foreach(dag,count,task){
180       if(task->kind == SD_TASK_COMM_E2E) continue;
181       if(xbt_dynar_is_empty(task->tasks_before)){
182         xbt_dynar_push(current, &task);
183       }
184     }
185
186     count = 0;
187     task = NULL;
188     xbt_dynar_foreach(dag,count,task){
189       if(task->kind == SD_TASK_COMM_E2E) continue;
190       if(xbt_dynar_is_empty(task->tasks_before)){
191         task->marked = 1;
192         xbt_dynar_push(current, &task);
193       }
194     }
195     task = NULL;
196     count = 0;
197     //test if something has to be done for the next iteration
198     while(!xbt_dynar_is_empty(current)){
199       next = xbt_dynar_new(sizeof(SD_task_t),NULL);
200       //test if the current iteration is done
201       count_current=0;
202       xbt_dynar_foreach(current,count_current,task){
203         if (task == NULL) continue;
204         count = 0;
205         //push task in next
206         task->marked = 1;
207         count = 0;
208         xbt_dynar_foreach(task->tasks_after,count,depafter){
209           child_task = depbefore->dst;
210           if(child_task->kind == SD_TASK_COMM_E2E){
211             unsigned int j=0;
212             child_task->marked = 1;
213             SD_task_t child_task_2 = NULL;
214             xbt_dynar_foreach(child_task->tasks_after,j,depafter){
215               child_task_2 = depbefore->dst;
216               if(parents_are_marked(child_task_2))
217                 xbt_dynar_push(next, &child_task_2);
218             }
219           } else{
220             if(parents_are_marked(child_task))
221               xbt_dynar_push(next, &child_task);
222           }
223           child_task = NULL;
224         }
225         task = NULL;
226         count = 0;
227       }
228       xbt_dynar_free(&current);
229       current = next;
230       next = NULL;
231     }
232     xbt_dynar_free(&current);
233     current = NULL;
234     all_marked = true;
235     xbt_dynar_foreach(dag,count,task){
236       if(task->kind == SD_TASK_COMM_E2E) continue;
237       //test if all tasks are marked
238       if(task->marked == 0){
239         XBT_WARN("the task %s is in a cycle",task->name);
240         all_marked = false;
241       }
242     }
243   }
244   return all_marked;
245 }
246
247
248
249 static YY_BUFFER_STATE input_buffer;
250
251 static xbt_dynar_t result;
252 static xbt_dict_t jobs;
253 static xbt_dict_t files;
254 static SD_task_t current_job;
255 static SD_task_t root_task, end_task;
256
257 static void dump_res()
258 {
259   unsigned int cursor;
260   SD_task_t task;
261   xbt_dynar_foreach(result, cursor, task) {
262     XBT_INFO("Task %u", cursor);
263     SD_task_dump(task);
264   }
265 }
266
267 static void dax_task_free(void *task)
268 {
269   SD_task_t t = task;
270   SD_task_destroy(t);
271 }
272
273 /** @brief loads a DAX file describing a DAG
274  * 
275  * See https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator
276  * for more details.
277  */
278 xbt_dynar_t SD_daxload(const char *filename)
279 {
280   xbt_dict_cursor_t cursor;
281   SD_task_t file;
282   char *name;
283   FILE *in_file = fopen(filename, "r");
284   xbt_assert(in_file, "Unable to open \"%s\"\n", filename);
285   input_buffer = dax__create_buffer(in_file, 10);
286   dax__switch_to_buffer(input_buffer);
287   dax_lineno = 1;
288
289   result = xbt_dynar_new(sizeof(SD_task_t), dax_task_free);
290   files = xbt_dict_new_homogeneous(&dax_task_free);
291   jobs = xbt_dict_new_homogeneous(NULL);
292   root_task = SD_task_create_comp_seq("root", NULL, 0);
293   /* by design the root task is always SCHEDULABLE */
294   __SD_task_set_state(root_task, SD_SCHEDULABLE);
295
296   xbt_dynar_push(result, &root_task);
297   end_task = SD_task_create_comp_seq("end", NULL, 0);
298
299   _XBT_GNUC_UNUSED int res;
300   res = dax_lex();
301   xbt_assert(!res, "Parse error in %s: %s", filename,
302               dax__parse_err_msg());
303   dax__delete_buffer(input_buffer);
304   fclose(in_file);
305   dax_lex_destroy();
306   xbt_dict_free(&jobs);
307
308   /* And now, post-process the files.
309    * We want a file task per pair of computation tasks exchanging the file. Duplicate on need
310    * Files not produced in the system are said to be produced by root task (top of DAG).
311    * Files not consumed in the system are said to be consumed by end task (bottom of DAG).
312    */
313
314   xbt_dict_foreach(files, cursor, name, file) {
315     unsigned int cpt1, cpt2;
316     SD_task_t newfile;
317     SD_dependency_t depbefore, depafter;
318     if (xbt_dynar_is_empty(file->tasks_before)) {
319       xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
320         newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
321         SD_task_dependency_add(NULL, NULL, root_task, newfile);
322         SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
323 #ifdef HAVE_TRACING
324         if (depafter->src){
325           const char *category = depafter->src->category;
326           if (category){
327             TRACE_category (category);
328             SD_task_set_category (newfile, category);
329           }
330         }
331 #endif
332         xbt_dynar_push(result, &newfile);
333       }
334     } else if (xbt_dynar_is_empty(file->tasks_after)) {
335       xbt_dynar_foreach(file->tasks_before, cpt2, depbefore) {
336         newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
337         SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
338         SD_task_dependency_add(NULL, NULL, newfile, end_task);
339 #ifdef HAVE_TRACING
340         if (depbefore->src){
341           const char *category = depbefore->src->category;
342           if (category){
343             TRACE_category (category);
344             SD_task_set_category (newfile, category);
345           }
346         }
347 #endif
348         xbt_dynar_push(result, &newfile);
349       }
350     } else {
351       xbt_dynar_foreach(file->tasks_before, cpt1, depbefore) {
352         xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
353           if (depbefore->src == depafter->dst) {
354             XBT_WARN
355                 ("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.",
356                  file->name, depbefore->src->name);
357           }
358           newfile = SD_task_create_comm_e2e(file->name, NULL, file->amount);
359           SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
360           SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
361 #ifdef HAVE_TRACING
362           if (depbefore->src){
363             const char *category = depbefore->src->category;
364             if (category){
365               TRACE_category (category);
366               SD_task_set_category (newfile, category);
367             }
368           }
369 #endif
370           xbt_dynar_push(result, &newfile);
371         }
372       }
373     }
374   }
375
376   /* Push end task last */
377   xbt_dynar_push(result, &end_task);
378
379   /* Free previous copy of the files */
380   xbt_dict_free(&files);
381   unsigned int cpt;
382   xbt_dynar_foreach(result, cpt, file) {
383     if (SD_task_get_kind(file) == SD_TASK_COMM_E2E) {
384       uniq_transfer_task_name(file);
385     }
386   }
387
388   if (!acyclic_graph_detail(result)){
389     XBT_ERROR("The DAX described in %s is not a DAG. It contains a cycle.",
390               basename((char*)filename));
391     xbt_dynar_foreach(result, cpt, file)
392       SD_task_destroy(file);
393      xbt_dynar_free_container(&result);
394     return NULL;
395   } else {
396     return result;
397   }
398 }
399
400 void STag_dax__adag(void)
401 {
402   _XBT_GNUC_UNUSED double version;
403   version = dax_parse_double(A_dax__adag_version);
404
405   xbt_assert(version == 2.1,
406               "Expected version 2.1 in <adag> tag, got %f. Fix the parser or your file",
407               version);
408 }
409
410 void STag_dax__job(void)
411 {
412   double runtime = dax_parse_double(A_dax__job_runtime);
413   char *name = bprintf("%s@%s", A_dax__job_id, A_dax__job_name);
414   runtime *= 4200000000.;       /* Assume that timings were done on a 4.2GFlops machine. I mean, why not? */
415 //  XBT_INFO("See <job id=%s runtime=%s %.0f>",A_dax__job_id,A_dax__job_runtime,runtime);
416   current_job = SD_task_create_comp_seq(name, NULL, runtime);
417 #ifdef HAVE_TRACING
418   char *category = A_dax__job_name;
419   if (category){
420     TRACE_category (category);
421     SD_task_set_category(current_job, category);
422   }
423 #endif
424   xbt_dict_set(jobs, A_dax__job_id, current_job, NULL);
425   free(name);
426   xbt_dynar_push(result, &current_job);
427 }
428
429 void STag_dax__uses(void)
430 {
431   SD_task_t file;
432   double size = dax_parse_double(A_dax__uses_size);
433   int is_input = (A_dax__uses_link == A_dax__uses_link_input);
434
435 //  XBT_INFO("See <uses file=%s %s>",A_dax__uses_file,(is_input?"in":"out"));
436   file = xbt_dict_get_or_null(files, A_dax__uses_file);
437   if (file == NULL) {
438     file = SD_task_create_comm_e2e(A_dax__uses_file, NULL, size);
439     xbt_dict_set(files, A_dax__uses_file, file, NULL);
440   } else {
441     if (SD_task_get_amount(file) != size) {
442       XBT_WARN("Ignoring file %s size redefinition from %.0f to %.0f",
443             A_dax__uses_file, SD_task_get_amount(file), size);
444     }
445   }
446   if (is_input) {
447     SD_task_dependency_add(NULL, NULL, file, current_job);
448   } else {
449     SD_task_dependency_add(NULL, NULL, current_job, file);
450     if (xbt_dynar_length(file->tasks_before) > 1) {
451       XBT_WARN("File %s created at more than one location...", file->name);
452     }
453   }
454 }
455
456 static SD_task_t current_child;
457 void STag_dax__child(void)
458 {
459   current_child = xbt_dict_get_or_null(jobs, A_dax__child_ref);
460   if (current_child == NULL)
461     dax_parse_error(bprintf
462                     ("Asked to add dependencies to the non-existent %s task",
463                      A_dax__child_ref));
464 }
465
466 void ETag_dax__child(void)
467 {
468   current_child = NULL;
469 }
470
471 void STag_dax__parent(void)
472 {
473   SD_task_t parent = xbt_dict_get_or_null(jobs, A_dax__parent_ref);
474   if (parent == NULL)
475     dax_parse_error(bprintf
476                     ("Asked to add a dependency from %s to %s, but %s does not exist",
477                      current_child->name, A_dax__parent_ref,
478                      A_dax__parent_ref));
479   SD_task_dependency_add(NULL, NULL, parent, current_child);
480   XBT_DEBUG("Control-flow dependency from %s to %s", current_child->name,
481          parent->name);
482 }
483
484 void ETag_dax__adag(void)
485 {
486 //  XBT_INFO("See </adag>");
487 }
488
489 void ETag_dax__job(void)
490 {
491   current_job = NULL;
492 //  XBT_INFO("See </job>");
493 }
494
495 void ETag_dax__parent(void)
496 {
497 //  XBT_INFO("See </parent>");
498 }
499
500 void ETag_dax__uses(void)
501 {
502 //  XBT_INFO("See </uses>");
503 }