From 3196e54e1f004f07f2348d059f351b95146732ef Mon Sep 17 00:00:00 2001 From: alegrand Date: Fri, 22 Jul 2005 18:19:43 +0000 Subject: [PATCH] adding parallel task to SURF workstations git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@1557 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/include/surf/surf.h | 6 ++ src/surf/workstation.c | 165 +++++++++++++++++++++++++++++++- src/surf/workstation_KCCFLN05.c | 17 +++- src/surf/workstation_private.h | 7 ++ 4 files changed, 190 insertions(+), 5 deletions(-) diff --git a/src/include/surf/surf.h b/src/include/surf/surf.h index 9509d2c78b..147b84f6b1 100644 --- a/src/include/surf/surf.h +++ b/src/include/surf/surf.h @@ -154,6 +154,12 @@ typedef struct surf_workstation_resource_extension_public { surf_action_t(*communicate) (void *workstation_src, void *workstation_dst, double size, double max_rate); + surf_action_t(*execute_parallel_task) (int workstation_nb, + void **workstation_list, + double *computation_amount, + double *communication_amount, + double amount, + double rate); } s_surf_workstation_resource_extension_public_t, *surf_workstation_resource_extension_public_t; diff --git a/src/surf/workstation.c b/src/surf/workstation.c index 2949934cef..c23662912b 100644 --- a/src/surf/workstation.c +++ b/src/surf/workstation.c @@ -38,7 +38,7 @@ static void workstation_free(void *workstation) static void create_workstations(void) { - xbt_dict_cursor_t cursor = NULL; + xbt_dict_cursor_t cursor = NULL; char *name = NULL; void *cpu = NULL; void *nw_card = NULL; @@ -73,34 +73,57 @@ static int resource_used(void *resource_id) return 0; } +static int parallel_action_free(surf_action_t action) +{ + action->using--; + if(!action->using) { + xbt_swag_remove(action, action->state_set); + if(((surf_action_parallel_task_CSL05_t)action)->variable) + lmm_variable_free(maxmin_system, ((surf_action_parallel_task_CSL05_t)action)->variable); + free(action); + return 1; + } + return 0; +} + +static void parallel_action_use(surf_action_t action) +{ + action->using++; +} + static int action_free(surf_action_t action) { if(action->resource_type==(surf_resource_t)surf_network_resource) return surf_network_resource->common_public->action_free(action); else if(action->resource_type==(surf_resource_t)surf_cpu_resource) return surf_cpu_resource->common_public->action_free(action); + else if(action->resource_type==(surf_resource_t)surf_workstation_resource) + return parallel_action_free(action); else DIE_IMPOSSIBLE; return 0; } - static void action_use(surf_action_t action) { if(action->resource_type==(surf_resource_t)surf_network_resource) surf_network_resource->common_public->action_use(action); else if(action->resource_type==(surf_resource_t)surf_cpu_resource) surf_cpu_resource->common_public->action_use(action); + else if(action->resource_type==(surf_resource_t)surf_workstation_resource) + return parallel_action_use(action); else DIE_IMPOSSIBLE; return; } static void action_cancel(surf_action_t action) { + DIE_IMPOSSIBLE; return; } static void action_recycle(surf_action_t action) { + DIE_IMPOSSIBLE; return; } @@ -111,6 +134,8 @@ static void action_change_state(surf_action_t action, surf_network_resource->common_public->action_change_state(action,state); else if(action->resource_type==(surf_resource_t)surf_cpu_resource) surf_cpu_resource->common_public->action_change_state(action,state); + else if(action->resource_type==(surf_resource_t)surf_workstation_resource) + surf_action_change_state(action, state); else DIE_IMPOSSIBLE; return; } @@ -120,9 +145,57 @@ static double share_resources(double now) return -1.0; } - static void update_actions_state(double now, double delta) { + surf_action_parallel_task_CSL05_t action = NULL; + surf_action_parallel_task_CSL05_t next_action = NULL; + xbt_swag_t running_actions = + surf_workstation_resource->common_public->states.running_action_set; + xbt_swag_t failed_actions = + surf_workstation_resource->common_public->states.failed_action_set; + + xbt_swag_foreach_safe(action, next_action, running_actions) { + surf_double_update(&(action->generic_action.remains), + lmm_variable_getvalue(action->variable) * delta); + if (action->generic_action.max_duration != NO_MAX_DURATION) + surf_double_update(&(action->generic_action.max_duration), delta); + if ((action->generic_action.remains <= 0) && + (lmm_get_variable_weight(action->variable)>0)) { + action->generic_action.finish = surf_get_clock(); + action_change_state((surf_action_t) action, SURF_ACTION_DONE); + } else if ((action->generic_action.max_duration != NO_MAX_DURATION) && + (action->generic_action.max_duration <= 0)) { + action->generic_action.finish = surf_get_clock(); + action_change_state((surf_action_t) action, SURF_ACTION_DONE); + } else { /* Need to check that none of the resource has failed */ + lmm_constraint_t cnst = NULL; + int i = 0; + surf_resource_t resource = NULL; + + while ((cnst = + lmm_get_cnst_from_var(maxmin_system, action->variable, + i++))) { + resource = (surf_resource_t) lmm_constraint_id(cnst); + if(resource== (surf_resource_t) surf_cpu_resource) { + cpu_Cas01_t cpu = lmm_constraint_id(cnst); + if (cpu->state_current == SURF_CPU_OFF) { + action->generic_action.finish = surf_get_clock(); + action_change_state((surf_action_t) action, SURF_ACTION_FAILED); + break; + } + } else if (resource== (surf_resource_t) surf_network_resource) { + network_link_CM02_t nw_link = lmm_constraint_id(cnst); + + if (nw_link->state_current == SURF_NETWORK_LINK_OFF) { + action->generic_action.finish = surf_get_clock(); + action_change_state((surf_action_t) action, SURF_ACTION_FAILED); + break; + } + } + } + } + } + return; } @@ -196,6 +269,90 @@ static e_surf_cpu_state_t get_state(void *workstation) get_state(((workstation_CLM03_t) workstation)->cpu); } +static surf_action_t execute_parallel_task (int workstation_nb, + void **workstation_list, + double *computation_amount, + double *communication_amount, + double amount, + double rate) +{ + surf_action_parallel_task_CSL05_t action = NULL; + int i, j, k; + xbt_dict_t network_link_set = xbt_dict_new(); + xbt_dict_cursor_t cursor = NULL; + char *name = NULL; + int nb_link = 0; + network_link_CM02_t link; + + /* Compute the number of affected resources... */ + for(i=0; i< workstation_nb; i++) { + for(j=0; j< workstation_nb; j++) { + network_card_CM02_t card_src = ((workstation_CLM03_t*)workstation_list)[i]->network_card; + network_card_CM02_t card_dst = ((workstation_CLM03_t*)workstation_list)[j]->network_card; + int route_size = ROUTE_SIZE(card_src->id, card_dst->id); + network_link_CM02_t *route = ROUTE(card_src->id, card_dst->id); + + if(communication_amount[i*workstation_nb+j]>=0) + for(k=0; k< route_size; k++) { + xbt_dict_set(network_link_set, route[k]->name, route[k], NULL); + } + } + } + + xbt_dict_foreach(network_link_set, cursor, name, link) { + nb_link++; + } + + action = xbt_new0(s_surf_action_parallel_task_CSL05_t, 1); + action->generic_action.using = 1; + action->generic_action.cost = amount; + action->generic_action.remains = amount; + action->generic_action.max_duration = NO_MAX_DURATION; + action->generic_action.start = -1.0; + action->generic_action.finish = -1.0; + action->generic_action.resource_type = + (surf_resource_t) surf_workstation_resource; + action->suspended = 0; /* Should be useless because of the + calloc but it seems to help valgrind... */ + action->generic_action.state_set = + surf_network_resource->common_public->states.running_action_set; + + xbt_swag_insert(action, action->generic_action.state_set); + action->rate = rate; + + if(action->rate>0) + action->variable = lmm_variable_new(maxmin_system, action, 1.0, -1.0, + workstation_nb + nb_link); + else + action->variable = lmm_variable_new(maxmin_system, action, 1.0, action->rate, + workstation_nb + nb_link); + + if(nb_link + workstation_nb == 0) + action_change_state((surf_action_t) action, SURF_ACTION_DONE); + + for (i = 0; icpu)->constraint, + action->variable, computation_amount[i]); + + for (i=0; inetwork_card; + network_card_CM02_t card_dst = ((workstation_CLM03_t*)workstation_list)[j]->network_card; + int route_size = ROUTE_SIZE(card_src->id, card_dst->id); + network_link_CM02_t *route = ROUTE(card_src->id, card_dst->id); + + for(k=0; k< route_size; k++) { + if(communication_amount[i*workstation_nb+j]>=0) { + lmm_expand_add(maxmin_system, route[k]->constraint, + action->variable, communication_amount[i*workstation_nb+j]); + } + } + } + } + + return (surf_action_t) action; +} + static void finalize(void) { xbt_dict_free(&workstation_set); @@ -270,6 +427,8 @@ static void surf_workstation_resource_init_internal(void) surf_workstation_resource->extension_public->sleep = action_sleep; surf_workstation_resource->extension_public->get_state = get_state; surf_workstation_resource->extension_public->communicate = communicate; + surf_workstation_resource->extension_public->execute_parallel_task = + execute_parallel_task; workstation_set = xbt_dict_new(); diff --git a/src/surf/workstation_KCCFLN05.c b/src/surf/workstation_KCCFLN05.c index 3e0aa33724..956d49de51 100644 --- a/src/surf/workstation_KCCFLN05.c +++ b/src/surf/workstation_KCCFLN05.c @@ -16,7 +16,7 @@ static lmm_system_t maxmin_system_cpu_KCCFLN05 = NULL; static lmm_system_t maxmin_system_network_KCCFLN05 = NULL; static xbt_dict_t network_link_set = NULL; static int nb_workstation = 0; -s_route_KCCFLN05_t *routing_table = NULL; +static s_route_KCCFLN05_t *routing_table = NULL; #define ROUTE(i,j) routing_table[(i)+(j)*nb_workstation] /**************************************/ @@ -514,6 +514,17 @@ static surf_action_t communicate_KCCFLN05(void *src, void *dst, double size, return (surf_action_t) action; } +static surf_action_t execute_parallel_task_KCCFLN05 (int workstation_nb, + void **workstation_list, + double *computation_amount, + double *communication_amount, + double amount, + double rate) +{ + DIE_IMPOSSIBLE; + return NULL; +} + static void network_KCCFLN05_action_suspend(surf_action_t action) { lmm_update_variable_weight(maxmin_system_network_KCCFLN05, @@ -993,7 +1004,6 @@ static void network_KCCFLN05_resource_init_internal(void) network_link_set = xbt_dict_new(); - maxmin_system_network_KCCFLN05 = lmm_system_new(); } @@ -1063,6 +1073,9 @@ static void workstation_KCCFLN05_resource_init_internal(void) /*FIXME*//* surf_workstation_resource->extension_public->sleep = action_sleep; */ surf_workstation_resource->extension_public->get_state = get_state; surf_workstation_resource->extension_public->communicate = communicate_KCCFLN05; + surf_workstation_resource->extension_public->execute_parallel_task = + execute_parallel_task_KCCFLN05; + } /**************************************/ diff --git a/src/surf/workstation_private.h b/src/surf/workstation_private.h index ad7e9300d1..e25d18e1ec 100644 --- a/src/surf/workstation_private.h +++ b/src/surf/workstation_private.h @@ -18,4 +18,11 @@ typedef struct workstation_CLM03 { void *network_card; } s_workstation_CLM03_t, *workstation_CLM03_t; +typedef struct surf_action_parallel_task_CSL05 { + s_surf_action_t generic_action; + lmm_variable_t variable; + double rate; + int suspended; +} s_surf_action_parallel_task_CSL05_t, *surf_action_parallel_task_CSL05_t; + #endif /* _SURF_WORKSTATION_PRIVATE_H */ -- 2.20.1