Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Taking latencies into account (but not for the bandwidth limitation yet).
[simgrid.git] / src / surf / network.c
1 /* Authors: Arnaud Legrand                                                  */
2
3 /* This program is free software; you can redistribute it and/or modify it
4    under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "network_private.h"
7 #include "xbt/dict.h"
8
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(network, surf,
10                                 "Logging specific to the SURF network module");
11
12 surf_network_resource_t surf_network_resource = NULL;
13
14 static xbt_dict_t network_link_set = NULL;
15 static xbt_dict_t network_card_set = NULL;
16
17 static int card_number = 0;
18 static network_link_t **routing_table = NULL;
19 static int *routing_table_size = NULL;
20
21 #define ROUTE(i,j) routing_table[(i)+(j)*card_number]
22 #define ROUTE_SIZE(i,j) routing_table_size[(i)+(j)*card_number]
23
24 static void create_routing_table(void)
25 {
26   routing_table = xbt_new0(network_link_t *,card_number*card_number);
27   routing_table_size = xbt_new0(int,card_number*card_number);
28 }
29
30 static void network_link_free(void *nw_link)
31 {
32   xbt_free(nw_link);
33 }
34
35 static network_link_t network_link_new(const char *name,
36                                        xbt_maxmin_float_t bw_initial,
37                                        tmgr_trace_t bw_trace,
38                                        xbt_maxmin_float_t lat_initial,
39                                        tmgr_trace_t lat_trace,
40                                        e_surf_network_link_state_t state_initial,
41                                        tmgr_trace_t state_trace)
42 {
43   network_link_t nw_link = xbt_new0(s_network_link_t, 1);
44
45   
46   nw_link->resource = (surf_resource_t) surf_network_resource;
47   nw_link->name = name;
48   nw_link->bw_current = bw_initial;
49   if (bw_trace)
50     nw_link->bw_event =
51         tmgr_history_add_trace(history, bw_trace, 0.0, 0, nw_link);
52   nw_link->lat_current = lat_initial;
53   if (lat_trace)
54     nw_link->lat_event =
55         tmgr_history_add_trace(history, lat_trace, 0.0, 0, nw_link);
56   nw_link->state_current = state_initial;
57   if (state_trace)
58     nw_link->state_event =
59         tmgr_history_add_trace(history, state_trace, 0.0, 0, nw_link);
60
61   nw_link->constraint =
62       lmm_constraint_new(maxmin_system, nw_link,
63                          nw_link->bw_current);
64
65   xbt_dict_set(network_link_set, name, nw_link, network_link_free);
66
67   return nw_link;
68 }
69
70 static int network_card_new(const char *card_name)
71 {
72   network_card_t card = NULL;
73
74   xbt_dict_get(network_card_set, card_name, (void *) &card);
75
76   if(!card) {
77     card = xbt_new0(s_network_card_t,1);
78     card->name=xbt_strdup(card_name);
79     card->id=card_number++;
80     xbt_dict_set(network_card_set, card_name, card, NULL);
81   } 
82   return card->id;
83 }
84
85 static void route_new(int src_id, int dst_id,
86                       char* *links, int nb_link)
87 {
88   network_link_t *link_list = NULL;
89   int i ; 
90
91   ROUTE_SIZE(src_id,dst_id) = nb_link;
92   link_list = (ROUTE(src_id,dst_id) = xbt_new0(network_link_t,nb_link));
93   for(i=0; i < nb_link; i++) {
94     xbt_dict_get(network_link_set,links[i], (void *) &(link_list[i]));
95     xbt_free(links[i]);
96   }
97   xbt_free(links);  
98 }
99
100 /*  
101    Semantic:  name       initial   bandwidth   initial   latency    initial     state
102                         bandwidth    trace     latency    trace      state      trace
103    
104    Token:   TOKEN_WORD TOKEN_WORD TOKEN_WORD TOKEN_WORD TOKEN_WORD TOKEN_WORD TOKEN_WORD
105    Type:     string       float      string    float      string     ON/OFF     string
106 */
107 static void parse_network_link(void)
108 {
109   e_surf_token_t token;
110   char *name;
111   xbt_maxmin_float_t bw_initial;
112   tmgr_trace_t bw_trace;
113   xbt_maxmin_float_t lat_initial;
114   tmgr_trace_t lat_trace;
115   e_surf_network_link_state_t state_initial;
116   tmgr_trace_t state_trace;
117
118   name = xbt_strdup(surf_parse_text);
119
120   surf_parse_float(&bw_initial);
121   surf_parse_trace(&bw_trace);
122   surf_parse_float(&lat_initial);
123   surf_parse_trace(&lat_trace);
124
125   token = surf_parse();         /* state_initial */
126   xbt_assert1((token == TOKEN_WORD), "Parse error line %d", line_pos);
127   if (strcmp(surf_parse_text, "ON") == 0)
128     state_initial = SURF_NETWORK_LINK_ON;
129   else if (strcmp(surf_parse_text, "OFF") == 0)
130     state_initial = SURF_NETWORK_LINK_OFF;
131   else {
132     CRITICAL2("Invalid cpu state (line %d): %s neq ON or OFF\n", line_pos,
133               surf_parse_text);
134     xbt_abort();
135   }
136
137   surf_parse_trace(&state_trace);
138
139   network_link_new(name, bw_initial, bw_trace, 
140                    lat_initial, lat_trace,
141                    state_initial, state_trace);  
142 }
143
144 /*  
145    Semantic:  source   destination           network
146                                               links
147    
148    Token:   TOKEN_WORD TOKEN_WORD TOKEN_LP TOKEN_WORD* TOKEN_RP
149    Type:     string       string             string
150 */
151 static void parse_route(int fake)
152 {
153   int src_id = -1;
154   int dst_id = -1;
155   int nb_link = 0;
156   char **link_name = NULL;
157   e_surf_token_t token;
158
159   src_id = network_card_new(surf_parse_text);
160
161   token = surf_parse();
162   xbt_assert1((token == TOKEN_WORD), "Parse error line %d", line_pos);  
163   dst_id = network_card_new(surf_parse_text);
164
165   token = surf_parse();
166   xbt_assert1((token == TOKEN_LP), "Parse error line %d", line_pos);  
167   
168   while((token = surf_parse())==TOKEN_WORD) {
169     if(!fake) {
170       nb_link++;
171       link_name=xbt_realloc(link_name, (nb_link) * sizeof(char*));
172       link_name[(nb_link)-1]=xbt_strdup(surf_parse_text);
173     }
174   }
175   xbt_assert1((token == TOKEN_RP), "Parse error line %d", line_pos);  
176
177   if(!fake) route_new(src_id,dst_id,link_name, nb_link);
178 }
179
180 static void parse_file(const char *file)
181 {
182   e_surf_token_t token;
183
184   /* Figuring out the network links in the system */
185   find_section(file, "NETWORK");
186   while (1) {
187     token = surf_parse();
188
189     if (token == TOKEN_END_SECTION)
190       break;
191     if (token == TOKEN_NEWLINE)
192       continue;
193
194     if (token == TOKEN_WORD)
195       parse_network_link();
196     else {
197       CRITICAL1("Parse error line %d\n", line_pos);
198       xbt_abort();
199     }
200   }
201   close_section("NETWORK");
202
203   /* Figuring out the network cards used */
204   find_section(file, "ROUTE");
205   while (1) {
206     token = surf_parse();
207
208     if (token == TOKEN_END_SECTION)
209       break;
210     if (token == TOKEN_NEWLINE)
211       continue;
212
213     if (token == TOKEN_WORD)
214       parse_route(1);
215     else {
216       CRITICAL1("Parse error line %d\n", line_pos);
217       xbt_abort();
218     }
219   }
220   close_section("ROUTE");
221
222   create_routing_table();
223
224   /* Building the routes */
225   find_section(file, "ROUTE");
226   while (1) {
227     token = surf_parse();
228
229     if (token == TOKEN_END_SECTION)
230       break;
231     if (token == TOKEN_NEWLINE)
232       continue;
233
234     if (token == TOKEN_WORD)
235       parse_route(0);
236     else {
237       CRITICAL1("Parse error line %d\n", line_pos);
238       xbt_abort();
239     }
240   }
241   close_section("ROUTE");
242 }
243
244 static void *name_service(const char *name)
245 {
246   network_card_t card = NULL;
247
248   xbt_dict_get(network_card_set, name, (void *) &card);
249
250   return card;
251 }
252
253 static const char *get_resource_name(void *resource_id)
254 {
255   return ((network_card_t) resource_id)->name;
256 }
257
258 static int resource_used(void *resource_id)
259 {
260   return lmm_constraint_used(maxmin_system,
261                              ((network_link_t) resource_id)->constraint);
262 }
263
264 static void action_free(surf_action_t action)
265 {
266   surf_action_network_t Action = (surf_action_network_t) action;
267
268   xbt_swag_remove(action, action->state_set);
269   lmm_variable_free(maxmin_system, Action->variable);
270   xbt_free(action);
271
272   return;
273 }
274
275 static void action_cancel(surf_action_t action)
276 {
277   return;
278 }
279
280 static void action_recycle(surf_action_t action)
281 {
282   return;
283 }
284
285 static void action_change_state(surf_action_t action,
286                                 e_surf_action_state_t state)
287 {
288   surf_action_change_state(action, state);
289   return;
290 }
291
292 static xbt_heap_float_t share_resources(xbt_heap_float_t now)
293 {
294   surf_action_network_t action = NULL;
295   xbt_swag_t running_actions =
296       surf_network_resource->common_public->states.running_action_set;
297   xbt_maxmin_float_t min = -1;
298   xbt_maxmin_float_t value = -1;
299   lmm_solve(maxmin_system);
300
301   action = xbt_swag_getFirst(running_actions);
302   if (!action)
303     return -1.0;
304   value = lmm_variable_getvalue(action->variable);
305   min = action->generic_action.remains / value;
306
307   xbt_swag_foreach(action, running_actions) {
308     value = action->latency + (action->generic_action.remains /
309         lmm_variable_getvalue(action->variable));
310     if (value < min)
311       min = value;
312   }
313
314   return min;
315 }
316
317
318 static void update_actions_state(xbt_heap_float_t now,
319                                  xbt_heap_float_t delta)
320 {
321   xbt_heap_float_t deltap = 0.0;
322   surf_action_network_t action = NULL;
323   surf_action_network_t next_action = NULL;
324   xbt_swag_t running_actions =
325       surf_network_resource->common_public->states.running_action_set;
326   xbt_swag_t failed_actions =
327       surf_network_resource->common_public->states.failed_action_set;
328
329   xbt_swag_foreach_safe(action, next_action, running_actions) {
330     deltap = delta;
331     if(action->latency>0) {
332       if(action->latency>deltap) {
333         action->latency-=deltap;
334         deltap = 0.0;
335       } else {
336         deltap -= action->latency;
337         action->latency = 0.0;
338       }
339     }
340     action->generic_action.remains -=
341       lmm_variable_getvalue(action->variable) * deltap;
342
343 /*     if(action->generic_action.remains<.00001) action->generic_action.remains=0; */
344
345     if (action->generic_action.remains <= 0) {
346       action_change_state((surf_action_t) action, SURF_ACTION_DONE);
347     } else {                    /* Need to check that none of the resource has failed */
348       lmm_constraint_t cnst = NULL;
349       int i = 0;
350       network_link_t nw_link = NULL;
351
352       while ((cnst =
353               lmm_get_cnst_from_var(maxmin_system, action->variable,
354                                     i++))) {
355         nw_link = lmm_constraint_id(cnst);
356         if (nw_link->state_current == SURF_NETWORK_LINK_OFF) {
357           action_change_state((surf_action_t) action, SURF_ACTION_FAILED);
358           break;
359         }
360       }
361     }
362   }
363
364   xbt_swag_foreach_safe(action, next_action, failed_actions) {
365     lmm_variable_disable(maxmin_system, action->variable);
366   }
367
368   return;
369 }
370
371 static void update_resource_state(void *id,
372                                   tmgr_trace_event_t event_type,
373                                   xbt_maxmin_float_t value)
374 {
375   network_link_t nw_link = id;
376
377 /*   printf("[" XBT_HEAP_FLOAT_T "] Asking to update network card \"%s\" with value " */
378 /*       XBT_MAXMIN_FLOAT_T " for event %p\n", surf_get_clock(), nw_link->name, */
379 /*       value, event_type); */
380
381   if (event_type == nw_link->bw_event) {
382     nw_link->bw_current = value;
383     lmm_update_constraint_bound(maxmin_system, nw_link->constraint,
384                                 nw_link->bw_current);
385   } else if (event_type == nw_link->lat_event) {
386     nw_link->lat_current = value;
387   } else if (event_type == nw_link->state_event) {
388     if (value > 0)
389       nw_link->state_current = SURF_NETWORK_LINK_ON;
390     else
391       nw_link->state_current = SURF_NETWORK_LINK_OFF;
392   } else {
393     CRITICAL0("Unknown event ! \n");
394     xbt_abort();
395   }
396
397   return;
398 }
399
400 static surf_action_t communicate(void *src, void *dst,
401                                  xbt_maxmin_float_t size)
402 {
403   surf_action_network_t action = NULL;
404   network_card_t card_src = src;
405   network_card_t card_dst = dst;
406   int route_size = ROUTE_SIZE(card_src->id,card_dst->id);
407   network_link_t *route = ROUTE(card_src->id,card_dst->id);
408   int i;
409
410   action = xbt_new0(s_surf_action_network_t, 1);
411
412   action->generic_action.cost = size;
413   action->generic_action.remains = size;
414   action->generic_action.start = -1.0;
415   action->generic_action.finish = -1.0;
416   action->generic_action.callback = NULL;
417   action->generic_action.resource_type =
418       (surf_resource_t) surf_network_resource;
419
420   action->generic_action.state_set =
421     surf_network_resource->common_public->states.running_action_set;
422
423   xbt_swag_insert(action, action->generic_action.state_set);
424
425   action->variable = lmm_variable_new(maxmin_system, action, 1.0, -1.0, 
426                                       route_size);
427   for(i=0; i<route_size; i++)
428     lmm_expand(maxmin_system, route[i]->constraint, action->variable,
429                1.0);
430
431   action->latency = 0.0;
432   for(i=0; i<route_size; i++)
433     action->latency += route[i]->lat_current;
434
435   return (surf_action_t) action;
436 }
437
438 static void finalize(void)
439 {
440   xbt_dict_free(&network_card_set);
441   xbt_dict_free(&network_link_set);
442   xbt_swag_free(surf_network_resource->common_public->states.ready_action_set);
443   xbt_swag_free(surf_network_resource->common_public->states.
444                 running_action_set);
445   xbt_swag_free(surf_network_resource->common_public->states.
446                 failed_action_set);
447   xbt_swag_free(surf_network_resource->common_public->states.done_action_set);
448   xbt_free(surf_network_resource->common_public);
449   xbt_free(surf_network_resource->common_private);
450   xbt_free(surf_network_resource->extension_public);
451
452   xbt_free(surf_network_resource);
453   surf_network_resource = NULL;
454 }
455
456 static void surf_network_resource_init_internal(void)
457 {
458   s_surf_action_t action;
459
460   surf_network_resource = xbt_new0(s_surf_network_resource_t, 1);
461
462   surf_network_resource->common_private =
463       xbt_new0(s_surf_resource_private_t, 1);
464   surf_network_resource->common_public =
465       xbt_new0(s_surf_resource_public_t, 1);
466 /*   surf_network_resource->extension_private = xbt_new0(s_surf_network_resource_extension_private_t,1); */
467   surf_network_resource->extension_public =
468       xbt_new0(s_surf_network_resource_extension_public_t, 1);
469
470   surf_network_resource->common_public->states.ready_action_set =
471       xbt_swag_new(xbt_swag_offset(action, state_hookup));
472   surf_network_resource->common_public->states.running_action_set =
473       xbt_swag_new(xbt_swag_offset(action, state_hookup));
474   surf_network_resource->common_public->states.failed_action_set =
475       xbt_swag_new(xbt_swag_offset(action, state_hookup));
476   surf_network_resource->common_public->states.done_action_set =
477       xbt_swag_new(xbt_swag_offset(action, state_hookup));
478
479   surf_network_resource->common_public->name_service = name_service;
480   surf_network_resource->common_public->get_resource_name =
481       get_resource_name;
482   surf_network_resource->common_public->action_get_state =
483       surf_action_get_state;
484   surf_network_resource->common_public->action_free = action_free;
485   surf_network_resource->common_public->action_cancel = action_cancel;
486   surf_network_resource->common_public->action_recycle = action_recycle;
487   surf_network_resource->common_public->action_change_state =
488       action_change_state;
489
490   surf_network_resource->common_private->resource_used = resource_used;
491   surf_network_resource->common_private->share_resources = share_resources;
492   surf_network_resource->common_private->update_actions_state =
493       update_actions_state;
494   surf_network_resource->common_private->update_resource_state =
495       update_resource_state;
496   surf_network_resource->common_private->finalize = finalize;
497
498   surf_network_resource->extension_public->communicate = communicate;
499
500   network_link_set = xbt_dict_new();
501   network_card_set = xbt_dict_new();
502
503   xbt_assert0(maxmin_system, "surf_init has to be called first!");
504 }
505
506 void surf_network_resource_init(const char *filename)
507 {
508   surf_network_resource_init_internal();
509   parse_file(filename);
510   xbt_dynar_push(resource_list, &surf_network_resource);
511 }