Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
2179b1790cb357d1b17642b240a63582585733bc
[simgrid.git] / src / surf / ptask_L07.cpp
1 /* Copyright (c) 2007-2010, 2013-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <cstdlib>
8
9 #include <algorithm>
10
11 #include "ptask_L07.hpp"
12
13 #include "cpu_interface.hpp"
14 #include "surf_routing.hpp"
15 #include "xbt/lib.h"
16
17 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(surf_host);
18 XBT_LOG_EXTERNAL_CATEGORY(xbt_cfg);
19
20 /**************************************/
21 /*** Resource Creation & Destruction **/
22 /**************************************/
23
24 static void ptask_netlink_parse_init(sg_platf_link_cbarg_t link)
25 {
26   netlink_parse_init(link);
27   current_property_set = NULL;
28 }
29
30 void surf_host_model_init_ptask_L07(void)
31 {
32   XBT_CINFO(xbt_cfg,"Switching to the L07 model to handle parallel tasks.");
33   xbt_assert(!surf_cpu_model_pm, "CPU model type already defined");
34   xbt_assert(!surf_network_model, "network model type already defined");
35
36   // Define the callbacks to parse the XML
37   simgrid::surf::on_link.connect(ptask_netlink_parse_init);
38
39   surf_host_model = new simgrid::surf::HostL07Model();
40   xbt_dynar_push(all_existing_models, &surf_host_model);
41 }
42
43
44 namespace simgrid {
45 namespace surf {
46
47 HostL07Model::HostL07Model() : HostModel() {
48   p_maxminSystem = lmm_system_new(1);
49   surf_network_model = new NetworkL07Model(this,p_maxminSystem);
50   surf_cpu_model_pm = new CpuL07Model(this,p_maxminSystem);
51
52   routing_model_create(surf_network_model->createLink("__loopback__",
53                                                     498000000, NULL,
54                                                     0.000015, NULL,
55                                                     SURF_LINK_FATPIPE, NULL));
56 }
57
58 HostL07Model::~HostL07Model() {
59   delete surf_cpu_model_pm;
60   delete surf_network_model;
61 }
62
63 CpuL07Model::CpuL07Model(HostL07Model *hmodel,lmm_system_t sys)
64   : CpuModel()
65   , p_hostModel(hmodel)
66   {
67     p_maxminSystem = sys;
68   }
69 CpuL07Model::~CpuL07Model() {
70   surf_cpu_model_pm = NULL;
71   lmm_system_free(p_maxminSystem);
72   p_maxminSystem = NULL;
73 }
74 NetworkL07Model::NetworkL07Model(HostL07Model *hmodel, lmm_system_t sys)
75   : NetworkModel()
76   , p_hostModel(hmodel)
77   {
78     p_maxminSystem = sys;
79   }
80 NetworkL07Model::~NetworkL07Model()
81 {
82   surf_network_model = NULL;
83   p_maxminSystem = NULL; // Avoid multi-free
84 }
85
86
87 double HostL07Model::next_occuring_event(double /*now*/)
88 {
89   L07Action *action;
90
91   ActionList *running_actions = getRunningActionSet();
92   double min = this->shareResourcesMaxMin(running_actions,
93                                               p_maxminSystem,
94                                               bottleneck_solve);
95
96   for(ActionList::iterator it(running_actions->begin()), itend(running_actions->end())
97    ; it != itend ; ++it) {
98   action = static_cast<L07Action*>(&*it);
99     if (action->m_latency > 0) {
100       if (min < 0) {
101         min = action->m_latency;
102         XBT_DEBUG("Updating min (value) with %p (start %f): %f", action,
103                action->getStartTime(), min);
104       } else if (action->m_latency < min) {
105         min = action->m_latency;
106         XBT_DEBUG("Updating min (latency) with %p (start %f): %f", action,
107                action->getStartTime(), min);
108       }
109     }
110   }
111
112   XBT_DEBUG("min value : %f", min);
113
114   return min;
115 }
116
117 void HostL07Model::updateActionsState(double /*now*/, double delta) {
118
119   L07Action *action;
120   ActionList *actionSet = getRunningActionSet();
121
122   for(ActionList::iterator it = actionSet->begin(), itNext = it
123    ; it != actionSet->end()
124    ; it =  itNext) {
125   ++itNext;
126     action = static_cast<L07Action*>(&*it);
127     if (action->m_latency > 0) {
128       if (action->m_latency > delta) {
129         double_update(&(action->m_latency), delta, sg_surf_precision);
130       } else {
131         action->m_latency = 0.0;
132       }
133       if ((action->m_latency == 0.0) && (action->isSuspended() == 0)) {
134         action->updateBound();
135         lmm_update_variable_weight(p_maxminSystem, action->getVariable(), 1.0);
136       }
137     }
138     XBT_DEBUG("Action (%p) : remains (%g) updated by %g.",
139            action, action->getRemains(), lmm_variable_getvalue(action->getVariable()) * delta);
140     action->updateRemains(lmm_variable_getvalue(action->getVariable()) * delta);
141
142     if (action->getMaxDuration() != NO_MAX_DURATION)
143       action->updateMaxDuration(delta);
144
145     XBT_DEBUG("Action (%p) : remains (%g).", action, action->getRemains());
146
147     /* In the next if cascade, the action can be finished either because:
148      *  - The amount of remaining work reached 0
149      *  - The max duration was reached
150      * If it's not done, it may have failed.
151      */
152
153     if ((action->getRemains() <= 0) &&
154         (lmm_get_variable_weight(action->getVariable()) > 0)) {
155       action->finish();
156       action->setState(SURF_ACTION_DONE);
157     } else if ((action->getMaxDuration() != NO_MAX_DURATION) &&
158                (action->getMaxDuration() <= 0)) {
159       action->finish();
160       action->setState(SURF_ACTION_DONE);
161     } else {
162       /* Need to check that none of the model has failed */
163       lmm_constraint_t cnst = NULL;
164       int i = 0;
165
166       while ((cnst = lmm_get_cnst_from_var(p_maxminSystem, action->getVariable(), i++))) {
167         void *constraint_id = lmm_constraint_id(cnst);
168
169         if (static_cast<HostImpl*>(constraint_id)->isOff()) {
170           XBT_DEBUG("Action (%p) Failed!!", action);
171           action->finish();
172           action->setState(SURF_ACTION_FAILED);
173           break;
174         }
175       }
176     }
177   }
178   return;
179 }
180
181 Action *HostL07Model::executeParallelTask(int host_nb, sg_host_t *host_list,
182       double *flops_amount, double *bytes_amount,
183       double rate) {
184   return new L07Action(this, host_nb, host_list, flops_amount, bytes_amount, rate);
185 }
186
187
188 L07Action::L07Action(Model *model, int host_nb, sg_host_t*host_list,
189     double *flops_amount, double *bytes_amount, double rate)
190   : CpuAction(model, 1, 0)
191 {
192   int nb_link = 0;
193   int nb_used_host = 0; /* Only the hosts with something to compute (>0 flops) are counted) */
194   double latency = 0.0;
195
196   this->p_netcardList->reserve(host_nb);
197   for (int i = 0; i<host_nb; i++)
198     this->p_netcardList->push_back(host_list[i]->pimpl_netcard);
199
200   /* Compute the number of affected resources... */
201   if(bytes_amount != NULL) {
202     xbt_dict_t ptask_parallel_task_link_set = xbt_dict_new_homogeneous(NULL);
203
204     for (int i = 0; i < host_nb; i++) {
205       for (int j = 0; j < host_nb; j++) {
206
207         if (bytes_amount[i * host_nb + j] > 0) {
208           double lat=0.0;
209           std::vector<Link*> *route = new std::vector<Link*>();
210
211           routing_platf->getRouteAndLatency((*p_netcardList)[i], (*p_netcardList)[j], route, &lat);
212           latency = MAX(latency, lat);
213
214           for (auto link : *route)
215             xbt_dict_set(ptask_parallel_task_link_set, link->getName(), link, NULL);
216           delete route;
217         }
218       }
219     }
220
221     nb_link = xbt_dict_length(ptask_parallel_task_link_set);
222     xbt_dict_free(&ptask_parallel_task_link_set);
223   }
224
225   for (int i = 0; i < host_nb; i++)
226     if (flops_amount[i] > 0)
227       nb_used_host++;
228
229   XBT_DEBUG("Creating a parallel task (%p) with %d hosts and %d unique links.", this, host_nb, nb_link);
230   this->p_computationAmount = flops_amount;
231   this->p_communicationAmount = bytes_amount;
232   this->m_latency = latency;
233   this->m_rate = rate;
234
235   this->p_variable = lmm_variable_new(model->getMaxminSystem(), this, 1.0,
236       (rate > 0 ? rate : -1.0),
237       host_nb + nb_link);
238
239   if (this->m_latency > 0)
240     lmm_update_variable_weight(model->getMaxminSystem(), this->getVariable(), 0.0);
241
242   for (int i = 0; i < host_nb; i++)
243     lmm_expand(model->getMaxminSystem(), host_list[i]->pimpl_cpu->getConstraint(),
244         this->getVariable(), flops_amount[i]);
245
246   if(bytes_amount != NULL) {
247     for (int i = 0; i < host_nb; i++) {
248       for (int j = 0; j < host_nb; j++) {
249
250         if (bytes_amount[i * host_nb + j] == 0.0)
251           continue;
252         std::vector<Link*> *route = new std::vector<Link*>();
253
254         routing_platf->getRouteAndLatency((*p_netcardList)[i], (*p_netcardList)[j], route, NULL);
255
256         for (auto link : *route)
257           lmm_expand_add(model->getMaxminSystem(), link->getConstraint(), this->getVariable(), bytes_amount[i * host_nb + j]);
258
259         delete route;
260       }
261     }
262   }
263
264   if (nb_link + nb_used_host == 0) {
265     this->setCost(1.0);
266     this->setRemains(0.0);
267   }
268   xbt_free(host_list);
269 }
270
271 Action *NetworkL07Model::communicate(NetCard *src, NetCard *dst,
272                                        double size, double rate)
273 {
274   sg_host_t*host_list = xbt_new0(sg_host_t, 2);
275   double *flops_amount = xbt_new0(double, 2);
276   double *bytes_amount = xbt_new0(double, 4);
277   Action *res = NULL;
278
279   host_list[0] = sg_host_by_name(src->name());
280   host_list[1] = sg_host_by_name(dst->name());
281   bytes_amount[1] = size;
282
283   res = p_hostModel->executeParallelTask(2, host_list, flops_amount, bytes_amount, rate);
284
285   return res;
286 }
287
288 Cpu *CpuL07Model::createCpu(simgrid::s4u::Host *host,  xbt_dynar_t powerPeakList,
289     tmgr_trace_t power_trace, int core, tmgr_trace_t state_trace)
290 {
291   CpuL07 *cpu = new CpuL07(this, host, powerPeakList, power_trace, core, state_trace);
292   return cpu;
293 }
294
295 Link* NetworkL07Model::createLink(const char *name,
296     double bw_initial, tmgr_trace_t bw_trace,
297     double lat_initial, tmgr_trace_t lat_trace,
298     e_surf_link_sharing_policy_t policy, xbt_dict_t properties)
299 {
300   xbt_assert(!Link::byName(name),
301            "Link '%s' declared several times in the platform.", name);
302
303   Link* link = new LinkL07(this, name, properties, bw_initial, bw_trace, lat_initial, lat_trace, policy);
304   Link::onCreation(link);
305   return link;
306 }
307
308 /************
309  * Resource *
310  ************/
311
312 CpuL07::CpuL07(CpuL07Model *model, simgrid::s4u::Host *host,
313     xbt_dynar_t speedPeakList,
314     tmgr_trace_t speedTrace,
315     int core, tmgr_trace_t state_trace)
316  : Cpu(model, host, speedPeakList, core, xbt_dynar_get_as(speedPeakList,0,double))
317 {
318   p_constraint = lmm_constraint_new(model->getMaxminSystem(), this, xbt_dynar_get_as(speedPeakList,0,double));
319
320   if (speedTrace)
321     p_speed.event = future_evt_set->add_trace(speedTrace, 0.0, this);
322
323   if (state_trace)
324     p_stateEvent = future_evt_set->add_trace(state_trace, 0.0, this);
325 }
326
327 CpuL07::~CpuL07()
328 {
329 }
330
331 LinkL07::LinkL07(NetworkL07Model *model, const char* name, xbt_dict_t props,
332              double bw_initial, tmgr_trace_t bw_trace,
333              double lat_initial, tmgr_trace_t lat_trace,
334              e_surf_link_sharing_policy_t policy)
335  : Link(model, name, props, lmm_constraint_new(model->getMaxminSystem(), this, bw_initial))
336 {
337   m_bandwidth.peak = bw_initial;
338   if (bw_trace)
339     m_bandwidth.event = future_evt_set->add_trace(bw_trace, 0.0, this);
340
341   m_latency.peak = lat_initial;
342   if (lat_trace)
343     m_latency.event = future_evt_set->add_trace(lat_trace, 0.0, this);
344
345   if (policy == SURF_LINK_FATPIPE)
346     lmm_constraint_shared(getConstraint());
347 }
348
349 Action *CpuL07::execution_start(double size)
350 {
351   sg_host_t*host_list = xbt_new0(sg_host_t, 1);
352   double *flops_amount = xbt_new0(double, 1);
353
354   host_list[0] = getHost();
355   flops_amount[0] = size;
356
357   return static_cast<CpuL07Model*>(getModel())->p_hostModel
358     ->executeParallelTask( 1, host_list, flops_amount, NULL, -1);
359 }
360
361 Action *CpuL07::sleep(double duration)
362 {
363   L07Action *action = static_cast<L07Action*>(execution_start(1.0));
364   action->m_maxDuration = duration;
365   action->m_suspended = 2;
366   lmm_update_variable_weight(getModel()->getMaxminSystem(), action->getVariable(), 0.0);
367
368   return action;
369 }
370
371 bool CpuL07::isUsed(){
372   return lmm_constraint_used(getModel()->getMaxminSystem(), getConstraint());
373 }
374
375 /** @brief take into account changes of speed (either load or max) */
376 void CpuL07::onSpeedChange() {
377   lmm_variable_t var = NULL;
378   lmm_element_t elem = NULL;
379
380     lmm_update_constraint_bound(getModel()->getMaxminSystem(), getConstraint(), p_speed.peak * p_speed.scale);
381     while ((var = lmm_get_var_from_cnst
382             (getModel()->getMaxminSystem(), getConstraint(), &elem))) {
383       Action *action = static_cast<Action*>(lmm_variable_id(var));
384
385       lmm_update_variable_bound(getModel()->getMaxminSystem(),
386                                 action->getVariable(),
387                                 p_speed.scale * p_speed.peak);
388     }
389
390   Cpu::onSpeedChange();
391 }
392
393
394 bool LinkL07::isUsed(){
395   return lmm_constraint_used(getModel()->getMaxminSystem(), getConstraint());
396 }
397
398 void CpuL07::apply_event(tmgr_trace_iterator_t triggered, double value){
399   XBT_DEBUG("Updating cpu %s (%p) with value %g", getName(), this, value);
400   if (triggered == p_speed.event) {
401     p_speed.scale = value;
402     onSpeedChange();
403     tmgr_trace_event_unref(&p_speed.event);
404
405   } else if (triggered == p_stateEvent) {
406     if (value > 0)
407       turnOn();
408     else
409       turnOff();
410     tmgr_trace_event_unref(&p_stateEvent);
411
412   } else {
413     xbt_die("Unknown event!\n");
414   }
415 }
416
417 void LinkL07::apply_event(tmgr_trace_iterator_t triggered, double value) {
418   XBT_DEBUG("Updating link %s (%p) with value=%f", getName(), this, value);
419   if (triggered == m_bandwidth.event) {
420     updateBandwidth(value);
421     tmgr_trace_event_unref(&m_bandwidth.event);
422
423   } else if (triggered == m_latency.event) {
424     updateLatency(value);
425     tmgr_trace_event_unref(&m_latency.event);
426
427   } else if (triggered == m_stateEvent) {
428     if (value > 0)
429       turnOn();
430     else
431       turnOff();
432     tmgr_trace_event_unref(&m_stateEvent);
433
434   } else {
435     xbt_die("Unknown event ! \n");
436   }
437 }
438
439 void LinkL07::updateBandwidth(double value)
440 {
441   m_bandwidth.peak = value;
442   lmm_update_constraint_bound(getModel()->getMaxminSystem(), getConstraint(), m_bandwidth.peak * m_bandwidth.scale);
443 }
444
445 void LinkL07::updateLatency(double value)
446 {
447   lmm_variable_t var = NULL;
448   L07Action *action;
449   lmm_element_t elem = NULL;
450
451   m_latency.peak = value;
452   while ((var = lmm_get_var_from_cnst(getModel()->getMaxminSystem(), getConstraint(), &elem))) {
453     action = static_cast<L07Action*>(lmm_variable_id(var));
454     action->updateBound();
455   }
456 }
457
458 /**********
459  * Action *
460  **********/
461
462 L07Action::~L07Action(){
463   delete p_netcardList;
464   free(p_communicationAmount);
465   free(p_computationAmount);
466 }
467
468 void L07Action::updateBound()
469 {
470   double lat_current = 0.0;
471   double lat_bound = -1.0;
472   int i, j;
473
474   int hostNb = p_netcardList->size();
475
476   if (p_communicationAmount != NULL) {
477     for (i = 0; i < hostNb; i++) {
478       for (j = 0; j < hostNb; j++) {
479
480         if (p_communicationAmount[i * hostNb + j] > 0) {
481           double lat = 0.0;
482           std::vector<Link*> *route = new std::vector<Link*>();
483           routing_platf->getRouteAndLatency((*p_netcardList)[i], (*p_netcardList)[j], route, &lat);
484
485           lat_current = MAX(lat_current, lat * p_communicationAmount[i * hostNb + j]);
486           delete route;
487         }
488       }
489     }
490   }
491   lat_bound = sg_tcp_gamma / (2.0 * lat_current);
492   XBT_DEBUG("action (%p) : lat_bound = %g", this, lat_bound);
493   if ((m_latency == 0.0) && (m_suspended == 0)) {
494     if (m_rate < 0)
495       lmm_update_variable_bound(getModel()->getMaxminSystem(), getVariable(), lat_bound);
496     else
497       lmm_update_variable_bound(getModel()->getMaxminSystem(), getVariable(),
498         std::min(m_rate, lat_bound));
499   }
500 }
501
502 int L07Action::unref()
503 {
504   m_refcount--;
505   if (!m_refcount) {
506     if (action_hook.is_linked())
507       p_stateSet->erase(p_stateSet->iterator_to(*this));
508     if (getVariable())
509       lmm_variable_free(getModel()->getMaxminSystem(), getVariable());
510     delete this;
511     return 1;
512   }
513   return 0;
514 }
515
516 }
517 }