Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Adding parallel tasks to SURF workstations
[simgrid.git] / src / surf / workstation.c
index 2949934..c236629 100644 (file)
@@ -38,7 +38,7 @@ static void workstation_free(void *workstation)
 
 static void create_workstations(void)
 {
-  xbt_dict_cursor_t cursor = NULL;
+   xbt_dict_cursor_t cursor = NULL;
   char *name = NULL;
   void *cpu = NULL;
   void *nw_card = NULL;
@@ -73,34 +73,57 @@ static int resource_used(void *resource_id)
   return 0;
 }
 
+/* Drop one reference on a parallel-task action.
+ *
+ * When the reference count reaches zero the action is removed from the
+ * state set it currently belongs to, its maxmin variable (if it has one)
+ * is released, and the action itself is freed.
+ *
+ * Returns 1 if the action was destroyed, 0 if it is still referenced. */
+static int parallel_action_free(surf_action_t action)
+{
+  surf_action_parallel_task_CSL05_t task =
+      (surf_action_parallel_task_CSL05_t) action;
+
+  if (--action->using)
+    return 0;                   /* someone else still holds a reference */
+
+  xbt_swag_remove(action, action->state_set);
+  if (task->variable)
+    lmm_variable_free(maxmin_system, task->variable);
+  free(action);
+  return 1;
+}
+
+/* Take one additional reference on a parallel-task action; each reference
+ * is expected to be given back through parallel_action_free(). */
+static void parallel_action_use(surf_action_t action)
+{
+  action->using += 1;
+}
+
+/* Release one reference on `action`, dispatching on the resource that owns
+ * it: network and CPU actions are forwarded to their resource's own
+ * action_free, actions owned by the workstation resource are handled by
+ * parallel_action_free(). Any other owner hits DIE_IMPOSSIBLE.
+ *
+ * Returns the value of the selected handler. The trailing `return 0` is
+ * only reached through the DIE_IMPOSSIBLE branch. */
 static int action_free(surf_action_t action)
 {
   if(action->resource_type==(surf_resource_t)surf_network_resource) 
     return surf_network_resource->common_public->action_free(action);
   else if(action->resource_type==(surf_resource_t)surf_cpu_resource) 
     return surf_cpu_resource->common_public->action_free(action);
+  else if(action->resource_type==(surf_resource_t)surf_workstation_resource)
+    return parallel_action_free(action);
   else DIE_IMPOSSIBLE;
   return 0;
 }
 
-
+/* Take one more reference on `action`, dispatching on the resource that
+ * owns it: network and CPU actions are forwarded to their resource's own
+ * action_use, actions owned by the workstation resource are handled by
+ * parallel_action_use(). Any other owner hits DIE_IMPOSSIBLE. */
 static void action_use(surf_action_t action)
 {
   if(action->resource_type==(surf_resource_t)surf_network_resource) 
     surf_network_resource->common_public->action_use(action);
   else if(action->resource_type==(surf_resource_t)surf_cpu_resource) 
     surf_cpu_resource->common_public->action_use(action);
+  else if(action->resource_type==(surf_resource_t)surf_workstation_resource)
+    parallel_action_use(action);  /* void callee: no `return expr;` here */
   else DIE_IMPOSSIBLE;
   return;
 }
 
+/* Cancel hook of the workstation resource. It unconditionally hits
+ * DIE_IMPOSSIBLE; `action` is never examined. */
 static void action_cancel(surf_action_t action)
 {
+  DIE_IMPOSSIBLE;
   return;
 }
 
+/* Recycle hook of the workstation resource. It unconditionally hits
+ * DIE_IMPOSSIBLE; `action` is never examined. */
 static void action_recycle(surf_action_t action)
 {
+  DIE_IMPOSSIBLE;
   return;
 }
 
@@ -111,6 +134,8 @@ static void action_change_state(surf_action_t action,
     surf_network_resource->common_public->action_change_state(action,state);
   else if(action->resource_type==(surf_resource_t)surf_cpu_resource) 
     surf_cpu_resource->common_public->action_change_state(action,state);
+  else if(action->resource_type==(surf_resource_t)surf_workstation_resource)
+    surf_action_change_state(action, state);
   else DIE_IMPOSSIBLE;
   return;
 }
@@ -120,9 +145,57 @@ static double share_resources(double now)
   return -1.0;
 }
 
-
+/* Advance every running parallel-task action by `delta`.
+ *
+ * For each action in the workstation resource's running set:
+ *  - `remains` is updated with the value of the action's maxmin variable
+ *    multiplied by `delta`, and, when a maximum duration is set,
+ *    `max_duration` is updated with `delta`;
+ *  - the action becomes SURF_ACTION_DONE when `remains` <= 0 and its
+ *    variable has a positive weight, or when `max_duration` <= 0;
+ *  - otherwise the constraints attached to its variable are scanned and
+ *    the action becomes SURF_ACTION_FAILED as soon as a CPU or a network
+ *    link in the OFF state is found.
+ *
+ * In every state change `finish` is set to the current SURF clock.
+ * `now` is not used. */
 static void update_actions_state(double now, double delta)
 {
+  surf_action_parallel_task_CSL05_t action = NULL;
+  surf_action_parallel_task_CSL05_t next_action = NULL;
+  xbt_swag_t running_actions =
+      surf_workstation_resource->common_public->states.running_action_set;
+  /* NOTE(review): failed_actions is declared but never used below. */
+  xbt_swag_t failed_actions =
+      surf_workstation_resource->common_public->states.failed_action_set;
+
+  /* The "safe" iteration keeps next_action so that the current action can
+   * change state inside the loop body. */
+  xbt_swag_foreach_safe(action, next_action, running_actions) {
+    /* presumably surf_double_update() subtracts its second argument from
+     * the first one - TODO confirm */
+    surf_double_update(&(action->generic_action.remains),
+       lmm_variable_getvalue(action->variable) * delta);
+    if (action->generic_action.max_duration != NO_MAX_DURATION)
+      surf_double_update(&(action->generic_action.max_duration), delta);
+    if ((action->generic_action.remains <= 0) && 
+       (lmm_get_variable_weight(action->variable)>0)) {
+      action->generic_action.finish = surf_get_clock();
+      action_change_state((surf_action_t) action, SURF_ACTION_DONE);
+    } else if ((action->generic_action.max_duration != NO_MAX_DURATION) &&
+              (action->generic_action.max_duration <= 0)) {
+      action->generic_action.finish = surf_get_clock();
+      action_change_state((surf_action_t) action, SURF_ACTION_DONE);
+    } else {                   /* Need to check that none of the resources has failed */
+      lmm_constraint_t cnst = NULL;
+      int i = 0;
+      surf_resource_t resource = NULL;
+
+      /* Walk the constraints of the variable until the lookup returns NULL. */
+      while ((cnst =
+             lmm_get_cnst_from_var(maxmin_system, action->variable,
+                                   i++))) {
+       /* NOTE(review): the constraint id is compared against the resource
+        * singletons and then reused as a cpu / link object. Confirm what
+        * lmm_constraint_id() returns here: if it is the cpu / link object
+        * itself, these comparisons may never match. */
+       resource = (surf_resource_t) lmm_constraint_id(cnst);
+       if(resource== (surf_resource_t) surf_cpu_resource) {
+         cpu_Cas01_t cpu = lmm_constraint_id(cnst);
+         if (cpu->state_current == SURF_CPU_OFF) {
+           action->generic_action.finish = surf_get_clock();
+           action_change_state((surf_action_t) action, SURF_ACTION_FAILED);
+           break;
+         }
+       } else if (resource== (surf_resource_t) surf_network_resource) {
+         network_link_CM02_t nw_link = lmm_constraint_id(cnst);
+
+         if (nw_link->state_current == SURF_NETWORK_LINK_OFF) {
+           action->generic_action.finish = surf_get_clock();
+           action_change_state((surf_action_t) action, SURF_ACTION_FAILED);
+           break;
+         }
+       } 
+      }
+    }
+  }
+
   return;
 }
 
@@ -196,6 +269,90 @@ static e_surf_cpu_state_t get_state(void *workstation)
       get_state(((workstation_CLM03_t) workstation)->cpu);
 }
 
+/* Create an action modelling a parallel task spanning several workstations.
+ *
+ *  workstation_nb        number of entries in workstation_list
+ *  workstation_list      the workstations (workstation_CLM03_t) involved
+ *  computation_amount    workstation_nb values; entry i is the coefficient
+ *                        used on the CPU constraint of workstation i
+ *  communication_amount  workstation_nb x workstation_nb row-major matrix;
+ *                        entry [i*workstation_nb+j] is added on every link
+ *                        of the route from workstation i to workstation j;
+ *                        negative entries are skipped
+ *  amount                cost of the action, also its initial `remains`
+ *  rate                  when > 0, used as the bound of the action's maxmin
+ *                        variable; otherwise -1.0 is passed as the bound
+ *                        (presumably "no bound" - confirm against
+ *                        lmm_variable_new)
+ *
+ * The action is created with one reference, owned by the workstation
+ * resource, and placed in that resource's running set (or moved to
+ * SURF_ACTION_DONE right away when it involves no resource at all).
+ *
+ * Returns the new action. */
+static surf_action_t execute_parallel_task (int workstation_nb,
+                                           void **workstation_list,
+                                           double *computation_amount,
+                                           double *communication_amount,
+                                           double amount,
+                                           double rate)
+{
+  surf_action_parallel_task_CSL05_t action = NULL;
+  int i, j, k;
+  xbt_dict_t network_link_set = xbt_dict_new();
+  xbt_dict_cursor_t cursor = NULL;
+  char *name = NULL;
+  int nb_link = 0;
+  network_link_CM02_t link;
+
+  /* Count the distinct links used by the task: links are keyed by name so
+   * that a link shared by several routes is counted only once. */
+  for(i=0; i< workstation_nb; i++) {
+    for(j=0; j< workstation_nb; j++) {
+      network_card_CM02_t card_src = ((workstation_CLM03_t*)workstation_list)[i]->network_card;
+      network_card_CM02_t card_dst = ((workstation_CLM03_t*)workstation_list)[j]->network_card;
+      int route_size = ROUTE_SIZE(card_src->id, card_dst->id);
+      network_link_CM02_t *route = ROUTE(card_src->id, card_dst->id);
+
+      if(communication_amount[i*workstation_nb+j]>=0)
+       for(k=0; k< route_size; k++) {
+         xbt_dict_set(network_link_set, route[k]->name, route[k], NULL);
+       }
+    }
+  }
+  xbt_dict_foreach(network_link_set, cursor, name, link) {
+    nb_link++;
+  }
+  /* The dictionary was only needed for counting; release it so it does not
+   * leak on every call. Entries were stored with a NULL last argument
+   * (presumably the free callback), so the links themselves are untouched. */
+  xbt_dict_free(&network_link_set);
+
+  action = xbt_new0(s_surf_action_parallel_task_CSL05_t, 1);
+  action->generic_action.using = 1;
+  action->generic_action.cost = amount;
+  action->generic_action.remains = amount;
+  action->generic_action.max_duration = NO_MAX_DURATION;
+  action->generic_action.start = -1.0;
+  action->generic_action.finish = -1.0;
+  action->generic_action.resource_type =
+      (surf_resource_t) surf_workstation_resource;
+  action->suspended = 0;  /* Should be useless because of the
+                            calloc but it seems to help valgrind... */
+  /* The action belongs to the workstation resource, whose
+   * update_actions_state() only walks the workstation running set: it must
+   * live there, not in the network resource's set. */
+  action->generic_action.state_set =
+      surf_workstation_resource->common_public->states.running_action_set;
+
+  xbt_swag_insert(action, action->generic_action.state_set);
+  action->rate = rate;
+
+  /* A positive rate bounds the variable; anything else leaves it at -1.0. */
+  action->variable = lmm_variable_new(maxmin_system, action, 1.0,
+                                     (action->rate > 0) ? action->rate : -1.0,
+                                     workstation_nb + nb_link);
+
+  /* Nothing to compute and nothing to transfer: the task is already over. */
+  if(nb_link + workstation_nb == 0)
+    action_change_state((surf_action_t) action, SURF_ACTION_DONE);
+
+  /* One CPU constraint per workstation... */
+  for (i = 0; i<workstation_nb; i++)
+    lmm_expand(maxmin_system, ((cpu_Cas01_t) ((workstation_CLM03_t) workstation_list[i])->cpu)->constraint, 
+              action->variable, computation_amount[i]);
+
+  /* ...and, for every (source, destination) pair, the communication amount
+   * is added on each link of the route between them. */
+  for (i=0; i<workstation_nb; i++) {
+    for(j=0; j< workstation_nb; j++) {
+      network_card_CM02_t card_src = ((workstation_CLM03_t*)workstation_list)[i]->network_card;
+      network_card_CM02_t card_dst = ((workstation_CLM03_t*)workstation_list)[j]->network_card;
+      int route_size = ROUTE_SIZE(card_src->id, card_dst->id);
+      network_link_CM02_t *route = ROUTE(card_src->id, card_dst->id);
+
+      if(communication_amount[i*workstation_nb+j]>=0) {
+       for(k=0; k< route_size; k++) {
+         lmm_expand_add(maxmin_system, route[k]->constraint, 
+                      action->variable, communication_amount[i*workstation_nb+j]);
+       }
+      }
+    }
+  }
+
+  return (surf_action_t) action;
+}
+
 static void finalize(void)
 {
   xbt_dict_free(&workstation_set);
@@ -270,6 +427,8 @@ static void surf_workstation_resource_init_internal(void)
   surf_workstation_resource->extension_public->sleep = action_sleep;
   surf_workstation_resource->extension_public->get_state = get_state;
   surf_workstation_resource->extension_public->communicate = communicate;
+  surf_workstation_resource->extension_public->execute_parallel_task = 
+    execute_parallel_task;
 
   workstation_set = xbt_dict_new();