Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ad16d13b7fba3887e227289ad45870416c92dc9d
[simgrid.git] / src / surf / cpu_ti.cpp
1 /* Copyright (c) 2013-2018. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "cpu_ti.hpp"
7 #include "src/surf/surf_interface.hpp"
8 #include "src/surf/trace_mgr.hpp"
9 #include "surf/surf.hpp"
10
11 #ifndef SURF_MODEL_CPUTI_H_
12 #define SURF_MODEL_CPUTI_H_
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(surf_cpu_ti, surf_cpu, "Logging specific to the SURF CPU TRACE INTEGRATION module");
15
16 namespace simgrid {
17 namespace surf {
18
19 /*********
20  * Trace *
21  *********/
22
23 CpuTiTrace::CpuTiTrace(tmgr_trace_t speedTrace)
24 {
25   double integral = 0;
26   double time = 0;
27   int i = 0;
28   nb_points_      = speedTrace->event_list.size() + 1;
29   time_points_    = new double[nb_points_];
30   integral_       = new double[nb_points_];
31   for (auto const& val : speedTrace->event_list) {
32     time_points_[i] = time;
33     integral_[i] = integral;
34     integral += val.date_ * val.value_;
35     time += val.date_;
36     i++;
37   }
38   time_points_[i] = time;
39   integral_[i] = integral;
40 }
41
42 CpuTiTrace::~CpuTiTrace()
43 {
44   delete[] time_points_;
45   delete [] integral_;
46 }
47
48 CpuTiTmgr::~CpuTiTmgr()
49 {
50   if (trace_)
51     delete trace_;
52 }
53
54 /**
55 * \brief Integrate trace
56 *
57 * Wrapper around surf_cpu_integrate_trace_simple() to get
58 * the cyclic effect.
59 *
60 * \param a      Begin of interval
61 * \param b      End of interval
62 * \return the integrate value. -1 if an error occurs.
63 */
64 double CpuTiTmgr::integrate(double a, double b)
65 {
66   int a_index;
67
68   if ((a < 0.0) || (a > b)) {
69     xbt_die("Error, invalid integration interval [%.2f,%.2f]. "
70         "You probably have a task executing with negative computation amount. Check your code.", a, b);
71   }
72   if (fabs(a -b) < EPSILON)
73     return 0.0;
74
75   if (type_ == TRACE_FIXED) {
76     return ((b - a) * value_);
77   }
78
79   if (fabs(ceil(a / last_time_) - a / last_time_) < EPSILON)
80     a_index = 1 + static_cast<int>(ceil(a / last_time_));
81   else
82     a_index = static_cast<int>(ceil(a / last_time_));
83
84   int b_index = static_cast<int>(floor(b / last_time_));
85
86   if (a_index > b_index) {      /* Same chunk */
87     return trace_->integrate_simple(a - (a_index - 1) * last_time_, b - (b_index)*last_time_);
88   }
89
90   double first_chunk  = trace_->integrate_simple(a - (a_index - 1) * last_time_, last_time_);
91   double middle_chunk = (b_index - a_index) * total_;
92   double last_chunk   = trace_->integrate_simple(0.0, b - (b_index)*last_time_);
93
94   XBT_DEBUG("first_chunk=%.2f  middle_chunk=%.2f  last_chunk=%.2f\n", first_chunk, middle_chunk, last_chunk);
95
96   return (first_chunk + middle_chunk + last_chunk);
97 }
98
99 /**
100  * \brief Auxiliary function to compute the integral between a and b.
101  *     It simply computes the integrals at point a and b and returns the difference between them.
102  * \param a  Initial point
103  * \param b  Final point
104 */
105 double CpuTiTrace::integrate_simple(double a, double b)
106 {
107   return integrate_simple_point(b) - integrate_simple_point(a);
108 }
109
110 /**
111  * \brief Auxiliary function to compute the integral at point a.
112  * \param a        point
113  */
114 double CpuTiTrace::integrate_simple_point(double a)
115 {
116   double integral = 0;
117   double a_aux = a;
118   int ind         = binary_search(time_points_, a, 0, nb_points_ - 1);
119   integral += integral_[ind];
120
121   XBT_DEBUG("a %f ind %d integral %f ind + 1 %f ind %f time +1 %f time %f", a, ind, integral, integral_[ind + 1],
122             integral_[ind], time_points_[ind + 1], time_points_[ind]);
123   double_update(&a_aux, time_points_[ind], sg_maxmin_precision * sg_surf_precision);
124   if (a_aux > 0)
125     integral +=
126         ((integral_[ind + 1] - integral_[ind]) / (time_points_[ind + 1] - time_points_[ind])) * (a - time_points_[ind]);
127   XBT_DEBUG("Integral a %f = %f", a, integral);
128
129   return integral;
130 }
131
132 /**
133 * \brief Computes the time needed to execute "amount" on cpu.
134 *
135 * Here, amount can span multiple trace periods
136 *
137 * \param a        Initial time
138 * \param amount  Amount to be executed
139 * \return  End time
140 */
141 double CpuTiTmgr::solve(double a, double amount)
142 {
143   /* Fix very small negative numbers */
144   if ((a < 0.0) && (a > -EPSILON)) {
145     a = 0.0;
146   }
147   if ((amount < 0.0) && (amount > -EPSILON)) {
148     amount = 0.0;
149   }
150
151   /* Sanity checks */
152   if ((a < 0.0) || (amount < 0.0)) {
153     XBT_CRITICAL ("Error, invalid parameters [a = %.2f, amount = %.2f]. "
154         "You probably have a task executing with negative computation amount. Check your code.", a, amount);
155     xbt_abort();
156   }
157
158   /* At this point, a and amount are positive */
159   if (amount < EPSILON)
160     return a;
161
162   /* Is the trace fixed ? */
163   if (type_ == TRACE_FIXED) {
164     return (a + (amount / value_));
165   }
166
167   XBT_DEBUG("amount %f total %f", amount, total_);
168   /* Reduce the problem to one where amount <= trace_total */
169   int quotient = static_cast<int>(floor(amount / total_));
170   double reduced_amount = (total_) * ((amount / total_) - floor(amount / total_));
171   double reduced_a      = a - (last_time_) * static_cast<int>(floor(a / last_time_));
172
173   XBT_DEBUG("Quotient: %d reduced_amount: %f reduced_a: %f", quotient, reduced_amount, reduced_a);
174
175   /* Now solve for new_amount which is <= trace_total */
176   double reduced_b;
177   XBT_DEBUG("Solve integral: [%.2f, amount=%.2f]", reduced_a, reduced_amount);
178   double amount_till_end = integrate(reduced_a, last_time_);
179
180   if (amount_till_end > reduced_amount) {
181     reduced_b = trace_->solve_simple(reduced_a, reduced_amount);
182   } else {
183     reduced_b = last_time_ + trace_->solve_simple(0.0, reduced_amount - amount_till_end);
184   }
185
186   /* Re-map to the original b and amount */
187   return (last_time_) * static_cast<int>(floor(a / last_time_)) + (quotient * last_time_) + reduced_b;
188 }
189
190 /**
191  * \brief Auxiliary function to solve integral.
192  *  It returns the date when the requested amount of flops is available
193  * \param a        Initial point
194  * \param amount  Amount of flops
195  * \return The date when amount is available.
196 */
197 double CpuTiTrace::solve_simple(double a, double amount)
198 {
199   double integral_a = integrate_simple_point(a);
200   int ind           = binary_search(integral_, integral_a + amount, 0, nb_points_ - 1);
201   double time       = time_points_[ind];
202   time += (integral_a + amount - integral_[ind]) /
203           ((integral_[ind + 1] - integral_[ind]) / (time_points_[ind + 1] - time_points_[ind]));
204
205   return time;
206 }
207
208 /**
209 * \brief Auxiliary function to update the CPU speed scale.
210 *
211 *  This function uses the trace structure to return the speed scale at the determined time a.
212 * \param a        Time
213 * \return CPU speed scale
214 */
215 double CpuTiTmgr::get_power_scale(double a)
216 {
217   double reduced_a          = a - floor(a / last_time_) * last_time_;
218   int point                 = trace_->binary_search(trace_->time_points_, reduced_a, 0, trace_->nb_points_ - 1);
219   trace_mgr::DatedValue val = speed_trace_->event_list.at(point);
220   return val.value_;
221 }
222
223 /**
224 * \brief Creates a new integration trace from a tmgr_trace_t
225 *
226 * \param  speedTrace    CPU availability trace
227 * \param  value          Percentage of CPU speed available (useful to fixed tracing)
228 * \return  Integration trace structure
229 */
230 CpuTiTmgr::CpuTiTmgr(tmgr_trace_t speedTrace, double value) : speed_trace_(speedTrace)
231 {
232   double total_time = 0.0;
233   trace_ = 0;
234
235 /* no availability file, fixed trace */
236   if (not speedTrace) {
237     type_ = TRACE_FIXED;
238     value_ = value;
239     XBT_DEBUG("No availability trace. Constant value = %f", value);
240     return;
241   }
242
243   /* only one point available, fixed trace */
244   if (speedTrace->event_list.size() == 1) {
245     trace_mgr::DatedValue val = speedTrace->event_list.front();
246     type_ = TRACE_FIXED;
247     value_                    = val.value_;
248     return;
249   }
250
251   type_ = TRACE_DYNAMIC;
252
253   /* count the total time of trace file */
254   for (auto const& val : speedTrace->event_list)
255     total_time += val.date_;
256
257   trace_ = new CpuTiTrace(speedTrace);
258   last_time_ = total_time;
259   total_    = trace_->integrate_simple(0, total_time);
260
261   XBT_DEBUG("Total integral %f, last_time %f ", total_, last_time_);
262 }
263
264 /**
265  * \brief Binary search in array.
266  *  It returns the first point of the interval in which "a" is.
267  * \param array    Array
268  * \param a        Value to search
269  * \param low     Low bound to search in array
270  * \param high    Upper bound to search in array
271  * \return Index of point
272 */
273 int CpuTiTrace::binary_search(double* array, double a, int low, int high)
274 {
275   xbt_assert(low < high, "Wrong parameters: low (%d) should be smaller than high (%d)", low, high);
276
277   do {
278     int mid = low + (high - low) / 2;
279     XBT_DEBUG("a %f low %d high %d mid %d value %f", a, low, high, mid, array[mid]);
280
281     if (array[mid] > a)
282       high = mid;
283     else
284       low = mid;
285   }
286   while (low < high - 1);
287
288   return low;
289 }
290
291 }
292 }
293
294 /*********
295  * Model *
296  *********/
297
298 void surf_cpu_model_init_ti()
299 {
300   xbt_assert(not surf_cpu_model_pm, "CPU model already initialized. This should not happen.");
301   xbt_assert(not surf_cpu_model_vm, "CPU model already initialized. This should not happen.");
302
303   surf_cpu_model_pm = new simgrid::surf::CpuTiModel();
304   all_existing_models->push_back(surf_cpu_model_pm);
305
306   surf_cpu_model_vm = new simgrid::surf::CpuTiModel();
307   all_existing_models->push_back(surf_cpu_model_vm);
308 }
309
310 namespace simgrid {
311 namespace surf {
312
313 CpuTiModel::~CpuTiModel()
314 {
315   surf_cpu_model_pm = nullptr;
316 }
317
318 Cpu *CpuTiModel::createCpu(simgrid::s4u::Host *host, std::vector<double>* speedPerPstate, int core)
319 {
320   return new CpuTi(this, host, speedPerPstate, core);
321 }
322
323 double CpuTiModel::next_occuring_event(double now)
324 {
325   double min_action_duration = -1;
326
327   /* iterates over modified cpus to update share resources */
328   for (auto it = std::begin(modified_cpus_); it != std::end(modified_cpus_);) {
329     CpuTi& ti = *it;
330     ++it; // increment iterator here since the following call to ti.updateActionsFinishTime() may invalidate it
331     ti.update_actions_finish_time(now);
332   }
333
334   /* get the min next event if heap not empty */
335   if (not get_action_heap().empty())
336     min_action_duration = get_action_heap().top_date() - now;
337
338   XBT_DEBUG("Share resources, min next event date: %f", min_action_duration);
339
340   return min_action_duration;
341 }
342
343 void CpuTiModel::update_actions_state(double now, double /*delta*/)
344 {
345   while (not get_action_heap().empty() && double_equals(get_action_heap().top_date(), now, sg_surf_precision)) {
346     CpuTiAction* action = static_cast<CpuTiAction*>(get_action_heap().pop());
347     XBT_DEBUG("Action %p: finish", action);
348     action->finish(kernel::resource::Action::State::done);
349     /* update remaining amount of all actions */
350     action->cpu_->update_remaining_amount(surf_get_clock());
351   }
352 }
353
354 /************
355  * Resource *
356  ************/
357 CpuTi::CpuTi(CpuTiModel *model, simgrid::s4u::Host *host, std::vector<double> *speedPerPstate, int core)
358   : Cpu(model, host, speedPerPstate, core)
359 {
360   xbt_assert(core == 1, "Multi-core not handled by this model yet");
361
362   speed_.peak = speedPerPstate->front();
363   XBT_DEBUG("CPU create: peak=%f", speed_.peak);
364
365   speed_integrated_trace_ = new CpuTiTmgr(nullptr, 1 /*scale*/);
366 }
367
368 CpuTi::~CpuTi()
369 {
370   set_modified(false);
371   delete speed_integrated_trace_;
372 }
373 void CpuTi::set_speed_trace(tmgr_trace_t trace)
374 {
375   if (speed_integrated_trace_)
376     delete speed_integrated_trace_;
377
378   speed_integrated_trace_ = new CpuTiTmgr(trace, speed_.scale);
379
380   /* add a fake trace event if periodicity == 0 */
381   if (trace && trace->event_list.size() > 1) {
382     trace_mgr::DatedValue val = trace->event_list.back();
383     if (val.date_ < 1e-12)
384       speed_.event = future_evt_set->add_trace(new simgrid::trace_mgr::trace(), this);
385   }
386 }
387
388 void CpuTi::apply_event(tmgr_trace_event_t event, double value)
389 {
390   if (event == speed_.event) {
391     tmgr_trace_t speedTrace;
392     CpuTiTmgr* trace;
393
394     XBT_DEBUG("Finish trace date: value %f", value);
395     /* update remaining of actions and put in modified cpu list */
396     update_remaining_amount(surf_get_clock());
397
398     set_modified(true);
399
400     speedTrace                = speed_integrated_trace_->speed_trace_;
401     trace_mgr::DatedValue val = speedTrace->event_list.back();
402     delete speed_integrated_trace_;
403     speed_.scale = val.value_;
404
405     trace = new CpuTiTmgr(TRACE_FIXED, val.value_);
406     XBT_DEBUG("value %f", val.value_);
407
408     speed_integrated_trace_ = trace;
409
410     tmgr_trace_event_unref(&speed_.event);
411
412   } else if (event == stateEvent_) {
413     if (value > 0) {
414       if (is_off())
415         host_that_restart.push_back(getHost());
416       turn_on();
417     } else {
418       turn_off();
419       double date = surf_get_clock();
420
421       /* put all action running on cpu to failed */
422       for (CpuTiAction& action : action_set_) {
423         if (action.get_state() == kernel::resource::Action::State::running ||
424             action.get_state() == kernel::resource::Action::State::ready ||
425             action.get_state() == kernel::resource::Action::State::not_in_the_system) {
426           action.set_finish_time(date);
427           action.set_state(kernel::resource::Action::State::failed);
428           get_model()->get_action_heap().remove(&action);
429         }
430       }
431     }
432     tmgr_trace_event_unref(&stateEvent_);
433
434   } else {
435     xbt_die("Unknown event!\n");
436   }
437 }
438
439 /** Update the actions that are running on this CPU (which was modified recently) */
440 void CpuTi::update_actions_finish_time(double now)
441 {
442   /* update remaining amount of actions */
443   update_remaining_amount(now);
444
445   /* Compute the sum of priorities for the actions running on that CPU */
446   sum_priority_ = 0.0;
447   for (CpuTiAction const& action : action_set_) {
448     /* action not running, skip it */
449     if (action.get_state_set() != surf_cpu_model_pm->get_running_action_set())
450       continue;
451
452     /* bogus priority, skip it */
453     if (action.get_priority() <= 0)
454       continue;
455
456     /* action suspended, skip it */
457     if (action.suspended_ != kernel::resource::Action::SuspendStates::not_suspended)
458       continue;
459
460     sum_priority_ += 1.0 / action.get_priority();
461   }
462
463   for (CpuTiAction& action : action_set_) {
464     double min_finish = -1;
465     /* action not running, skip it */
466     if (action.get_state_set() != surf_cpu_model_pm->get_running_action_set())
467       continue;
468
469     /* verify if the action is really running on cpu */
470     if (action.suspended_ == kernel::resource::Action::SuspendStates::not_suspended && action.get_priority() > 0) {
471       /* total area needed to finish the action. Used in trace integration */
472       double total_area = (action.get_remains() * sum_priority_ * action.get_priority()) / speed_.peak;
473
474       action.set_finish_time(speed_integrated_trace_->solve(now, total_area));
475       /* verify which event will happen before (max_duration or finish time) */
476       if (action.get_max_duration() > NO_MAX_DURATION &&
477           action.get_start_time() + action.get_max_duration() < action.get_finish_time())
478         min_finish = action.get_start_time() + action.get_max_duration();
479       else
480         min_finish = action.get_finish_time();
481     } else {
482       /* put the max duration time on heap */
483       if (action.get_max_duration() > NO_MAX_DURATION)
484         min_finish = action.get_start_time() + action.get_max_duration();
485     }
486     /* add in action heap */
487     if (min_finish > NO_MAX_DURATION)
488       get_model()->get_action_heap().update(&action, min_finish, kernel::resource::ActionHeap::Type::unset);
489     else
490       get_model()->get_action_heap().remove(&action);
491
492     XBT_DEBUG("Update finish time: Cpu(%s) Action: %p, Start Time: %f Finish Time: %f Max duration %f", get_cname(),
493               &action, action.get_start_time(), action.get_finish_time(), action.get_max_duration());
494   }
495   /* remove from modified cpu */
496   set_modified(false);
497 }
498
499 bool CpuTi::is_used()
500 {
501   return not action_set_.empty();
502 }
503
504 double CpuTi::get_available_speed()
505 {
506   speed_.scale = speed_integrated_trace_->get_power_scale(surf_get_clock());
507   return Cpu::get_available_speed();
508 }
509
510 /** @brief Update the remaining amount of actions */
511 void CpuTi::update_remaining_amount(double now)
512 {
513
514   /* already updated */
515   if (last_update_ >= now)
516     return;
517
518   /* compute the integration area */
519   double area_total = speed_integrated_trace_->integrate(last_update_, now) * speed_.peak;
520   XBT_DEBUG("Flops total: %f, Last update %f", area_total, last_update_);
521   for (CpuTiAction& action : action_set_) {
522     /* action not running, skip it */
523     if (action.get_state_set() != get_model()->get_running_action_set())
524       continue;
525
526     /* bogus priority, skip it */
527     if (action.get_priority() <= 0)
528       continue;
529
530     /* action suspended, skip it */
531     if (action.suspended_ != kernel::resource::Action::SuspendStates::not_suspended)
532       continue;
533
534     /* action don't need update */
535     if (action.get_start_time() >= now)
536       continue;
537
538     /* skip action that are finishing now */
539     if (action.get_finish_time() >= 0 && action.get_finish_time() <= now)
540       continue;
541
542     /* update remaining */
543     action.update_remains(area_total / (sum_priority_ * action.get_priority()));
544     XBT_DEBUG("Update remaining action(%p) remaining %f", &action, action.get_remains_no_update());
545   }
546   last_update_ = now;
547 }
548
549 CpuAction *CpuTi::execution_start(double size)
550 {
551   XBT_IN("(%s,%g)", get_cname(), size);
552   CpuTiAction* action = new CpuTiAction(static_cast<CpuTiModel*>(get_model()), size, is_off(), this);
553
554   action_set_.push_back(*action);
555
556   XBT_OUT();
557   return action;
558 }
559
560
561 CpuAction *CpuTi::sleep(double duration)
562 {
563   if (duration > 0)
564     duration = std::max(duration, sg_surf_precision);
565
566   XBT_IN("(%s,%g)", get_cname(), duration);
567   CpuTiAction* action = new CpuTiAction(static_cast<CpuTiModel*>(get_model()), 1.0, is_off(), this);
568
569   action->set_max_duration(duration);
570   action->suspended_ = kernel::resource::Action::SuspendStates::sleeping;
571   if (duration == NO_MAX_DURATION) {
572     /* Move to the *end* of the corresponding action set. This convention is used to speed up update_resource_state */
573     simgrid::xbt::intrusive_erase(*action->get_state_set(), *action);
574     action->state_set_ = &static_cast<CpuTiModel*>(get_model())->runningActionSetThatDoesNotNeedBeingChecked_;
575     action->get_state_set()->push_back(*action);
576   }
577
578   action_set_.push_back(*action);
579
580   XBT_OUT();
581   return action;
582 }
583
584 void CpuTi::set_modified(bool modified)
585 {
586   CpuTiList& modifiedCpu = static_cast<CpuTiModel*>(get_model())->modified_cpus_;
587   if (modified) {
588     if (not cpu_ti_hook.is_linked()) {
589       modifiedCpu.push_back(*this);
590     }
591   } else {
592     if (cpu_ti_hook.is_linked())
593       simgrid::xbt::intrusive_erase(modifiedCpu, *this);
594   }
595 }
596
597 /**********
598  * Action *
599  **********/
600
601 CpuTiAction::CpuTiAction(CpuTiModel *model_, double cost, bool failed, CpuTi *cpu)
602  : CpuAction(model_, cost, failed)
603  , cpu_(cpu)
604 {
605   cpu_->set_modified(true);
606 }
607 CpuTiAction::~CpuTiAction()
608 {
609   /* remove from action_set */
610   if (action_ti_hook.is_linked())
611     simgrid::xbt::intrusive_erase(cpu_->action_set_, *this);
612   /* remove from heap */
613   get_model()->get_action_heap().remove(this);
614   cpu_->set_modified(true);
615 }
616
617 void CpuTiAction::set_state(Action::State state)
618 {
619   CpuAction::set_state(state);
620   cpu_->set_modified(true);
621 }
622
623 void CpuTiAction::cancel()
624 {
625   this->set_state(Action::State::failed);
626   get_model()->get_action_heap().remove(this);
627   cpu_->set_modified(true);
628 }
629
630 void CpuTiAction::suspend()
631 {
632   XBT_IN("(%p)", this);
633   if (suspended_ != Action::SuspendStates::sleeping) {
634     suspended_ = Action::SuspendStates::suspended;
635     get_model()->get_action_heap().remove(this);
636     cpu_->set_modified(true);
637   }
638   XBT_OUT();
639 }
640
641 void CpuTiAction::resume()
642 {
643   XBT_IN("(%p)", this);
644   if (suspended_ != Action::SuspendStates::sleeping) {
645     suspended_ = Action::SuspendStates::not_suspended;
646     cpu_->set_modified(true);
647   }
648   XBT_OUT();
649 }
650
651 void CpuTiAction::set_max_duration(double duration)
652 {
653   double min_finish;
654
655   XBT_IN("(%p,%g)", this, duration);
656
657   Action::set_max_duration(duration);
658
659   if (duration >= 0)
660     min_finish = (get_start_time() + get_max_duration()) < get_finish_time() ? (get_start_time() + get_max_duration())
661                                                                              : get_finish_time();
662   else
663     min_finish = get_finish_time();
664
665   /* add in action heap */
666   get_model()->get_action_heap().update(this, min_finish, kernel::resource::ActionHeap::Type::unset);
667
668   XBT_OUT();
669 }
670
671 void CpuTiAction::set_priority(double priority)
672 {
673   XBT_IN("(%p,%g)", this, priority);
674   set_priority_no_update(priority);
675   cpu_->set_modified(true);
676   XBT_OUT();
677 }
678
679 double CpuTiAction::get_remains()
680 {
681   XBT_IN("(%p)", this);
682   cpu_->update_remaining_amount(surf_get_clock());
683   XBT_OUT();
684   return get_remains_no_update();
685 }
686
687 }
688 }
689
690 #endif /* SURF_MODEL_CPUTI_H_ */