Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
DAX loader seem to be working
authormquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 6 Oct 2009 16:59:11 +0000 (16:59 +0000)
committermquinson <mquinson@48e7efb5-ca39-0410-a469-dd3cf9ba447f>
Tue, 6 Oct 2009 16:59:11 +0000 (16:59 +0000)
git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@6729 48e7efb5-ca39-0410-a469-dd3cf9ba447f

src/simdag/sd_daxloader.c

index f37b375..5deef85 100644 (file)
@@ -44,26 +44,15 @@ static YY_BUFFER_STATE input_buffer;
 static xbt_dynar_t result;
 static xbt_dict_t files;
 static SD_task_t current_job;
+static SD_task_t root_task,end_task;
 
-typedef struct {
-  xbt_dynar_t inputs;
-  xbt_dynar_t outputs;
-} dax_comp_task;
-
-typedef struct {
-  double size;
-} dax_comm_task;
-
-typedef struct {
-  short int is_comm_task;
-  union {
-    dax_comp_task comp;
-    dax_comm_task comm;
-  };
-} *dax_task_t;
-
-static void dax_task_rmdata(SD_task_t t) {
-
+static void dump_res() {
+  unsigned int cursor;
+  SD_task_t task;
+  xbt_dynar_foreach(result,cursor,task) {
+    INFO1("Task %d",cursor);
+    SD_task_dump(task);
+  }
 }
 
 static void dax_task_free(void*task){
@@ -72,7 +61,6 @@ static void dax_task_free(void*task){
 }
 
 xbt_dynar_t SD_daxload(const char*filename) {
-  SD_task_t task;
   FILE* in_file = fopen(filename,"r");
   xbt_assert1(in_file, "Unable to open \"%s\"\n", filename);
   input_buffer = dax__create_buffer(in_file, 10);
@@ -81,13 +69,57 @@ xbt_dynar_t SD_daxload(const char*filename) {
 
   result = xbt_dynar_new(sizeof(SD_task_t),dax_task_free);
   files=xbt_dict_new();
-  task = SD_task_create("top",NULL,0);
-  xbt_dynar_push(result,&task);
+  root_task = SD_task_create("root",NULL,0);
+  xbt_dynar_push(result,&root_task);
+  end_task = SD_task_create("end",NULL,0);
 
   xbt_assert2(!dax_lex(),"Parse error in %s: %s",filename,dax__parse_err_msg());
   dax__delete_buffer(input_buffer);
-  xbt_dict_free(&files);
   fclose(in_file);
+
+  /* And now, post-process the files.
+   * We want a file task per pair of computation tasks exchanging the file. Dupplicate on need
+   * Files not produced in the system are said to be produced by root task (top of DAG).
+   * Files not consumed in the system are said to be consumed by end task (bottom of DAG).
+   */
+  xbt_dict_cursor_t cursor;
+  SD_task_t file;
+  char *name;
+  xbt_dict_foreach(files,cursor,name,file) {
+    unsigned int cpt1,cpt2;
+    SD_dependency_t depbefore,depafter;
+    if (xbt_dynar_length(file->tasks_before) == 0) {
+      xbt_dynar_foreach(file->tasks_after,cpt2,depafter) {
+        SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
+        SD_task_dependency_add(NULL,NULL,root_task,newfile);
+        SD_task_dependency_add(NULL,NULL,newfile,depafter->dst);
+        xbt_dynar_push(result,&newfile);
+      }
+    } else if (xbt_dynar_length(file->tasks_after) == 0) {
+      xbt_dynar_foreach(file->tasks_before,cpt2,depbefore) {
+        SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
+        SD_task_dependency_add(NULL,NULL,depbefore->src,newfile);
+        SD_task_dependency_add(NULL,NULL,newfile,end_task);
+        xbt_dynar_push(result,&newfile);
+      }
+    } else {
+      xbt_dynar_foreach(file->tasks_before,cpt1,depbefore) {
+        xbt_dynar_foreach(file->tasks_after,cpt2,depafter) {
+          SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount);
+          SD_task_dependency_add(NULL,NULL,depbefore->src,newfile);
+          SD_task_dependency_add(NULL,NULL,newfile,depafter->dst);
+          xbt_dynar_push(result,&newfile);
+        }
+      }
+    }
+  }
+
+  /* Push end task last */
+  xbt_dynar_push(result,&end_task);
+
+  /* Free previous copy of the files */
+  xbt_dict_free(&files);
+
   return result;
 }
 
@@ -99,9 +131,10 @@ void STag_dax__adag(void) {
 void STag_dax__job(void) {
   double runtime = dax_parse_double(A_dax__job_runtime);
   runtime*=4200000000.; /* Assume that timings were done on a 4.2GFlops machine. I mean, why not? */
-  INFO3("See <job id=%s runtime=%s %.0f>",A_dax__job_id,A_dax__job_runtime,runtime);
-  current_job = SD_task_create(A_dax__job_id,NULL,runtime);
-  xbt_dynar_push(result,current_job);
+//  INFO3("See <job id=%s runtime=%s %.0f>",A_dax__job_id,A_dax__job_runtime,runtime);
+  current_job = SD_task_create_comp_seq(A_dax__job_id,NULL,runtime);
+  xbt_dynar_push(result,&current_job);
+
 }
 void STag_dax__child(void) {
 //  INFO0("See <child>");
@@ -114,11 +147,11 @@ void STag_dax__uses(void) {
   double size = dax_parse_double(A_dax__uses_size);
   int is_input = (A_dax__uses_link == A_dax__uses_link_input);
 
-  INFO2("See <uses file=%s %s>",A_dax__uses_file,(is_input?"in":"out"));
+//  INFO2("See <uses file=%s %s>",A_dax__uses_file,(is_input?"in":"out"));
   file = xbt_dict_get_or_null(files,A_dax__uses_file);
   if (file==NULL) {
-    file = SD_task_create(A_dax__uses_file,NULL,size);
-    xbt_dict_set(files,A_dax__uses_file,file,NULL);
+    file = SD_task_create_comm_e2e(A_dax__uses_file,NULL,size);
+    xbt_dict_set(files,A_dax__uses_file,file,&dax_task_free);
   } else {
     if (SD_task_get_amount(file)!=size) {
       WARN3("Ignoring file %s size redefinition from %.0f to %.0f",
@@ -128,7 +161,10 @@ void STag_dax__uses(void) {
   if (is_input) {
     SD_task_dependency_add(NULL,NULL,file,current_job);
   } else {
-    SD_task_dependency_add(NULL,NULL,file,current_job);
+    SD_task_dependency_add(NULL,NULL,current_job,file);
+    if (xbt_dynar_length(file->tasks_before)>1) {
+      WARN1("File %s created at more than one location...",file->name);
+    }
   }
 }
 void ETag_dax__adag(void) {