Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add a verbose message to the Boost contexts
[simgrid.git] / src / simix / RawContext.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /** \file RawContext.cpp 
8   * Fast context switching inspired from SystemV ucontexts.
9   *
10   * In contrast to System V context, it does not touch the signal mask
11   * which avoids making a system call (at least on Linux).
12   */
13
14 #include <math.h>
15
16 #include <utility>
17 #include <functional>
18
19 #include <xbt/log.h>
20 #include <xbt/parmap.h>
21 #include <xbt/dynar.h>
22
23 #include "smx_private.h"
24 #include "smx_private.hpp"
25 #include "mc/mc.h"
26
27 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
28
29 // ***** Class definitions
30
31 namespace simgrid {
32 namespace simix {
33
34 class RawContext;
35 class RawContextFactory;
36
37 class RawContext : public Context {
38 protected:
39   void* stack_ = nullptr; 
40   /** pointer to top the stack stack */
41   void* stack_top_ = nullptr;
42 public:
43   friend class RawContextFactory;
44   RawContext(std::function<void()> code,
45           void_pfn_smxprocess_t cleanup_func,
46           smx_process_t process);
47   ~RawContext();
48 public:
49   static void wrapper(void* arg);
50   void stop() override;
51   void suspend() override;
52   void resume();
53 private:
54   void suspend_serial();
55   void suspend_parallel();
56   void resume_serial();
57   void resume_parallel();
58 };
59
60 class RawContextFactory : public ContextFactory {
61 public:
62   RawContextFactory();
63   ~RawContextFactory();
64   RawContext* create_context(std::function<void()> code,
65     void_pfn_smxprocess_t, smx_process_t process) override;
66   void run_all() override;
67 private:
68   void run_all_adaptative();
69   void run_all_serial();
70   void run_all_parallel();
71 };
72
73 ContextFactory* raw_factory()
74 {
75   XBT_VERB("Using raw contexts. Because the glibc is just not good enough for us.");
76   return new RawContextFactory();
77 }
78
79 }
80 }
81
82 // ***** Loads of static stuff
83
84 #ifdef HAVE_THREAD_CONTEXTS
85 static xbt_parmap_t raw_parmap;
86 static simgrid::simix::RawContext** raw_workers_context;    /* space to save the worker context in each thread */
87 static uintptr_t raw_threads_working;     /* number of threads that have started their work */
88 static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the thread id */
89 #endif
90 #ifdef ADAPTIVE_THRESHOLD
91 #define SCHED_ROUND_LIMIT 5
92 static xbt_os_timer_t round_time;
93 static double par_time,seq_time;
94 static double par_ratio,seq_ratio;
95 static int reached_seq_limit, reached_par_limit;
96 static unsigned int par_proc_that_ran = 0,seq_proc_that_ran = 0;  /* Counters of processes that have run in SCHED_ROUND_LIMIT scheduling rounds */
97 static unsigned int seq_sched_round=0, par_sched_round=0; /* Amount of SR that ran serial/parallel*/
98 /*Varables used to calculate running variance and mean*/
99 static double prev_avg_par_proc=0,prev_avg_seq_proc=0;
100 static double delta=0;
101 static double s_par_proc=0,s_seq_proc=0; /*Standard deviation of number of processes computed in par/seq during the current simulation*/
102 static double avg_par_proc=0,sd_par_proc=0;
103 static double avg_seq_proc=0,sd_seq_proc=0;
104 static long long par_window=(long long)HUGE_VAL,seq_window=0;
105 #endif
106 static unsigned long raw_process_index = 0;   /* index of the next process to run in the
107                                                * list of runnable processes */
108 static simgrid::simix::RawContext* raw_maestro_context;
109
110 static bool raw_context_parallel = false;
111 #ifdef ADAPTIVE_THRESHOLD
112 static bool raw_context_adaptative = false;
113 #endif
114
115 // ***** Raw context routines
116
117 typedef void (*rawctx_entry_point_t)(void *);
118
119 typedef void* raw_stack_t;
120 extern "C" raw_stack_t raw_makecontext(void* malloced_stack, int stack_size,
121                                    rawctx_entry_point_t entry_point, void* arg);
122 extern "C" void raw_swapcontext(raw_stack_t* old, raw_stack_t new_context);
123
124 #if PROCESSOR_x86_64
125 __asm__ (
126 #if defined(__APPLE__)
127    ".text\n"
128    ".globl _raw_makecontext\n"
129    "_raw_makecontext:\n"
130 #elif defined(_WIN32)
131    ".text\n"
132    ".globl raw_makecontext\n"
133    "raw_makecontext:\n"
134 #else
135    ".text\n"
136    ".globl raw_makecontext\n"
137    ".type raw_makecontext,@function\n"
138    "raw_makecontext:\n"/* Calling convention sets the arguments in rdi, rsi, rdx and rcx, respectively */
139 #endif
140    "   mov %rdi,%rax\n"      /* stack */
141    "   add %rsi,%rax\n"      /* size  */
142    "   andq $-16, %rax\n"    /* align stack */
143    "   movq $0,   -8(%rax)\n" /* @return for func */
144    "   mov %rdx,-16(%rax)\n" /* func */
145    "   mov %rcx,-24(%rax)\n" /* arg/rdi */
146    "   movq $0,  -32(%rax)\n" /* rsi */
147    "   movq $0,  -40(%rax)\n" /* rdx */
148    "   movq $0,  -48(%rax)\n" /* rcx */
149    "   movq $0,  -56(%rax)\n" /* r8  */
150    "   movq $0,  -64(%rax)\n" /* r9  */
151    "   movq $0,  -72(%rax)\n" /* rbp */
152    "   movq $0,  -80(%rax)\n" /* rbx */
153    "   movq $0,  -88(%rax)\n" /* r12 */
154    "   movq $0,  -96(%rax)\n" /* r13 */
155    "   movq $0, -104(%rax)\n" /* r14 */
156    "   movq $0, -112(%rax)\n" /* r15 */
157    "   sub $112,%rax\n"
158    "   ret\n"
159 );
160
161 __asm__ (
162 #if defined(__APPLE__)
163    ".text\n"
164    ".globl _raw_swapcontext\n"
165    "_raw_swapcontext:\n"
166 #elif defined(_WIN32)
167    ".text\n"
168    ".globl raw_swapcontext\n"
169    "raw_swapcontext:\n"
170 #else
171    ".text\n"
172    ".globl raw_swapcontext\n"
173    ".type raw_swapcontext,@function\n"
174    "raw_swapcontext:\n" /* Calling convention sets the arguments in rdi and rsi, respectively */
175 #endif
176    "   push %rdi\n"
177    "   push %rsi\n"
178    "   push %rdx\n"
179    "   push %rcx\n"
180    "   push %r8\n"
181    "   push %r9\n"
182    "   push %rbp\n"
183    "   push %rbx\n"
184    "   push %r12\n"
185    "   push %r13\n"
186    "   push %r14\n"
187    "   push %r15\n"
188    "   mov %rsp,(%rdi)\n" /* old */
189    "   mov %rsi,%rsp\n" /* new */
190    "   pop %r15\n"
191    "   pop %r14\n"
192    "   pop %r13\n"
193    "   pop %r12\n"
194    "   pop %rbx\n"
195    "   pop %rbp\n"
196    "   pop %r9\n"
197    "   pop %r8\n"
198    "   pop %rcx\n"
199    "   pop %rdx\n"
200    "   pop %rsi\n"
201    "   pop %rdi\n"
202    "   ret\n"
203 );
204 #elif PROCESSOR_i686
205 __asm__ (
206 #if defined(__APPLE__) || defined(_WIN32)
207    ".text\n"
208    ".globl _raw_makecontext\n"
209    "_raw_makecontext:\n"
210 #else
211    ".text\n"
212    ".globl raw_makecontext\n"
213    ".type raw_makecontext,@function\n"
214    "raw_makecontext:\n"
215 #endif
216    "   movl 4(%esp),%eax\n"   /* stack */
217    "   addl 8(%esp),%eax\n"   /* size  */
218    "   andl $-16, %eax\n"     /* align stack */
219    "   movl 12(%esp),%ecx\n"  /* func  */
220    "   movl 16(%esp),%edx\n"  /* arg   */
221    "   movl %edx, -4(%eax)\n"
222    "   movl $0,   -8(%eax)\n" /* @return for func */
223    "   movl %ecx,-12(%eax)\n"
224    "   movl $0,  -16(%eax)\n" /* ebp */
225    "   movl $0,  -20(%eax)\n" /* ebx */
226    "   movl $0,  -24(%eax)\n" /* esi */
227    "   movl $0,  -28(%eax)\n" /* edi */
228    "   subl $28,%eax\n"
229    "   retl\n"
230 );
231
232 __asm__ (
233 #if defined(__APPLE__) || defined(_WIN32)
234    ".text\n"
235    ".globl _raw_swapcontext\n"
236    "_raw_swapcontext:\n"
237 #else
238    ".text\n"
239    ".globl raw_swapcontext\n"
240    ".type raw_swapcontext,@function\n"
241    "raw_swapcontext:\n"
242 #endif
243    "   movl 4(%esp),%eax\n" /* old */
244    "   movl 8(%esp),%edx\n" /* new */
245    "   pushl %ebp\n"
246    "   pushl %ebx\n"
247    "   pushl %esi\n"
248    "   pushl %edi\n"
249    "   movl %esp,(%eax)\n"
250    "   movl %edx,%esp\n"
251    "   popl %edi\n"
252    "   popl %esi\n"
253    "   popl %ebx\n"
254    "   popl %ebp\n"
255    "   retl\n"
256 );
257 #else
258
259
260 /* If you implement raw contexts for other processors, don't forget to
261    update the definition of HAVE_RAW_CONTEXTS in tools/cmake/CompleteInFiles.cmake */
262
263 raw_stack_t raw_makecontext(void* malloced_stack, int stack_size,
264                             rawctx_entry_point_t entry_point, void* arg) {
265    THROW_UNIMPLEMENTED;
266 }
267
268 void raw_swapcontext(raw_stack_t* old, raw_stack_t new_context) {
269    THROW_UNIMPLEMENTED;
270 }
271
272 #endif
273
274 // ***** Method definitions
275
276 namespace simgrid {
277 namespace simix {
278
279 RawContextFactory::RawContextFactory()
280   : ContextFactory("RawContextFactory")
281 {
282 #ifdef ADAPTIVE_THRESHOLD
283   raw_context_adaptative = (SIMIX_context_get_parallel_threshold() > 1);
284 #endif
285   raw_context_parallel = SIMIX_context_is_parallel();
286   if (raw_context_parallel) {
287 #ifdef HAVE_THREAD_CONTEXTS
288     int nthreads = SIMIX_context_get_nthreads();
289     xbt_os_thread_key_create(&raw_worker_id_key);
290     // TODO, lazily init
291     raw_parmap = nullptr;
292     raw_workers_context = xbt_new(RawContext*, nthreads);
293     raw_maestro_context = nullptr;
294 #endif
295     // TODO, if(SIMIX_context_get_parallel_threshold() > 1) => choose dynamically
296   }
297 #ifdef ADAPTIVE_THRESHOLD
298   round_time = xbt_os_timer_new();
299   reached_seq_limit = 0;
300   reached_par_limit = 0;
301 #endif
302 }
303
304 RawContextFactory::~RawContextFactory()
305 {
306 #ifdef HAVE_THREAD_CONTEXTS
307   if (raw_parmap)
308     xbt_parmap_destroy(raw_parmap);
309   xbt_free(raw_workers_context);
310 #endif
311 }
312
313 RawContext* RawContextFactory::create_context(std::function<void()> code,
314     void_pfn_smxprocess_t cleanup, smx_process_t process)
315 {
316   return this->new_context<RawContext>(std::move(code),
317     cleanup, process);
318 }
319
320 void RawContext::wrapper(void* arg)
321 {
322   RawContext* context = (RawContext*) arg;
323   (*context)();
324   context->stop();
325 }
326
327 RawContext::RawContext(std::function<void()> code,
328     void_pfn_smxprocess_t cleanup, smx_process_t process)
329   : Context(std::move(code), cleanup, process)
330 {
331    if (has_code()) {
332      this->stack_ = SIMIX_context_stack_new();
333      this->stack_top_ = raw_makecontext(this->stack_,
334                          smx_context_usable_stack_size,
335                          RawContext::wrapper,
336                          this);
337    } else {
338      if(process != nullptr && raw_maestro_context == nullptr)
339        raw_maestro_context = this;
340      if (MC_is_active())
341        MC_ignore_heap(
342          &raw_maestro_context->stack_top_,
343          sizeof(raw_maestro_context->stack_top_));
344    }
345 }
346
347 RawContext::~RawContext()
348 {
349   SIMIX_context_stack_delete(this->stack_);
350 }
351
352 void RawContext::stop()
353 {
354   Context::stop();
355   this->suspend();
356 }
357
358 void RawContextFactory::run_all()
359 {
360 #ifdef ADAPTIVE_THRESHOLD
361   if (raw_context_adaptative)
362     run_all_adaptative();
363   else
364 #endif
365   if (raw_context_parallel)
366     run_all_parallel();
367   else
368     run_all_serial();
369 }
370
371 void RawContextFactory::run_all_serial()
372 {
373   smx_process_t first_process =
374       xbt_dynar_get_as(simix_global->process_to_run, 0, smx_process_t);
375   raw_process_index = 1;
376   static_cast<RawContext*>(first_process->context)->resume_serial();
377 }
378
379 void RawContextFactory::run_all_parallel()
380 {
381 #ifdef HAVE_THREAD_CONTEXTS
382   raw_threads_working = 0;
383   if (raw_parmap == nullptr)
384     raw_parmap = xbt_parmap_new(
385       SIMIX_context_get_nthreads(), SIMIX_context_get_parallel_mode());
386   xbt_parmap_apply(raw_parmap,
387       [](void* arg) {
388         smx_process_t process = static_cast<smx_process_t>(arg);
389         RawContext* context = static_cast<RawContext*>(process->context);
390         context->resume_parallel();
391       },
392       simix_global->process_to_run);
393 #else
394   xbt_die("You asked for a parallel execution, but you don't have any threads.");
395 #endif
396 }
397
398 void RawContext::suspend()
399 {
400   if (raw_context_parallel)
401     RawContext::suspend_parallel();
402   else
403     RawContext::suspend_serial();
404 }
405
406 void RawContext::suspend_serial()
407 {
408   /* determine the next context */
409   RawContext* next_context = nullptr;
410   unsigned long int i;
411   i = raw_process_index++;
412   if (i < xbt_dynar_length(simix_global->process_to_run)) {
413     /* execute the next process */
414     XBT_DEBUG("Run next process");
415     next_context = (RawContext*) xbt_dynar_get_as(
416         simix_global->process_to_run, i, smx_process_t)->context;
417   }
418   else {
419     /* all processes were run, return to maestro */
420     XBT_DEBUG("No more process to run");
421     next_context = (RawContext*) raw_maestro_context;
422   }
423   SIMIX_context_set_current(next_context);
424   raw_swapcontext(&this->stack_top_, next_context->stack_top_);
425 }
426
427 void RawContext::suspend_parallel()
428 {
429 #ifdef HAVE_THREAD_CONTEXTS
430   /* determine the next context */
431   smx_process_t next_work = (smx_process_t) xbt_parmap_next(raw_parmap);
432   RawContext* next_context = nullptr;
433
434   if (next_work != NULL) {
435     /* there is a next process to resume */
436     XBT_DEBUG("Run next process");
437     next_context = (RawContext*) next_work->context;
438   }
439   else {
440     /* all processes were run, go to the barrier */
441     XBT_DEBUG("No more processes to run");
442     uintptr_t worker_id = (uintptr_t)
443       xbt_os_thread_get_specific(raw_worker_id_key);
444     next_context = (RawContext*) raw_workers_context[worker_id];
445     XBT_DEBUG("Restoring worker stack %zu (working threads = %zu)",
446         worker_id, raw_threads_working);
447   }
448
449   SIMIX_context_set_current(next_context);
450   raw_swapcontext(&this->stack_top_, next_context->stack_top_);
451 #endif
452 }
453
454 void RawContext::resume()
455 {
456   if (raw_context_parallel)
457     resume_parallel();
458   else
459     resume_serial();
460 }
461
462 void RawContext::resume_serial()
463 {
464   SIMIX_context_set_current(this);
465   raw_swapcontext(&raw_maestro_context->stack_top_, this->stack_top_);
466 }
467
468 void RawContext::resume_parallel()
469 {
470 #ifdef HAVE_THREAD_CONTEXTS
471   uintptr_t worker_id = __sync_fetch_and_add(&raw_threads_working, 1);
472   xbt_os_thread_set_specific(raw_worker_id_key, (void*) worker_id);
473   RawContext* worker_context = (RawContext*) SIMIX_context_self();
474   raw_workers_context[worker_id] = worker_context;
475   XBT_DEBUG("Saving worker stack %zu", worker_id);
476   SIMIX_context_set_current(this);
477   raw_swapcontext(&worker_context->stack_top_, this->stack_top_);
478 #else
479   xbt_die("Parallel execution disabled");
480 #endif
481 }
482
483 /**
484  * \brief Resumes all processes ready to run.
485  */
486 #ifdef ADAPTIVE_THRESHOLD
487 void RawContectFactory::run_all_adaptative()
488 {
489   unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
490   unsigned long threshold = SIMIX_context_get_parallel_threshold();
491   reached_seq_limit = (seq_sched_round % SCHED_ROUND_LIMIT == 0);
492   reached_par_limit = (par_sched_round % SCHED_ROUND_LIMIT == 0);
493
494   if(reached_seq_limit && reached_par_limit){
495     par_ratio = (par_proc_that_ran != 0) ? (par_time / (double)par_proc_that_ran) : 0;
496     seq_ratio = (seq_proc_that_ran != 0) ? (seq_time / (double)seq_proc_that_ran) : 0;
497     if(seq_ratio > par_ratio){
498        if(nb_processes < avg_par_proc) {
499           threshold = (threshold>2) ? threshold - 1 : threshold ;
500           SIMIX_context_set_parallel_threshold(threshold);
501         }
502     } else {
503         if(nb_processes > avg_seq_proc){
504           SIMIX_context_set_parallel_threshold(threshold+1);
505         }
506     }
507   }
508
509   if (nb_processes >= SIMIX_context_get_parallel_threshold()) {
510     simix_global->context_factory->suspend = smx_ctx_raw_suspend_parallel;
511     if (nb_processes < par_window){
512       par_sched_round++;
513       xbt_os_walltimer_start(round_time);
514       smx_ctx_raw_runall_parallel();
515       xbt_os_walltimer_stop(round_time);
516       par_time += xbt_os_timer_elapsed(round_time);
517
518       prev_avg_par_proc = avg_par_proc;
519       delta = nb_processes - avg_par_proc;
520       avg_par_proc = (par_sched_round==1) ? nb_processes : avg_par_proc + delta / (double) par_sched_round;
521
522       if(par_sched_round>=2){
523         s_par_proc = s_par_proc + (nb_processes - prev_avg_par_proc) * delta;
524         sd_par_proc = sqrt(s_par_proc / (par_sched_round-1));
525         par_window = (int) (avg_par_proc + sd_par_proc);
526       }else{
527         sd_par_proc = 0;
528       }
529
530       par_proc_that_ran += nb_processes;
531     } else{
532       smx_ctx_raw_runall_parallel();
533     }
534   } else {
535     simix_global->context_factory->suspend = smx_ctx_raw_suspend_serial;
536     if(nb_processes > seq_window){
537       seq_sched_round++;
538       xbt_os_walltimer_start(round_time);
539       smx_ctx_raw_runall_serial();
540       xbt_os_walltimer_stop(round_time);
541       seq_time += xbt_os_timer_elapsed(round_time);
542
543       prev_avg_seq_proc = avg_seq_proc;
544       delta = (nb_processes-avg_seq_proc);
545       avg_seq_proc = (seq_sched_round==1) ? nb_processes : avg_seq_proc + delta / (double) seq_sched_round;
546
547       if(seq_sched_round>=2){
548         s_seq_proc = s_seq_proc + (nb_processes - prev_avg_seq_proc)*delta;
549         sd_seq_proc = sqrt(s_seq_proc / (seq_sched_round-1));
550         seq_window = (int) (avg_seq_proc - sd_seq_proc);
551       } else {
552         sd_seq_proc = 0;
553       }
554
555       seq_proc_that_ran += nb_processes;
556     } else {
557       smx_ctx_raw_runall_serial();
558     }
559   }
560 }
561
562 #else
563
564 // TODO
565 void RawContextFactory::run_all_adaptative()
566 {
567   unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
568   if (SIMIX_context_is_parallel()
569     && (unsigned long) SIMIX_context_get_parallel_threshold() < nb_processes) {
570         raw_context_parallel = true;
571         XBT_DEBUG("Runall // %lu", nb_processes);
572         this->run_all_parallel();
573     } else {
574         XBT_DEBUG("Runall serial %lu", nb_processes);
575         raw_context_parallel = false;
576         this->run_all_serial();
577     }
578 }
579 #endif
580
581 }
582 }