From 5565b4a70104f361b3b457df696904aacead25ba Mon Sep 17 00:00:00 2001 From: alegrand Date: Thu, 13 Sep 2007 14:27:20 +0000 Subject: [PATCH] * full complience with execute_parallel_task * add support for a bound based on latencies. git-svn-id: svn+ssh://scm.gforge.inria.fr/svn/simgrid/simgrid/trunk@4622 48e7efb5-ca39-0410-a469-dd3cf9ba447f --- src/surf/workstation_ptask_L07.c | 83 ++++++++++++++++++++++++++++---- 1 file changed, 74 insertions(+), 9 deletions(-) diff --git a/src/surf/workstation_ptask_L07.c b/src/surf/workstation_ptask_L07.c index 232558ea21..ba2f737386 100644 --- a/src/surf/workstation_ptask_L07.c +++ b/src/surf/workstation_ptask_L07.c @@ -60,6 +60,10 @@ typedef struct s_route_L07 { typedef struct surf_action_workstation_L07 { s_surf_action_t generic_action; lmm_variable_t variable; + int workstation_nb; + cpu_L07_t *workstation_list; + double *computation_amount; + double *communication_amount; double latency; double rate; int suspended; @@ -75,6 +79,44 @@ static network_link_L07_t loopback = NULL; static xbt_dict_t parallel_task_network_link_set = NULL; lmm_system_t ptask_maxmin_system = NULL; + +static void update_action_bound(surf_action_workstation_L07_t action) +{ + int workstation_nb = action->workstation_nb; + double lat_current = 0.0; + double lat_bound = -1.0; + int i, j, k; + + for (i = 0; i < workstation_nb; i++) { + for (j = 0; j < workstation_nb; j++) { + cpu_L07_t card_src = action->workstation_list[i]; + cpu_L07_t card_dst = action->workstation_list[j]; + int route_size = ROUTE(card_src->id, card_dst->id).size; + network_link_L07_t *route = ROUTE(card_src->id, card_dst->id).links; + double lat = 0.0; + + if (action->communication_amount[i * workstation_nb + j] > 0) { + for (k = 0; k < route_size; k++) { + lat += route[k]->lat_current; + xbt_dict_set(parallel_task_network_link_set, route[k]->name, + route[k], NULL); + } + lat_current=MAX(lat_current,lat/action->communication_amount[i * workstation_nb + j]); + } + } + } + lat_bound = SG_TCP_CTE_GAMMA / (2.0 * lat_current); + DEBUG2("action (%p) : lat_bound = %g", action, lat_bound); + if ((action->latency == 0.0) && (action->suspended == 0)) { + if (action->rate < 0) + lmm_update_variable_bound(ptask_maxmin_system, action->variable, + lat_bound); + else + lmm_update_variable_bound(ptask_maxmin_system, action->variable, + min(action->rate,lat_bound)); + } +} + /**************************************/ /******* Resource Public **********/ /**************************************/ @@ -110,6 +152,9 @@ static int action_free(surf_action_t action) lmm_variable_free(ptask_maxmin_system, ((surf_action_workstation_L07_t) action)-> variable); + free(((surf_action_workstation_L07_t)action)->workstation_list); + free(((surf_action_workstation_L07_t)action)->communication_amount); + free(((surf_action_workstation_L07_t)action)->computation_amount); free(action); return 1; } @@ -149,8 +194,7 @@ static void action_resume(surf_action_t action) XBT_IN1("(%p)", act); if (act->suspended != 2) { - lmm_update_variable_weight(ptask_maxmin_system, - act->variable, 1.0); + lmm_update_variable_weight(ptask_maxmin_system,action->variable, 1.0); act->suspended = 0; } XBT_OUT; @@ -241,7 +285,8 @@ static void update_actions_state(double now, double delta) action->latency = 0.0; } if ((action->latency == 0.0) && (action->suspended == 0)) { - lmm_update_variable_weight(ptask_maxmin_system, action->variable, 1.0); + update_action_bound(action); + lmm_update_variable_weight(ptask_maxmin_system,action->variable, 1.0); } } DEBUG3("Action (%p) : remains (%g) updated by %g.", @@ -321,6 +366,19 @@ static void update_resource_state(void *id, nw_link->bw_current = value; lmm_update_constraint_bound(ptask_maxmin_system, nw_link->constraint, nw_link->bw_current); + } else if (event_type == nw_link->lat_event) { + lmm_variable_t var = NULL; + surf_action_workstation_L07_t action = NULL; + + nw_link->lat_current = value; + while (lmm_get_var_from_cnst + (ptask_maxmin_system, nw_link->constraint, &var)) { + + + action = lmm_variable_id(var); + update_action_bound(action); + } + } else if (event_type == nw_link->state_event) { if (value > 0) nw_link->state_current = SURF_NETWORK_LINK_ON; @@ -466,6 +524,10 @@ static surf_action_t execute_parallel_task(int workstation_nb, (surf_model_t) surf_workstation_model; action->suspended = 0; /* Should be useless because of the calloc but it seems to help valgrind... */ + action->workstation_nb = workstation_nb; + action->workstation_list = (cpu_L07_t *)workstation_list; + action->computation_amount = computation_amount; + action->communication_amount = communication_amount; action->latency = latency; action->generic_action.state_set = surf_workstation_model->common_public->states.running_action_set; @@ -517,9 +579,16 @@ static surf_action_t execute_parallel_task(int workstation_nb, static surf_action_t execute(void *cpu, double size) { - double val = 0.0; + void **workstation_list = xbt_new0(void *, 1); + double *computation_amount = xbt_new0(double, 1); + double *communication_amount = xbt_new0(double, 1); + + workstation_list[0] = cpu; + communication_amount[0] = 0.0; + computation_amount[0] = size; - return execute_parallel_task(1, &cpu, &size, &val, 1, -1); + return execute_parallel_task(1, workstation_list, communication_amount, + communication_amount, 1, -1); } static surf_action_t communicate(void *src, void *dst, double size, @@ -538,10 +607,6 @@ static surf_action_t communicate(void *src, void *dst, double size, computation_amount, communication_amount, 1, rate); - free(computation_amount); - free(communication_amount); - free(workstation_list); - return res; } -- 2.20.1