Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
correction to compile with warning and th indentation in my files
[simgrid.git] / src / simdag / sd_dotloader.c
1 /* Copyright (c) 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "private.h"
8 #include "simdag/simdag.h"
9 #include "xbt/misc.h"
10 #include "xbt/log.h"
11
12 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_dotparse, sd,"Parsing DOT files");
13
14 #undef CLEANUP
15 #include <graphviz/cgraph.h>
16
17 static double parse_double(const char *string) {
18     if (string == NULL) return -10;
19     int ret = 0;
20     double value;
21
22     ret = sscanf(string, "%lg", &value);
23     if (ret != 1)
24         WARN1("%s is not a double", string);
25     return value;
26 }
27 static int parse_int(const char *string) {
28     if (string == NULL) return -10;
29     int ret = 0;
30     int value;
31
32     ret = sscanf(string, "%d", &value);
33     if (ret != 1)
34         WARN1("%s is not an integer", string);
35     return value;
36 }
37
38 static xbt_dynar_t result;
39 static xbt_dict_t jobs;
40 static xbt_dict_t files;
41 static SD_task_t root_task,end_task;
42 static Agraph_t * dag_dot;
43
44 static void dump_res() {
45     unsigned int cursor;
46     SD_task_t task;
47     xbt_dynar_foreach(result,cursor,task) {
48         INFO1("Task %d",cursor);
49         SD_task_dump(task);
50     }
51 }
52
53 static void dot_task_free(void*task){
54     SD_task_t t=task;
55     SD_task_destroy(t);
56 }
57 void Start_dot_job(Agnode_t *dag_node) ;
58 void input_edge(SD_task_t current_job, Agedge_t *edge) ;
59 void output_edge(SD_task_t current_job, Agedge_t *edge) ;
60
61 /** @brief loads a DAX file describing a DAG
62  * 
63  * See https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator
64  * for more details.
65  */
66 xbt_dynar_t SD_dotload(const char*filename){
67     FILE* in_file = fopen(filename,"r");
68     xbt_assert1(in_file, "Unable to open \"%s\"\n", filename);
69     SD_dotload_FILE(in_file);
70     fclose(in_file);
71     return result;
72 }
73
74 xbt_dynar_t SD_dotload_FILE(FILE* in_file){
75     xbt_assert0(in_file, "Unable to use a null file descriptor\n");
76     dag_dot = agread(in_file,NIL(Agdisc_t*));
77
78     result = xbt_dynar_new(sizeof(SD_task_t),dot_task_free);
79     files=xbt_dict_new();
80     jobs=xbt_dict_new();
81     root_task = SD_task_create_comp_seq("root",NULL,0);
82     /* by design the root task is always SCHEDULABLE */
83     __SD_task_set_state(root_task, SD_SCHEDULABLE);
84
85     xbt_dynar_push(result,&root_task);
86     end_task = SD_task_create_comp_seq("end",NULL,0);
87
88     Agnode_t *dag_node   = NULL;
89     for (dag_node = agfstnode(dag_dot); dag_node; dag_node = agnxtnode(dag_dot,dag_node)){      
90         Start_dot_job(dag_node);        
91     }
92     agclose(dag_dot);
93     xbt_dict_free(&jobs);
94
95     /* And now, post-process the files.
96      * We want a file task per pair of computation tasks exchanging the file. Duplicate on need
97      * Files not produced in the system are said to be produced by root task (top of DAG).
98      * Files not consumed in the system are said to be consumed by end task (bottom of DAG).
99      */
100     xbt_dict_cursor_t cursor;
101     SD_task_t file;
102     char *name;
103     xbt_dict_foreach(files,cursor,name,file) {
104         unsigned int cpt1,cpt2;
105         SD_task_t newfile = NULL;
106         SD_dependency_t depbefore,depafter;
107         if (xbt_dynar_length(file->tasks_before) == 0) {
108             xbt_dynar_foreach(file->tasks_after,cpt2,depafter) {
109                 SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
110                 SD_task_dependency_add(NULL,NULL,root_task,newfile);
111                 SD_task_dependency_add(NULL,NULL,newfile,depafter->dst);
112                 xbt_dynar_push(result,&newfile);
113             }
114         } else if (xbt_dynar_length(file->tasks_after) == 0) {
115             xbt_dynar_foreach(file->tasks_before,cpt2,depbefore) {
116                 SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
117                 SD_task_dependency_add(NULL,NULL,depbefore->src,newfile);
118                 SD_task_dependency_add(NULL,NULL,newfile,end_task);
119                 xbt_dynar_push(result,&newfile);
120             }
121         } else {
122             xbt_dynar_foreach(file->tasks_before,cpt1,depbefore) {
123                 xbt_dynar_foreach(file->tasks_after,cpt2,depafter) {
124                     if (depbefore->src == depafter->dst) {
125                         WARN2("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.",
126                               file->name,depbefore->src->name);
127                     }
128                     newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
129                     SD_task_dependency_add(NULL,NULL,depbefore->src,newfile);
130                     SD_task_dependency_add(NULL,NULL,newfile,depafter->dst);
131                     xbt_dynar_push(result,&newfile);
132                 }
133             }
134         }
135     }
136
137     /* Push end task last */
138     xbt_dynar_push(result,&end_task);
139
140     /* Free previous copy of the files */
141     xbt_dict_free(&files);
142
143     //INFO2("result : %lld, dot : %lld",result,*dot);
144     return result;
145 }
146
147 void Start_dot_job(Agnode_t *dag_node) {
148     char *name = agnameof(dag_node);
149     double runtime = parse_double(agget(dag_node,(char*)"size"));
150     long performer = (long)parse_int((char *) agget(dag_node,(char*)"performer"));
151     INFO3("See <job id=%s runtime=%s %.0f>",name,agget(dag_node,(char*)"size"),runtime);
152     SD_task_t current_job = SD_task_create_comp_seq(name,(void*)performer,runtime);
153     xbt_dict_set(jobs,name,current_job,NULL);
154     xbt_dynar_push(result,&current_job);
155     Agedge_t    *e;
156     int count = 0;
157     for (e = agfstin(dag_dot,dag_node); e; e = agnxtin(dag_dot,e)) {
158         input_edge(current_job,e);
159         count++;
160     }
161     if (count==0){
162         SD_task_t file;
163         char *name= (char*)"root->many";
164         double size = 0;
165
166         file = xbt_dict_get_or_null(files,name);
167         if (file==NULL) {
168             file = SD_task_create_comm_e2e(name,NULL,size);
169             xbt_dict_set(files,name,file,&dot_task_free);
170         } else {
171             if (SD_task_get_amount(file)!=size) {
172                 WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
173                       name,SD_task_get_amount(file),size);
174             }
175         }
176         SD_task_dependency_add(NULL,NULL,file,current_job);
177     }
178     count = 0;
179     for (e = agfstout(dag_dot,dag_node); e; e = agnxtout(dag_dot,e)) {
180         output_edge(current_job,e);
181         count++;
182     }
183     if (count==0){
184         SD_task_t file;
185         char *name = (char*)"many->end";
186         double size = 0;
187
188         //  INFO2("See <uses file=%s %s>",A_dot__uses_file,(is_input?"in":"out"));
189         file = xbt_dict_get_or_null(files,name);
190         if (file==NULL) {
191             file = SD_task_create_comm_e2e(name,NULL,size);
192             xbt_dict_set(files,name,file,&dot_task_free);
193         } else {
194             if (SD_task_get_amount(file)!=size) {
195                 WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
196                       name,SD_task_get_amount(file),size);
197             }
198         }
199         SD_task_dependency_add(NULL,NULL,current_job,file);
200         if (xbt_dynar_length(file->tasks_before)>1) {
201             WARN1("File %s created at more than one location...",file->name);
202         }
203     }
204 }
205
206 void input_edge(SD_task_t current_job, Agedge_t *edge) {
207     SD_task_t file;
208
209     char name[80];
210     sprintf(name ,"%s->%s",agnameof(agtail(edge)) ,agnameof(aghead(edge)));
211     double size = parse_double(agget(edge,(char*)"size"));
212     //int sender = parse_int(agget(edge,(char*)"sender"));
213     //int reciever = parse_int(agget(edge,(char*)"reciever"));
214
215     file = xbt_dict_get_or_null(files,name);
216     if (file==NULL) {
217         file = SD_task_create_comm_e2e(name,NULL,size);
218         xbt_dict_set(files,name,file,&dot_task_free);
219     } else {
220         if (SD_task_get_amount(file)!=size) {
221             WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
222                   name,SD_task_get_amount(file),size);
223         }
224     }
225     SD_task_dependency_add(NULL,NULL,file,current_job);
226 }
227
228 void output_edge(SD_task_t current_job, Agedge_t *edge) {
229     SD_task_t file;
230     char name[80];
231     sprintf(name ,"%s->%s",agnameof(agtail(edge)) ,agnameof(aghead(edge)));
232     double size = parse_double(agget(edge,(char*)"size"));
233     //int sender = parse_int(agget(edge,(char*)"sender"));
234     //int reciever = parse_int(agget(edge,(char*)"reciever"));
235
236     //INFO2("See <uses file=%s %s>",A_dot__uses_file,(is_input?"in":"out"));
237     file = xbt_dict_get_or_null(files,name);
238     if (file==NULL) {
239         file = SD_task_create_comm_e2e(name,NULL,size);
240         xbt_dict_set(files,name,file,&dot_task_free);
241     } else {
242         if (SD_task_get_amount(file)!=size) {
243             WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
244                   name,SD_task_get_amount(file),size);
245         }
246     }
247     SD_task_dependency_add(NULL,NULL,current_job,file);
248     if (xbt_dynar_length(file->tasks_before)>1) {
249         WARN1("File %s created at more than one location...",file->name);
250     }
251 }
252