X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/5ce62344fd195b0cf910efcc89deb03362cfadb5..a28a695108f7e9ae7dafc83a2d39d17778f5fc98:/src/simdag/sd_dotloader.c diff --git a/src/simdag/sd_dotloader.c b/src/simdag/sd_dotloader.c index 1974cb86e0..25795b68eb 100644 --- a/src/simdag/sd_dotloader.c +++ b/src/simdag/sd_dotloader.c @@ -8,64 +8,74 @@ #include "simdag/simdag.h" #include "xbt/misc.h" #include "xbt/log.h" +#include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(sd_dotparse, sd, "Parsing DOT files"); #undef CLEANUP #ifdef HAVE_CGRAPH_H - #include +#include #elif HAVE_AGRAPH_H - #include +#include #endif void dot_add_task(Agnode_t * dag_node); void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge); void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge); -xbt_dynar_t SD_dotload_FILE(FILE * in_file); +xbt_dynar_t SD_dotload_generic(const char * filename); static double dot_parse_double(const char *string) { - if (string == NULL) - return -1; - int ret = 0; - double value = -1; + if (string == NULL) + return -1; + double value = -1; + char *err; - ret = sscanf(string, "%lg", &value); - if (ret != 1) - WARN1("%s is not a double", string); - return value; + //ret = sscanf(string, "%lg", &value); + errno = 0; + value = strtod(string,&err); + if(errno) + { + XBT_WARN("Failed to convert string to double: %s\n",strerror(errno)); + return -1; + } + return value; } + static int dot_parse_int(const char *string) { if (string == NULL) return -10; int ret = 0; - int value; + int value = -1; ret = sscanf(string, "%d", &value); if (ret != 1) - WARN1("%s is not an integer", string); + XBT_WARN("%s is not an integer", string); return value; } static xbt_dynar_t result; static xbt_dict_t jobs; static xbt_dict_t files; +static xbt_dict_t computers; static SD_task_t root_task, end_task; static Agraph_t *dag_dot; +static bool schedule = true; static void dump_res() { unsigned int cursor; SD_task_t task; xbt_dynar_foreach(result, cursor, task) { - INFO1("Task %d", cursor); + XBT_INFO("Task %d", cursor); SD_task_dump(task); } } + static void dot_task_free(void *task) { SD_task_t t = task; @@ -93,21 +103,65 @@ static void TRACE_sd_dotloader (SD_task_t task, const char *category) */ xbt_dynar_t SD_dotload(const char *filename) { - FILE *in_file = fopen(filename, "r"); - xbt_assert1(in_file, "Unable to open \"%s\"\n", filename); - SD_dotload_FILE(in_file); - fclose(in_file); + SD_dotload_generic(filename); + xbt_dynar_t computer = NULL; + xbt_dict_cursor_t dict_cursor; + char *computer_name; + xbt_dict_foreach(computers,dict_cursor,computer_name,computer){ + xbt_dynar_free(&computer); + } + xbt_dict_free(&computers); return result; } -xbt_dynar_t SD_dotload_FILE(FILE * in_file) +xbt_dynar_t SD_dotload_with_sched(const char *filename){ + SD_dotload_generic(filename); + + if(schedule == true){ + xbt_dynar_t computer = NULL; + xbt_dict_cursor_t dict_cursor; + char *computer_name; + const SD_workstation_t *workstations = SD_workstation_get_list (); + xbt_dict_foreach(computers,dict_cursor,computer_name,computer){ + int count_computer = dot_parse_int(computer_name); + unsigned int count=0; + SD_task_t task; + SD_task_t task_previous = NULL; + xbt_dynar_foreach(computer,count,task){ + // add dependency between the previous and the task to avoid + // parallel execution + if(task != NULL ){ + if(task_previous != NULL && + !SD_task_dependency_exists(task_previous, task)) + SD_task_dependency_add(NULL, NULL, task_previous, task); + SD_task_schedulel(task, 1, workstations[count_computer]); + task_previous = task; + } + } + xbt_dynar_free(&computer); + } + xbt_dict_free(&computers); + if(acyclic_graph_detail(result)) + return result; + else + XBT_WARN("There is at least one cycle in the provided task graph"); + }else{ + XBT_WARN("The scheduling is ignored"); + } + return NULL; +} + +xbt_dynar_t SD_dotload_generic(const char * filename) { - xbt_assert0(in_file, "Unable to use a null file descriptor\n"); + xbt_assert(filename, "Unable to use a null file descriptor\n"); +//dag_dot = agopen((char*)filename,Agstrictdirected,0); + FILE *in_file = fopen(filename, "r"); dag_dot = agread(in_file, NIL(Agdisc_t *)); result = xbt_dynar_new(sizeof(SD_task_t), dot_task_free); files = xbt_dict_new(); jobs = xbt_dict_new(); + computers = xbt_dict_new(); root_task = SD_task_create_comp_seq("root", NULL, 0); /* by design the root task is always SCHEDULABLE */ __SD_task_set_state(root_task, SD_SCHEDULABLE); @@ -119,11 +173,12 @@ xbt_dynar_t SD_dotload_FILE(FILE * in_file) Agnode_t *dag_node = NULL; for (dag_node = agfstnode(dag_dot); dag_node; - #ifdef HAVE_CGRAPH_H - dag_node = agnxtnode(dag_dot, dag_node)) { - #elif HAVE_AGRAPH_H - dag_node = agnxtnode(dag_node)) { - #endif +#ifdef HAVE_CGRAPH_H + dag_node = agnxtnode(dag_dot, dag_node) +#elif HAVE_AGRAPH_H + dag_node = agnxtnode(dag_node) +#endif + ) { dot_add_task(dag_node); } @@ -162,7 +217,7 @@ xbt_dynar_t SD_dotload_FILE(FILE * in_file) xbt_dynar_foreach(file->tasks_before, cpt1, depbefore) { xbt_dynar_foreach(file->tasks_after, cpt2, depafter) { if (depbefore->src == depafter->dst) { - WARN2 + XBT_WARN ("File %s is produced and consumed by task %s. This loop dependency will prevent the execution of the task.", file->name, depbefore->src->name); } @@ -181,8 +236,12 @@ xbt_dynar_t SD_dotload_FILE(FILE * in_file) /* Free previous copy of the files */ xbt_dict_free(&files); - - return result; + fclose(in_file); + if(acyclic_graph_detail(result)) + return result; + acyclic_graph_detail(result); + free(dag_dot); + return NULL; } /* dot_add_task create a sd_task and all transfers required for this @@ -193,14 +252,13 @@ void dot_add_task(Agnode_t * dag_node) char *name = agnameof(dag_node); SD_task_t current_job; double runtime = dot_parse_double(agget(dag_node, (char *) "size")); - long performer = - (long) dot_parse_int((char *) agget(dag_node, (char *) "performer")); - INFO3("See ", name, + + XBT_DEBUG("See ", name, agget(dag_node, (char *) "size"), runtime); current_job = xbt_dict_get_or_null(jobs, name); if (current_job == NULL) { current_job = - SD_task_create_comp_seq(name, (void *) performer, runtime); + SD_task_create_comp_seq(name, NULL , runtime); #ifdef HAVE_TRACING TRACE_sd_dotloader (current_job, agget (dag_node, (char*)"category")); #endif @@ -210,11 +268,12 @@ void dot_add_task(Agnode_t * dag_node) Agedge_t *e; int count = 0; - #ifdef HAVE_CGRAPH_H - for (e = agfstin(dag_dot, dag_node); e; e = agnxtin(dag_dot, e)) { - #elif HAVE_AGRAPH_H - for (e = agfstin(dag_node); e; e = agnxtin(e)) { - #endif +#ifdef HAVE_CGRAPH_H + for (e = agfstin(dag_dot, dag_node); e; e = agnxtin(dag_dot, e)) +#elif HAVE_AGRAPH_H + for (e = agfstin(dag_node); e; e = agnxtin(e)) +#endif + { dot_add_input_dependencies(current_job, e); count++; } @@ -222,11 +281,12 @@ void dot_add_task(Agnode_t * dag_node) SD_task_dependency_add(NULL, NULL, root_task, current_job); } count = 0; - #ifdef HAVE_CGRAPH_H - for (e = agfstout(dag_dot, dag_node); e; e = agnxtout(dag_dot, e)) { - #elif HAVE_AGRAPH_H - for (e = agfstout(dag_node); e; e = agnxtout(e)) { - #endif +#ifdef HAVE_CGRAPH_H + for (e = agfstout(dag_dot, dag_node); e; e = agnxtout(dag_dot, e)) +#elif HAVE_AGRAPH_H + for (e = agfstout(dag_node); e; e = agnxtout(e)) +#endif + { dot_add_output_dependencies(current_job, e); count++; @@ -234,6 +294,57 @@ void dot_add_task(Agnode_t * dag_node) if (count == 0 && current_job != end_task) { SD_task_dependency_add(NULL, NULL, current_job, end_task); } + + if(schedule || XBT_LOG_ISENABLED(sd_dotparse, xbt_log_priority_verbose)){ + /* try to take the information to schedule the task only if all is + * right*/ + // performer is the computer which execute the task + unsigned long performer = -1; + char * char_performer = agget(dag_node, (char *) "performer"); + if (char_performer != NULL) + performer = (long) dot_parse_int(char_performer); + + // order is giving the task order on one computer + unsigned long order = -1; + char * char_order = agget(dag_node, (char *) "order"); + if (char_order != NULL) + order = (long) dot_parse_int(char_order); + xbt_dynar_t computer = NULL; + //XBT_INFO("performer = %d, order=%d",performer,order); + if(performer != -1 && order != -1){ + //necessary parameters are given + computer = xbt_dict_get_or_null(computers, char_performer); + if(computer == NULL){ + computer = xbt_dynar_new(sizeof(SD_task_t), NULL); + xbt_dict_set(computers, char_performer, computer, NULL); + } + if(performer < xbt_lib_length(host_lib)){ + // the wanted computer is available + SD_task_t *task_test = NULL; + if(order < computer->used) + task_test = xbt_dynar_get_ptr(computer,order); + if(task_test != NULL && *task_test != NULL && *task_test != current_job){ + /*the user gives the same order to several tasks*/ + schedule = false; + XBT_VERB("The task %s starts on the computer %s at the position : %s like the task %s", + (*task_test)->name, char_performer, char_order, current_job->name); + }else{ + //the parameter seems to be ok + xbt_dynar_set_as(computer, order, SD_task_t, current_job); + } + }else{ + /*the platform has not enough processors to schedule the DAG like + *the user wants*/ + schedule = false; + XBT_VERB("The schedule is ignored, there are not enough computers"); + } + } + else { + //one of necessary parameters are not given + schedule = false; + XBT_VERB("The schedule is ignored, the task %s is not correctly schedule", current_job->name); + } + } } /* dot_add_output_dependencies create the dependencies between a task @@ -243,13 +354,13 @@ void dot_add_task(Agnode_t * dag_node) void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge) { SD_task_t file; - - char name[80]; - sprintf(name, "%s->%s", agnameof(agtail(edge)), agnameof(aghead(edge))); + char *name_tail=agnameof(agtail(edge)); + char *name_head=agnameof(aghead(edge)); + char *name = malloc((strlen(name_head)+strlen(name_tail)+3)*sizeof(char)); + sprintf(name, "%s->%s", name_tail, name_head); double size = dot_parse_double(agget(edge, (char *) "size")); - INFO2("size : %e, get size : %s", size, agget(edge, (char *) "size")); - //int sender = dot_parse_int(agget(edge,(char*)"sender")); - //int reciever = dot_parse_int(agget(edge,(char*)"reciever")); + XBT_DEBUG("size : %e, get size : %s", size, agget(edge, (char *) "size")); + if (size > 0) { file = xbt_dict_get_or_null(files, name); if (file == NULL) { @@ -260,7 +371,7 @@ void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge) xbt_dict_set(files, name, file, &dot_task_free); } else { if (SD_task_get_amount(file) != size) { - WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + XBT_WARN("Ignoring file %s size redefinition from %.0f to %.0f", name, SD_task_get_amount(file), size); } } @@ -271,6 +382,7 @@ void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge) SD_task_dependency_add(NULL, NULL, file, current_job); } } + free(name); } /* dot_add_output_dependencies create the dependencies between a @@ -280,14 +392,13 @@ void dot_add_input_dependencies(SD_task_t current_job, Agedge_t * edge) void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge) { SD_task_t file; - char name[80]; - sprintf(name, "%s->%s", agnameof(agtail(edge)), agnameof(aghead(edge))); + char *name_tail=agnameof(agtail(edge)); + char *name_head=agnameof(aghead(edge)); + char *name = malloc((strlen(name_head)+strlen(name_tail)+3)*sizeof(char)); + sprintf(name, "%s->%s", name_tail, name_head); double size = dot_parse_double(agget(edge, (char *) "size")); - INFO2("size : %e, get size : %s", size, agget(edge, (char *) "size")); - //int sender = dot_parse_int(agget(edge,(char*)"sender")); - //int reciever = dot_parse_int(agget(edge,(char*)"reciever")); + XBT_DEBUG("size : %e, get size : %s", size, agget(edge, (char *) "size")); - //INFO2("See ",A_dot__uses_file,(is_input?"in":"out")); if (size > 0) { file = xbt_dict_get_or_null(files, name); if (file == NULL) { @@ -298,13 +409,13 @@ void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge) xbt_dict_set(files, name, file, &dot_task_free); } else { if (SD_task_get_amount(file) != size) { - WARN3("Ignoring file %s size redefinition from %.0f to %.0f", + XBT_WARN("Ignoring file %s size redefinition from %.0f to %.0f", name, SD_task_get_amount(file), size); } } SD_task_dependency_add(NULL, NULL, current_job, file); if (xbt_dynar_length(file->tasks_before) > 1) { - WARN1("File %s created at more than one location...", file->name); + XBT_WARN("File %s created at more than one location...", file->name); } } else { file = xbt_dict_get_or_null(jobs, agnameof(aghead(edge))); @@ -312,4 +423,5 @@ void dot_add_output_dependencies(SD_task_t current_job, Agedge_t * edge) SD_task_dependency_add(NULL, NULL, current_job, file); } } + free(name); }