Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
[src/simdag,examples/simdag] add the simulator inside the dot loader if the user...
[simgrid.git] / src / simdag / sd_dotloader.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "simdag/simdag.h"
9 #include "xbt/misc.h"
10 #include "xbt/log.h"
11 #include <stdbool.h>
12
13 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_dotparse, sd, "Parsing DOT files");
14
15 #undef CLEANUP
16
17 #ifdef HAVE_CGRAPH_H
18 #include <graphviz/cgraph.h>
19 #elif HAVE_AGRAPH_H
20 #include <graphviz/agraph.h>
21 #endif
22
23 void dot_add_task(Agnode_t * dag_node);
24 void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge);
25 void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge);
26 xbt_dynar_t SD_dotload_FILE(FILE * in_file);
27
28 static double dot_parse_double(const char *string)
29 {
30   if (string == NULL)
31     return -1;
32   int ret = 0;
33   double value = -1;
34
35   ret = sscanf(string, "%lg", &value);
36   if (ret != 1)
37     WARN1("%s is not a double", string);
38   return value;
39 }
40
41 static int dot_parse_int(const char *string)
42 {
43   if (string == NULL)
44     return -10;
45   int ret = 0;
46   int value = -1;
47
48   ret = sscanf(string, "%d", &value);
49   if (ret != 1)
50     WARN1("%s is not an integer", string);
51   return value;
52 }
53
54 static xbt_dynar_t result;
55 static xbt_dict_t jobs;
56 static xbt_dict_t files;
57 static xbt_dict_t computers;
58 static SD_task_t root_task, end_task;
59 static Agraph_t *dag_dot;
60 static bool schedule = true;
61
62 static void dump_res()
63 {
64   unsigned int cursor;
65   SD_task_t task;
66   xbt_dynar_foreach(result, cursor, task) {
67     INFO1("Task %d", cursor);
68     SD_task_dump(task);
69   }
70 }
71
72 bool child_are_marked(SD_task_t task){
73   SD_task_t child_task = NULL;
74   bool all_marked = true;
75   SD_dependency_t depafter = NULL;
76   int count;
77   xbt_dynar_foreach(task->tasks_after,count,depafter){
78     child_task = depafter->dst;
79     //test marked
80     if(child_task->marked == 0) {
81       all_marked = false;
82       break;
83     }
84     child_task = NULL;
85   }
86   return all_marked;
87 }
88
89 bool acyclic_graph_detection(xbt_dynar_t dag){
90   int count=0, count_current=0;
91   bool all_marked = true;
92   SD_task_t task = NULL, parent_task = NULL;
93   SD_dependency_t depbefore = NULL;
94   xbt_dynar_t next = NULL, current = xbt_dynar_new(sizeof(SD_task_t),NULL);
95
96   xbt_dynar_foreach(dag,count,task){
97     if(task->kind == SD_TASK_COMM_E2E) continue;
98     task->marked = 0;
99     if(xbt_dynar_length(task->tasks_after) == 0){
100       xbt_dynar_push(current, &task);
101     }
102   }
103   task = NULL;
104   count = 0;
105   //test if something has to be done for the next iteration
106   while(xbt_dynar_length(current) != 0){
107     next = xbt_dynar_new(sizeof(SD_task_t),NULL);
108     //test if the current iteration is done
109     count_current=0;
110     xbt_dynar_foreach(current,count_current,task){
111       if (task == NULL) continue;
112       count = 0;
113       //push task in next
114       task->marked = 1;
115       int count = 0;
116       xbt_dynar_foreach(task->tasks_before,count,depbefore){
117         parent_task = depbefore->src;
118         if(parent_task->kind == SD_TASK_COMM_E2E){
119           int j=0;
120           parent_task->marked = 1;
121           xbt_dynar_foreach(parent_task->tasks_before,j,depbefore){
122             parent_task = depbefore->src;
123             if(child_are_marked(parent_task))
124               xbt_dynar_push(next, &parent_task);
125           }
126         } else{
127           if(child_are_marked(parent_task))
128             xbt_dynar_push(next, &parent_task);
129         }
130         parent_task = NULL;
131       }
132       task = NULL;
133       count = 0;
134     }
135     xbt_dynar_free(&current);
136     current = next;
137     next = NULL;
138   }
139   xbt_dynar_free(&current);
140   current = NULL;
141   all_marked = true;
142   xbt_dynar_foreach(dag,count,task){
143     if(task->kind == SD_TASK_COMM_E2E) continue;
144     //test if all tasks are marked
145     if(task->marked == 0){
146       WARN1("test %d",task->name);
147       all_marked = false;
148       break;
149     }
150   }
151   task = NULL;
152   if(all_marked){
153     WARN0("there are no cycle in your DAG");
154   }else{
155     WARN0("there are a cycle in your DAG");
156   }
157 }
158
159 static void dot_task_free(void *task)
160 {
161   SD_task_t t = task;
162   SD_task_destroy(t);
163 }
164
165 static void TRACE_sd_dotloader (SD_task_t task, const char *category)
166 {
167   if (category){
168     if (strlen (category) != 0){
169       TRACE_category (category);
170       TRACE_sd_set_task_category (task, category);
171     }
172   }
173 }
174
175 /** @brief loads a DOT file describing a DAG
176  * 
177  * See http://www.graphviz.org/doc/info/lang.html
178  * for more details.
179  * To obtain information about transfers and tasks, two attributes are
180  * required : size on task (execution time in Flop) and size on edge
181  * (the amount of data transfer in bit).
182  * if they aren't here, there choose to be equal to zero.
183  */
184 xbt_dynar_t SD_dotload(const char *filename)
185 {
186   FILE *in_file = fopen(filename, "r");
187   xbt_assert1(in_file, "Unable to open \"%s\"\n", filename);
188   SD_dotload_FILE(in_file);
189   fclose(in_file);
190   return result;
191 }
192
193 xbt_dynar_t SD_dotload_FILE(FILE * in_file)
194 {
195   xbt_assert0(in_file, "Unable to use a null file descriptor\n");
196   dag_dot = agread(in_file, NIL(Agdisc_t *));
197
198   result = xbt_dynar_new(sizeof(SD_task_t), dot_task_free);
199   files = xbt_dict_new();
200   jobs = xbt_dict_new();
201   computers = xbt_dict_new();
202   root_task = SD_task_create_comp_seq("root", NULL, 0);
203   /* by design the root task is always SCHEDULABLE */
204   __SD_task_set_state(root_task, SD_SCHEDULABLE);
205
206   xbt_dict_set(jobs, "root", root_task, NULL);
207   xbt_dynar_push(result, &root_task);
208   end_task = SD_task_create_comp_seq("end", NULL, 0);
209   xbt_dict_set(jobs, "end", end_task, NULL);
210
211   Agnode_t *dag_node = NULL;
212   for (dag_node = agfstnode(dag_dot); dag_node;
213 #ifdef HAVE_CGRAPH_H
214        dag_node = agnxtnode(dag_dot, dag_node)
215 #elif HAVE_AGRAPH_H
216        dag_node = agnxtnode(dag_node)
217 #endif
218        ) {
219
220   dot_add_task(dag_node);
221   }
222   agclose(dag_dot);
223   xbt_dict_free(&jobs);
224
225   /* And now, post-process the files.
226    * We want a file task per pair of computation tasks exchanging the file. Duplicate on need
227    * Files not produced in the system are said to be produced by root task (top of DAG).
228    * Files not consumed in the system are said to be consumed by end task (bottom of DAG).
229    */
230   xbt_dict_cursor_t cursor;
231   SD_task_t file;
232   char *name;
233   xbt_dict_foreach(files, cursor, name, file) {
234     unsigned int cpt1, cpt2;
235     SD_task_t newfile = NULL;
236     SD_dependency_t depbefore, depafter;
237     if (xbt_dynar_length(file->tasks_before) == 0) {
238       xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
239         SD_task_t newfile =
240             SD_task_create_comm_e2e(file->name, NULL, file->amount);
241         SD_task_dependency_add(NULL, NULL, root_task, newfile);
242         SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
243         xbt_dynar_push(result, &newfile);
244       }
245     } else if (xbt_dynar_length(file->tasks_after) == 0) {
246       xbt_dynar_foreach(file->tasks_before, cpt2, depbefore) {
247         SD_task_t newfile =
248             SD_task_create_comm_e2e(file->name, NULL, file->amount);
249         SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
250         SD_task_dependency_add(NULL, NULL, newfile, end_task);
251         xbt_dynar_push(result, &newfile);
252       }
253     } else {
254       xbt_dynar_foreach(file->tasks_before, cpt1, depbefore) {
255         xbt_dynar_foreach(file->tasks_after, cpt2, depafter) {
256           if (depbefore->src == depafter->dst) {
257             WARN2
258                 ("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.",
259                  file->name, depbefore->src->name);
260           }
261           newfile =
262               SD_task_create_comm_e2e(file->name, NULL, file->amount);
263           SD_task_dependency_add(NULL, NULL, depbefore->src, newfile);
264           SD_task_dependency_add(NULL, NULL, newfile, depafter->dst);
265           xbt_dynar_push(result, &newfile);
266         }
267       }
268     }
269   }
270
271   /* Push end task last */
272   xbt_dynar_push(result, &end_task);
273
274   /* Free previous copy of the files */
275   xbt_dict_free(&files);
276   if(schedule == false){
277     WARN0("No scheduling provided");
278   }else{
279     xbt_dynar_t computer = NULL;
280     xbt_dict_cursor_t dict_cursor;
281     char *computer_name;
282     const SD_workstation_t *workstations = SD_workstation_get_list ();
283     xbt_dict_foreach(computers,dict_cursor,computer_name,computer){
284       int count_computer = dot_parse_int(computer_name);
285       int count=0;
286       SD_task_t task;
287       SD_task_t task_previous = NULL;
288       xbt_dynar_foreach(computer,count,task){
289         // add dependency between the previous and the task to avoid
290         // parallel execution
291         if(task != NULL ){
292           if(task_previous != NULL &&
293              !SD_task_dependency_exists(task_previous, task))
294             SD_task_dependency_add(NULL, NULL, task_previous, task);
295           SD_task_schedulel(task, 1, workstations[count_computer]);
296           task_previous = task;
297         }
298       }
299       xbt_dynar_free(&computer);
300     }
301     xbt_dict_free(&computers);
302   }
303   acyclic_graph_detection(result);
304
305   return result;
306 }
307
308 /* dot_add_task create a sd_task and all transfers required for this
309  * task. The execution time of the task is given by the attribute size.
310  * The unit of size is the Flop.*/
311 void dot_add_task(Agnode_t * dag_node)
312 {
313   char *name = agnameof(dag_node);
314   SD_task_t current_job;
315   double runtime = dot_parse_double(agget(dag_node, (char *) "size"));
316
317   DEBUG3("See <job id=%s runtime=%s %.0f>", name,
318         agget(dag_node, (char *) "size"), runtime);
319   current_job = xbt_dict_get_or_null(jobs, name);
320   if (current_job == NULL) {
321     current_job =
322         SD_task_create_comp_seq(name, NULL , runtime);
323 #ifdef HAVE_TRACING
324    TRACE_sd_dotloader (current_job, agget (dag_node, (char*)"category"));
325 #endif
326     xbt_dict_set(jobs, name, current_job, NULL);
327     xbt_dynar_push(result, &current_job);
328   }
329   Agedge_t *e;
330   int count = 0;
331
332 #ifdef HAVE_CGRAPH_H
333   for (e = agfstin(dag_dot, dag_node); e; e = agnxtin(dag_dot, e))
334 #elif HAVE_AGRAPH_H
335   for (e = agfstin(dag_node); e; e = agnxtin(e))
336 #endif
337   {
338   dot_add_input_dependencies(current_job, e);
339   count++;
340   }
341   if (count == 0 && current_job != root_task) {
342     SD_task_dependency_add(NULL, NULL, root_task, current_job);
343   }
344   count = 0;
345 #ifdef HAVE_CGRAPH_H
346   for (e = agfstout(dag_dot, dag_node); e; e = agnxtout(dag_dot, e))
347 #elif HAVE_AGRAPH_H
348   for (e = agfstout(dag_node); e; e = agnxtout(e))
349 #endif
350   {
351
352     dot_add_output_dependencies(current_job, e);
353     count++;
354   }
355   if (count == 0 && current_job != end_task) {
356     SD_task_dependency_add(NULL, NULL, current_job, end_task);
357   }
358
359   if(schedule || true){
360     /* try to take the information to schedule the task only if all is
361      * right*/
362     // performer is the computer which execute the task
363     unsigned long performer = -1;
364     char * char_performer = agget(dag_node, (char *) "performer");
365     if (char_performer != NULL)
366       performer = (long) dot_parse_int(char_performer);
367
368     // order is giving the task order on one computer
369     unsigned long order = -1;
370     char * char_order = agget(dag_node, (char *) "order");
371     if (char_order != NULL)
372       order = (long) dot_parse_int(char_order);
373     xbt_dynar_t computer = NULL;
374     //INFO2("performer = %d, order=%d",performer,order);
375     if(performer != -1 && order != -1){
376       //necessary parameters are given
377       computer = xbt_dict_get_or_null(computers, char_performer);
378       if(computer == NULL){
379         computer = xbt_dynar_new(sizeof(SD_task_t), NULL);
380         xbt_dict_set(computers, char_performer, computer, NULL);
381       }
382       if(performer < sd_global->workstation_count){
383         // the  wanted computer is available
384         SD_task_t *task_test = NULL;
385         if(order < computer->used)
386           task_test = xbt_dynar_get_ptr(computer,order);
387         if(task_test != NULL && *task_test != NULL && *task_test != current_job){
388           /*the user gives the same order to several tasks*/
389           schedule = false;
390           WARN0("scheduling does not take into account, several task has\
391                 the same order");
392         }else{
393           //the parameter seems to be ok
394           xbt_dynar_set_as(computer, order, SD_task_t, current_job);
395         }
396       }else{
397         /*the platform has not enough processors to schedule the DAG like
398         *the user wants*/
399         schedule = false;
400         WARN0("scheduling does not take into account, not enough computers");
401       }
402     }
403     else if((performer == -1 && order != -1) ||
404             (performer != -1 && order == -1)){
405       //one of necessary parameters are not given
406       schedule = false;
407       WARN0("scheduling does not take into account");
408     } else {
409       //No schedule available
410       schedule = false;
411     }
412   }
413 }
414
415 /* dot_add_output_dependencies create the dependencies between a task
416  * and a transfers. This is given by the edges in the dot file. 
417  * The amount of data transfers is given by the attribute size on the
418  * edge. */
419 void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge)
420 {
421   SD_task_t file;
422
423   char name[80];
424   sprintf(name, "%s->%s", agnameof(agtail(edge)), agnameof(aghead(edge)));
425   double size = dot_parse_double(agget(edge, (char *) "size"));
426   DEBUG2("size : %e, get size : %s", size, agget(edge, (char *) "size"));
427
428   if (size > 0) {
429     file = xbt_dict_get_or_null(files, name);
430     if (file == NULL) {
431       file = SD_task_create_comm_e2e(name, NULL, size);
432 #ifdef HAVE_TRACING
433       TRACE_sd_dotloader (file, agget (edge, (char*)"category"));
434 #endif
435       xbt_dict_set(files, name, file, &dot_task_free);
436     } else {
437       if (SD_task_get_amount(file) != size) {
438         WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
439               name, SD_task_get_amount(file), size);
440       }
441     }
442     SD_task_dependency_add(NULL, NULL, file, current_job);
443   } else {
444     file = xbt_dict_get_or_null(jobs, agnameof(agtail(edge)));
445     if (file != NULL) {
446       SD_task_dependency_add(NULL, NULL, file, current_job);
447     }
448   }
449 }
450
451 /* dot_add_output_dependencies create the dependencies between a
452  * transfers and a task. This is given by the edges in the dot file.
453  * The amount of data transfers is given by the attribute size on the
454  * edge. */
455 void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge)
456 {
457   SD_task_t file;
458   char name[80];
459   sprintf(name, "%s->%s", agnameof(agtail(edge)), agnameof(aghead(edge)));
460   double size = dot_parse_double(agget(edge, (char *) "size"));
461   DEBUG2("size : %e, get size : %s", size, agget(edge, (char *) "size"));
462
463   if (size > 0) {
464     file = xbt_dict_get_or_null(files, name);
465     if (file == NULL) {
466       file = SD_task_create_comm_e2e(name, NULL, size);
467 #ifdef HAVE_TRACING
468       TRACE_sd_dotloader (file, agget (edge, (char*)"category"));
469 #endif
470       xbt_dict_set(files, name, file, &dot_task_free);
471     } else {
472       if (SD_task_get_amount(file) != size) {
473         WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
474               name, SD_task_get_amount(file), size);
475       }
476     }
477     SD_task_dependency_add(NULL, NULL, current_job, file);
478     if (xbt_dynar_length(file->tasks_before) > 1) {
479       WARN1("File %s created at more than one location...", file->name);
480     }
481   } else {
482     file = xbt_dict_get_or_null(jobs, agnameof(aghead(edge)));
483     if (file != NULL) {
484       SD_task_dependency_add(NULL, NULL, current_job, file);
485     }
486   }
487 }