From: quintin Date: Thu, 1 Jul 2010 13:28:33 +0000 (+0000) Subject: Add a dot loader to have a connection with the random dag generator ggen. X-Git-Tag: v3_5~861 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/00b8b9a6c24efa391b997f26d5ff6d2dfd0b0dd1 Add a dot loader to have a connection with the random dag generator ggen. I will add a small documentation to explain how to use this loader. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@7956 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- diff --git a/buildtools/Cmake/CompleteInFiles.cmake b/buildtools/Cmake/CompleteInFiles.cmake index eb46988be7..479b997c3f 100644 --- a/buildtools/Cmake/CompleteInFiles.cmake +++ b/buildtools/Cmake/CompleteInFiles.cmake @@ -2,6 +2,9 @@ include(CheckFunctionExists) include(CheckIncludeFile) include(CheckIncludeFiles) include(CheckLibraryExists) +if(APPLE) +set( CMAKE_REQUIRED_INCLUDES "${CMAKE_REQUIRED_INCLUDES} /opt/local/include") +endif(APPLE) # Checks for header libraries functions. @@ -35,7 +38,7 @@ CHECK_INCLUDE_FILE(stdlib.h HAVE_STDLIB_H) CHECK_INCLUDE_FILE(strings.h HAVE_STRINGS_H) CHECK_INCLUDE_FILE(string.h HAVE_STRING_H) CHECK_INCLUDE_FILE(ucontext.h HAVE_UCONTEXT_H) - +CHECK_INCLUDE_FILE(graphviz/cgraph.h HAVE_CGRAPH_H) CHECK_FUNCTION_EXISTS(gettimeofday HAVE_GETTIMEOFDAY) CHECK_FUNCTION_EXISTS(usleep HAVE_USLEEP) CHECK_FUNCTION_EXISTS(getdtablesize HAVE_GETDTABLESIZE) diff --git a/buildtools/Cmake/DefinePackages.cmake b/buildtools/Cmake/DefinePackages.cmake index 6756c21f97..067538c8b1 100755 --- a/buildtools/Cmake/DefinePackages.cmake +++ b/buildtools/Cmake/DefinePackages.cmake @@ -268,6 +268,13 @@ set(SIMDAG_SRC src/simdag/sd_workstation.c src/simdag/sd_daxloader.c ) +if(HAVE_CGRAPH_H) + set(SIMDAG_SRC + ${SIMDAG_SRC} + src/simdag/sd_dotloader.c + ) + +endif(HAVE_CGRAPH_H) set(GRAS_COMMON_SRC src/gras/gras.c @@ -792,4 +799,4 @@ set(source_to_pack src/bindings/rubyDag/rb_SD_workstation.h src/bindings/rubyDag/rb_simdag.c src/bindings/rubyDag/simdag.rb -) \ No newline at end of file +) diff --git a/buildtools/Cmake/MakeExeLib.cmake b/buildtools/Cmake/MakeExeLib.cmake index 515aa443a0..0b0c5264f8 100644 --- a/buildtools/Cmake/MakeExeLib.cmake +++ b/buildtools/Cmake/MakeExeLib.cmake @@ -58,6 +58,10 @@ if(HAVE_LUA) DEPENDS ${PROJECT_DIRECTORY}/examples/lua/simgrid.so) endif(HAVE_LUA) +if(HAVE_CGRAPH_H) + SET(SIMGRID_DEP "${SIMGRID_DEP} -lcgraph") +endif(HAVE_CGRAPH_H) + if(HAVE_GTNETS) SET(SIMGRID_DEP "${SIMGRID_DEP} -lgtnets") endif(HAVE_GTNETS) @@ -151,9 +155,10 @@ add_subdirectory(${PROJECT_DIRECTORY}/examples/amok/saturate) add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag) add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag/dax) +add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag/dot) add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag/metaxml) add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag/properties) add_subdirectory(${PROJECT_DIRECTORY}/examples/simdag/scheduling) if(enable_smpi) add_subdirectory(${PROJECT_DIRECTORY}/examples/smpi) -endif(enable_smpi) \ No newline at end of file +endif(enable_smpi) diff --git a/examples/simdag/dot/CMakeLists.txt b/examples/simdag/dot/CMakeLists.txt new file mode 100644 index 0000000000..c67aa480ca --- /dev/null +++ b/examples/simdag/dot/CMakeLists.txt @@ -0,0 +1,9 @@ +cmake_minimum_required(VERSION 2.6) + +set(EXECUTABLE_OUTPUT_PATH "${PROJECT_DIRECTORY}/examples/simdag/dot") +set(LIBRARY_OUTPUT_PATH "${PROJECT_DIRECTORY}/lib") + +add_executable(dot_test dot_test.c) #add_executable( ) + +### Add definitions for compile +target_link_libraries(dot_test simgrid m pthread -fprofile-arcs) #target_link_libraries( ) diff --git a/examples/simdag/dot/dag.dot b/examples/simdag/dot/dag.dot new file mode 100644 index 0000000000..d675d65429 --- /dev/null +++ b/examples/simdag/dot/dag.dot @@ -0,0 +1,21 @@ +digraph G { +0 [size="10000000129.452715"]; +1 [size="10000000131.133657"]; +2 [size="10000000121.12487"]; +3 [size="10000000230.608025"]; +4 [size="10000000004.994019"]; +5 [size="10000000046.016401"]; +6 [size="10000000091.598791"]; +7 [size="10000000040.679438"]; +8 [size="10000000250.490017"]; +9 [size="10000000079.267649"]; +0->1 [size="10001.389601075407"]; +1->2 [size="10004.164631264117"]; +2->3 [size="10001.781644976922"]; +3->4 [size="10005.191659731605"]; +4->5 [size="10029.262823275711"]; +5->6 [size="10035.687920631362"]; +6->7 [size="10004.920415194067"]; +7->8 [size="10000.234048984707"]; +8->9 [size="10014.203269230178"]; +} diff --git a/examples/simdag/dot/dot_test.c b/examples/simdag/dot/dot_test.c new file mode 100644 index 0000000000..2a24180455 --- /dev/null +++ b/examples/simdag/dot/dot_test.c @@ -0,0 +1,125 @@ +/* simple test trying to load a DAX file. */ + +/* Copyright (c) 2009, 2010. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include +#include "simdag/simdag.h" +#include "xbt/log.h" +#include "xbt/ex.h" +#include + +XBT_LOG_NEW_DEFAULT_CATEGORY(test, + "Logging specific to this SimDag example"); + +static int name_compare_hosts(const void *n1, const void *n2) +{ + char name1[80], name2[80]; + strcpy(name1, SD_workstation_get_name(*((SD_workstation_t *) n1))); + strcpy(name2, SD_workstation_get_name(*((SD_workstation_t *) n2))); + + return strcmp(name1, name2); +} + +int main(int argc, char **argv) { + xbt_dynar_t dot, changed; + unsigned int cursor; + SD_task_t task; + + /* initialisation of SD */ + SD_init(&argc, argv); + + /* Check our arguments */ + if (argc < 3) { + INFO1("Usage: %s platform_file dot_file [trace_file]", argv[0]); + INFO1("example: %s ../sd_platform.xml Montage_50.xml Montage_50.mytrace", argv[0]); + exit(1); + } + char *tracefilename; + if (argc == 3) { + char *last=strrchr(argv[2],'.'); + + tracefilename=bprintf("%.*s.trace",(int)(last==NULL?strlen(argv[2]):last-argv[2]),argv[2]); + } else { + tracefilename = xbt_strdup(argv[3]); + } + + /* creation of the environment */ + SD_create_environment(argv[1]); + + /* load the DAX file */ + dot=SD_dotload(argv[2]); + INFO1("dot : %lld",dot); + + /* Display all the tasks */ + INFO0("------------------- Display all tasks of the loaded DAG ---------------------------"); + xbt_dynar_foreach(dot,cursor,task) { + SD_task_dump(task); + } + + FILE *dotout = fopen("dot.dot","w"); + fprintf(dotout,"digraph A {\n"); + xbt_dynar_foreach(dot,cursor,task) { + SD_task_dotty(task,dotout); + } + fprintf(dotout,"}\n"); + fclose(dotout); + + /* Schedule them all on the first workstation */ + INFO0("------------------- Schedule tasks ---------------------------"); + const SD_workstation_t *ws_list = SD_workstation_get_list(); + int totalHosts = SD_workstation_get_number(); + qsort((void *) ws_list, totalHosts, sizeof(SD_workstation_t), + name_compare_hosts); + + int count = SD_workstation_get_number(); + xbt_dynar_foreach(dot,cursor,task) { + if (SD_task_get_kind(task) == SD_TASK_COMP_SEQ) { + if (!strcmp(SD_task_get_name(task),"end")) + SD_task_schedulel(task,1,ws_list[0]); + else + SD_task_schedulel(task,1,ws_list[cursor%count]); + } + } + + INFO0("------------------- Run the schedule ---------------------------"); + changed = SD_simulate(-1); + xbt_dynar_free_container(&changed); + INFO0("------------------- Produce the trace file---------------------------"); + INFO1("Producing the trace of the run into %s",tracefilename); + FILE*out = fopen(tracefilename,"w"); + xbt_assert1(out,"Cannot write to %s",tracefilename); + free(tracefilename); + + xbt_dynar_foreach(dot,cursor,task) { + int kind = SD_task_get_kind(task); + SD_workstation_t *wsl = SD_task_get_workstation_list(task); + switch (kind) { + case SD_TASK_COMP_SEQ: + fprintf(out,"[%f] %s compute %f # %s\n",SD_task_get_start_time(task), + SD_workstation_get_name(wsl[0]),SD_task_get_amount(task), + SD_task_get_name(task)); + break; + case SD_TASK_COMM_E2E: + fprintf(out,"[%f] %s send %s %f # %s\n",SD_task_get_start_time(task), + SD_workstation_get_name(wsl[0]),SD_workstation_get_name(wsl[1]), + SD_task_get_amount(task), SD_task_get_name(task)); + fprintf(out,"[%f] %s recv %s %f # %s\n",SD_task_get_finish_time(task), + SD_workstation_get_name(wsl[1]),SD_workstation_get_name(wsl[0]), + SD_task_get_amount(task), SD_task_get_name(task)); + break; + default: + xbt_die(bprintf("Task %s is of unknown kind %d",SD_task_get_name(task),SD_task_get_kind(task))); + } + SD_task_destroy(task); + } + fclose(out); + + /* exit */ + SD_exit(); + return 0; +} diff --git a/include/simdag/simdag.h b/include/simdag/simdag.h index 76a1d4c6b7..a95d27b0d3 100644 --- a/include/simdag/simdag.h +++ b/include/simdag/simdag.h @@ -7,6 +7,7 @@ #ifndef SIMDAG_SIMDAG_H #define SIMDAG_SIMDAG_H +#include #include "simdag/datatypes.h" #include "xbt/misc.h" #include "xbt/dynar.h" @@ -200,6 +201,8 @@ XBT_PUBLIC(xbt_dynar_t) SD_simulate(double how_long); XBT_PUBLIC(double) SD_get_clock(void); XBT_PUBLIC(void) SD_exit(void); XBT_PUBLIC(xbt_dynar_t) SD_daxload(const char*filename); +XBT_PUBLIC(xbt_dynar_t) SD_dotload(const char*filename); +XBT_PUBLIC(xbt_dynar_t) SD_dotload_FILE(FILE* in_file); /** @} */ diff --git a/src/msg/gos.c b/src/msg/gos.c index f59e937a9f..263ecbda29 100644 --- a/src/msg/gos.c +++ b/src/msg/gos.c @@ -474,7 +474,7 @@ MSG_error_t MSG_comm_wait(msg_comm_t comm, double timeout) { TRY { SIMIX_network_wait(comm,timeout); - if(!(comm->src_proc == SIMIX_process_self())) + if(!(comm->src_proc == SIMIX_process_self())) { m_task_t task; task = (m_task_t) SIMIX_communication_get_src_buf(comm); diff --git a/src/simdag/sd_dotloader.c b/src/simdag/sd_dotloader.c new file mode 100644 index 0000000000..d65f46f832 --- /dev/null +++ b/src/simdag/sd_dotloader.c @@ -0,0 +1,253 @@ +/* Copyright (c) 2009, 2010. The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "private.h" +#include "simdag/simdag.h" +#include "xbt/misc.h" +#include "xbt/log.h" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_dotparse, sd,"Parsing DOT files"); + +#undef CLEANUP +#include + +static double parse_double(const char *string) { + if (string == NULL) return -10; + int ret = 0; + double value; + + ret = sscanf(string, "%lg", &value); + if (ret != 1) + WARN1("%s is not a double", string); + return value; +} +static int parse_int(const char *string) { + if (string == NULL) return -10; + int ret = 0; + int value; + + ret = sscanf(string, "%d", &value); + if (ret != 1) + WARN1("%s is not an integer", string); + return value; +} + +static xbt_dynar_t result; +static xbt_dict_t jobs; +static xbt_dict_t files; +static SD_task_t root_task,end_task; +static Agraph_t * dag_dot; + +static void dump_res() { + unsigned int cursor; + SD_task_t task; + xbt_dynar_foreach(result,cursor,task) { + INFO1("Task %d",cursor); + SD_task_dump(task); + } +} + +static void dot_task_free(void*task){ + SD_task_t t=task; + SD_task_destroy(t); +} +void Start_dot_job(Agnode_t *dag_node) ; +void input_edge(SD_task_t current_job, Agedge_t *edge) ; +void output_edge(SD_task_t current_job, Agedge_t *edge) ; + +/** @brief loads a DAX file describing a DAG + * + * See https://confluence.pegasus.isi.edu/display/pegasus/WorkflowGenerator + * for more details. + */ +xbt_dynar_t SD_dotload(const char*filename){ + FILE* in_file = fopen(filename,"r"); + xbt_assert1(in_file, "Unable to open \"%s\"\n", filename); + SD_dotload_FILE(in_file); + fclose(in_file); + return result; +} + +xbt_dynar_t SD_dotload_FILE(FILE* in_file){ + xbt_assert0(in_file, "Unable to use a null file descriptor\n"); + dag_dot = agread(in_file,NIL(Agdisc_t*)); + + result = xbt_dynar_new(sizeof(SD_task_t),dot_task_free); + files=xbt_dict_new(); + jobs=xbt_dict_new(); + root_task = SD_task_create_comp_seq("root",NULL,0); + /* by design the root task is always SCHEDULABLE */ + __SD_task_set_state(root_task, SD_SCHEDULABLE); + + xbt_dynar_push(result,&root_task); + end_task = SD_task_create_comp_seq("end",NULL,0); + + Agnode_t *dag_node = NULL; + SD_task_t task_node = NULL; + for (dag_node = agfstnode(dag_dot); dag_node; dag_node = agnxtnode(dag_dot,dag_node)){ + Start_dot_job(dag_node); + } + agclose(dag_dot); + xbt_dict_free(&jobs); + + /* And now, post-process the files. + * We want a file task per pair of computation tasks exchanging the file. Duplicate on need + * Files not produced in the system are said to be produced by root task (top of DAG). + * Files not consumed in the system are said to be consumed by end task (bottom of DAG). + */ + xbt_dict_cursor_t cursor; + SD_task_t file; + char *name; + xbt_dict_foreach(files,cursor,name,file) { + unsigned int cpt1,cpt2; + SD_task_t newfile = NULL; + SD_dependency_t depbefore,depafter; + if (xbt_dynar_length(file->tasks_before) == 0) { + xbt_dynar_foreach(file->tasks_after,cpt2,depafter) { + SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount); + SD_task_dependency_add(NULL,NULL,root_task,newfile); + SD_task_dependency_add(NULL,NULL,newfile,depafter->dst); + xbt_dynar_push(result,&newfile); + } + } else if (xbt_dynar_length(file->tasks_after) == 0) { + xbt_dynar_foreach(file->tasks_before,cpt2,depbefore) { + SD_task_t newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount); + SD_task_dependency_add(NULL,NULL,depbefore->src,newfile); + SD_task_dependency_add(NULL,NULL,newfile,end_task); + xbt_dynar_push(result,&newfile); + } + } else { + xbt_dynar_foreach(file->tasks_before,cpt1,depbefore) { + xbt_dynar_foreach(file->tasks_after,cpt2,depafter) { + if (depbefore->src == depafter->dst) { + WARN2("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.", + file->name,depbefore->src->name); + } + newfile = SD_task_create_comm_e2e(file->name,NULL,file->amount); + SD_task_dependency_add(NULL,NULL,depbefore->src,newfile); + SD_task_dependency_add(NULL,NULL,newfile,depafter->dst); + xbt_dynar_push(result,&newfile); + } + } + } + } + + /* Push end task last */ + xbt_dynar_push(result,&end_task); + + /* Free previous copy of the files */ + xbt_dict_free(&files); + + //INFO2("result : %lld, dot : %lld",result,*dot); + return result; +} + +void Start_dot_job(Agnode_t *dag_node) { + char *name = agnameof(dag_node); + double runtime = parse_double(agget(dag_node,"size")); + int performer = parse_int(agget(dag_node,"performer")); + INFO3("See ",name,agget(dag_node,"size"),runtime); + SD_task_t current_job = SD_task_create_comp_seq(name,(void*)performer,runtime); + xbt_dict_set(jobs,name,current_job,NULL); + xbt_dynar_push(result,¤t_job); + Agedge_t *e; + int count = 0; + for (e = agfstin(dag_dot,dag_node); e; e = agnxtin(dag_dot,e)) { + input_edge(current_job,e); + count++; + } + if (count==0){ + SD_task_t file; + char *name="root->many"; + double size = 0; + + file = xbt_dict_get_or_null(files,name); + if (file==NULL) { + file = SD_task_create_comm_e2e(name,NULL,size); + xbt_dict_set(files,name,file,&dot_task_free); + } else { + if (SD_task_get_amount(file)!=size) { + WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + name,SD_task_get_amount(file),size); + } + } + SD_task_dependency_add(NULL,NULL,file,current_job); + } + count = 0; + for (e = agfstout(dag_dot,dag_node); e; e = agnxtout(dag_dot,e)) { + output_edge(current_job,e); + count++; + } + if (count==0){ + SD_task_t file; + char *name = "many->end"; + double size = 0; + + // INFO2("See ",A_dot__uses_file,(is_input?"in":"out")); + file = xbt_dict_get_or_null(files,name); + if (file==NULL) { + file = SD_task_create_comm_e2e(name,NULL,size); + xbt_dict_set(files,name,file,&dot_task_free); + } else { + if (SD_task_get_amount(file)!=size) { + WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + name,SD_task_get_amount(file),size); + } + } + SD_task_dependency_add(NULL,NULL,current_job,file); + if (xbt_dynar_length(file->tasks_before)>1) { + WARN1("File %s created at more than one location...",file->name); + } + } +} + +void input_edge(SD_task_t current_job, Agedge_t *edge) { + SD_task_t file; + + char name[80]; + sprintf(name ,"%s->%s",agnameof(agtail(edge)) ,agnameof(aghead(edge))); + double size = parse_double(agget(edge,"size")); + int sender = parse_int(agget(edge,"sender")); + int reciever = parse_int(agget(edge,"reciever")); + + file = xbt_dict_get_or_null(files,name); + if (file==NULL) { + file = SD_task_create_comm_e2e(name,NULL,size); + xbt_dict_set(files,name,file,&dot_task_free); + } else { + if (SD_task_get_amount(file)!=size) { + WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + name,SD_task_get_amount(file),size); + } + } + SD_task_dependency_add(NULL,NULL,file,current_job); +} + +void output_edge(SD_task_t current_job, Agedge_t *edge) { + SD_task_t file; + char name[80]; + sprintf(name ,"%s->%s",agnameof(agtail(edge)) ,agnameof(aghead(edge))); + double size = parse_double(agget(edge,"size")); + int sender = parse_int(agget(edge,"sender")); + int reciever = parse_int(agget(edge,"reciever")); + +// INFO2("See ",A_dot__uses_file,(is_input?"in":"out")); + file = xbt_dict_get_or_null(files,name); + if (file==NULL) { + file = SD_task_create_comm_e2e(name,NULL,size); + xbt_dict_set(files,name,file,&dot_task_free); + } else { + if (SD_task_get_amount(file)!=size) { + WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + name,SD_task_get_amount(file),size); + } + } + SD_task_dependency_add(NULL,NULL,current_job,file); + if (xbt_dynar_length(file->tasks_before)>1) { + WARN1("File %s created at more than one location...",file->name); + } +} +