Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of github.com:mquinson/simgrid
[simgrid.git] / src / simix / RawContext.cpp
1 /* Copyright (c) 2009-2015. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 /** \file RawContext.cpp 
8   * Fast context switching inspired from SystemV ucontexts.
9   *
10   * In contrast to System V context, it does not touch the signal mask
11   * which avoids making a system call (at least on Linux).
12   */
13
14 #include <math.h>
15
16 #include <utility>
17 #include <functional>
18
19 #include <xbt/log.h>
20 #include <xbt/parmap.h>
21 #include <xbt/dynar.h>
22
23 #include "smx_private.h"
24 #include "smx_private.hpp"
25 #include "mc/mc.h"
26
27 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_context);
28
29 // ***** Class definitions
30
31 namespace simgrid {
32 namespace simix {
33
34 class RawContext;
35 class RawContextFactory;
36
37 class RawContext : public Context {
38 protected:
39   void* stack_ = nullptr; 
40   /** pointer to top the stack stack */
41   void* stack_top_ = nullptr;
42 public:
43   friend class RawContextFactory;
44   RawContext(std::function<void()> code,
45           void_pfn_smxprocess_t cleanup_func,
46           smx_process_t process);
47   ~RawContext();
48 public:
49   static void wrapper(void* arg);
50   void stop() override;
51   void suspend() override;
52   void resume();
53 private:
54   void suspend_serial();
55   void suspend_parallel();
56   void resume_serial();
57   void resume_parallel();
58 };
59
60 class RawContextFactory : public ContextFactory {
61 public:
62   RawContextFactory();
63   ~RawContextFactory();
64   RawContext* create_context(std::function<void()> code,
65     void_pfn_smxprocess_t, smx_process_t process) override;
66   void run_all() override;
67 private:
68   void run_all_adaptative();
69   void run_all_serial();
70   void run_all_parallel();
71 };
72
73 ContextFactory* raw_factory()
74 {
75   XBT_VERB("Using raw contexts. "
76     "Because the glibc is just not good enough for us.");
77   return new RawContextFactory();
78 }
79
80 }
81 }
82
83 // ***** Loads of static stuff
84
85 #ifdef HAVE_THREAD_CONTEXTS
86 static xbt_parmap_t raw_parmap;
87 static simgrid::simix::RawContext** raw_workers_context;    /* space to save the worker context in each thread */
88 static unsigned long raw_threads_working;     /* number of threads that have started their work */
89 static xbt_os_thread_key_t raw_worker_id_key; /* thread-specific storage for the thread id */
90 #endif
91 #ifdef ADAPTIVE_THRESHOLD
92 #define SCHED_ROUND_LIMIT 5
93 static xbt_os_timer_t round_time;
94 static double par_time,seq_time;
95 static double par_ratio,seq_ratio;
96 static int reached_seq_limit, reached_par_limit;
97 static unsigned int par_proc_that_ran = 0,seq_proc_that_ran = 0;  /* Counters of processes that have run in SCHED_ROUND_LIMIT scheduling rounds */
98 static unsigned int seq_sched_round=0, par_sched_round=0; /* Amount of SR that ran serial/parallel*/
99 /*Varables used to calculate running variance and mean*/
100 static double prev_avg_par_proc=0,prev_avg_seq_proc=0;
101 static double delta=0;
102 static double s_par_proc=0,s_seq_proc=0; /*Standard deviation of number of processes computed in par/seq during the current simulation*/
103 static double avg_par_proc=0,sd_par_proc=0;
104 static double avg_seq_proc=0,sd_seq_proc=0;
105 static long long par_window=(long long)HUGE_VAL,seq_window=0;
106 #endif
107 static unsigned long raw_process_index = 0;   /* index of the next process to run in the
108                                                * list of runnable processes */
109 static simgrid::simix::RawContext* raw_maestro_context;
110
111 static bool raw_context_parallel = false;
112 #ifdef ADAPTIVE_THRESHOLD
113 static bool raw_context_adaptative = false;
114 #endif
115
116 // ***** Raw context routines
117
118 typedef void (*rawctx_entry_point_t)(void *);
119
120 typedef void* raw_stack_t;
121 extern "C" raw_stack_t raw_makecontext(void* malloced_stack, int stack_size,
122                                    rawctx_entry_point_t entry_point, void* arg);
123 extern "C" void raw_swapcontext(raw_stack_t* old, raw_stack_t new_context);
124
125 #if PROCESSOR_x86_64
126 __asm__ (
127 #if defined(APPLE)
128    ".text\n"
129    ".globl _raw_makecontext\n"
130    "_raw_makecontext:\n"
131 #elif defined(_XBT_WIN32)
132    ".text\n"
133    ".globl raw_makecontext\n"
134    "raw_makecontext:\n"
135 #else
136    ".text\n"
137    ".globl raw_makecontext\n"
138    ".type raw_makecontext,@function\n"
139    "raw_makecontext:\n"/* Calling convention sets the arguments in rdi, rsi, rdx and rcx, respectively */
140 #endif
141    "   mov %rdi,%rax\n"      /* stack */
142    "   add %rsi,%rax\n"      /* size  */
143    "   andq $-16, %rax\n"    /* align stack */
144    "   movq $0,   -8(%rax)\n" /* @return for func */
145    "   mov %rdx,-16(%rax)\n" /* func */
146    "   mov %rcx,-24(%rax)\n" /* arg/rdi */
147    "   movq $0,  -32(%rax)\n" /* rsi */
148    "   movq $0,  -40(%rax)\n" /* rdx */
149    "   movq $0,  -48(%rax)\n" /* rcx */
150    "   movq $0,  -56(%rax)\n" /* r8  */
151    "   movq $0,  -64(%rax)\n" /* r9  */
152    "   movq $0,  -72(%rax)\n" /* rbp */
153    "   movq $0,  -80(%rax)\n" /* rbx */
154    "   movq $0,  -88(%rax)\n" /* r12 */
155    "   movq $0,  -96(%rax)\n" /* r13 */
156    "   movq $0, -104(%rax)\n" /* r14 */
157    "   movq $0, -112(%rax)\n" /* r15 */
158    "   sub $112,%rax\n"
159    "   ret\n"
160 );
161
162 __asm__ (
163 #if defined(APPLE)
164    ".text\n"
165    ".globl _raw_swapcontext\n"
166    "_raw_swapcontext:\n"
167 #elif defined(_XBT_WIN32)
168    ".text\n"
169    ".globl raw_swapcontext\n"
170    "raw_swapcontext:\n"
171 #else
172    ".text\n"
173    ".globl raw_swapcontext\n"
174    ".type raw_swapcontext,@function\n"
175    "raw_swapcontext:\n" /* Calling convention sets the arguments in rdi and rsi, respectively */
176 #endif
177    "   push %rdi\n"
178    "   push %rsi\n"
179    "   push %rdx\n"
180    "   push %rcx\n"
181    "   push %r8\n"
182    "   push %r9\n"
183    "   push %rbp\n"
184    "   push %rbx\n"
185    "   push %r12\n"
186    "   push %r13\n"
187    "   push %r14\n"
188    "   push %r15\n"
189    "   mov %rsp,(%rdi)\n" /* old */
190    "   mov %rsi,%rsp\n" /* new */
191    "   pop %r15\n"
192    "   pop %r14\n"
193    "   pop %r13\n"
194    "   pop %r12\n"
195    "   pop %rbx\n"
196    "   pop %rbp\n"
197    "   pop %r9\n"
198    "   pop %r8\n"
199    "   pop %rcx\n"
200    "   pop %rdx\n"
201    "   pop %rsi\n"
202    "   pop %rdi\n"
203    "   ret\n"
204 );
205 #elif PROCESSOR_i686
206 __asm__ (
207 #if defined(APPLE) || defined(_XBT_WIN32)
208    ".text\n"
209    ".globl _raw_makecontext\n"
210    "_raw_makecontext:\n"
211 #else
212    ".text\n"
213    ".globl raw_makecontext\n"
214    ".type raw_makecontext,@function\n"
215    "raw_makecontext:\n"
216 #endif
217    "   movl 4(%esp),%eax\n"   /* stack */
218    "   addl 8(%esp),%eax\n"   /* size  */
219    "   andl $-16, %eax\n"     /* align stack */
220    "   movl 12(%esp),%ecx\n"  /* func  */
221    "   movl 16(%esp),%edx\n"  /* arg   */
222    "   movl %edx, -4(%eax)\n"
223    "   movl $0,   -8(%eax)\n" /* @return for func */
224    "   movl %ecx,-12(%eax)\n"
225    "   movl $0,  -16(%eax)\n" /* ebp */
226    "   movl $0,  -20(%eax)\n" /* ebx */
227    "   movl $0,  -24(%eax)\n" /* esi */
228    "   movl $0,  -28(%eax)\n" /* edi */
229    "   subl $28,%eax\n"
230    "   retl\n"
231 );
232
233 __asm__ (
234 #if defined(APPLE) || defined(_XBT_WIN32)
235    ".text\n"
236    ".globl _raw_swapcontext\n"
237    "_raw_swapcontext:\n"
238 #else
239    ".text\n"
240    ".globl raw_swapcontext\n"
241    ".type raw_swapcontext,@function\n"
242    "raw_swapcontext:\n"
243 #endif
244    "   movl 4(%esp),%eax\n" /* old */
245    "   movl 8(%esp),%edx\n" /* new */
246    "   pushl %ebp\n"
247    "   pushl %ebx\n"
248    "   pushl %esi\n"
249    "   pushl %edi\n"
250    "   movl %esp,(%eax)\n"
251    "   movl %edx,%esp\n"
252    "   popl %edi\n"
253    "   popl %esi\n"
254    "   popl %ebx\n"
255    "   popl %ebp\n"
256    "   retl\n"
257 );
258 #else
259
260
261 /* If you implement raw contexts for other processors, don't forget to
262    update the definition of HAVE_RAW_CONTEXTS in tools/cmake/CompleteInFiles.cmake */
263
264 raw_stack_t raw_makecontext(void* malloced_stack, int stack_size,
265                             rawctx_entry_point_t entry_point, void* arg) {
266    THROW_UNIMPLEMENTED;
267 }
268
269 void raw_swapcontext(raw_stack_t* old, raw_stack_t new_context) {
270    THROW_UNIMPLEMENTED;
271 }
272
273 #endif
274
275 // ***** Method definitions
276
277 namespace simgrid {
278 namespace simix {
279
280 RawContextFactory::RawContextFactory()
281   : ContextFactory("RawContextFactory")
282 {
283 #ifdef ADAPTIVE_THRESHOLD
284   raw_context_adaptative = (SIMIX_context_get_parallel_threshold() > 1);
285 #endif
286   raw_context_parallel = SIMIX_context_is_parallel();
287   if (raw_context_parallel) {
288 #ifdef HAVE_THREAD_CONTEXTS
289     int nthreads = SIMIX_context_get_nthreads();
290     xbt_os_thread_key_create(&raw_worker_id_key);
291     // TODO, lazily init
292     raw_parmap = nullptr;
293     raw_workers_context = xbt_new(RawContext*, nthreads);
294     raw_maestro_context = nullptr;
295 #endif
296     // TODO, if(SIMIX_context_get_parallel_threshold() > 1) => choose dynamically
297   }
298 #ifdef ADAPTIVE_THRESHOLD
299   round_time = xbt_os_timer_new();
300   reached_seq_limit = 0;
301   reached_par_limit = 0;
302 #endif
303 }
304
305 RawContextFactory::~RawContextFactory()
306 {
307 #ifdef HAVE_THREAD_CONTEXTS
308   if (raw_parmap)
309     xbt_parmap_destroy(raw_parmap);
310   xbt_free(raw_workers_context);
311 #endif
312 }
313
314 RawContext* RawContextFactory::create_context(std::function<void()> code,
315     void_pfn_smxprocess_t cleanup, smx_process_t process)
316 {
317   return this->new_context<RawContext>(std::move(code),
318     cleanup, process);
319 }
320
321 void RawContext::wrapper(void* arg)
322 {
323   RawContext* context = (RawContext*) arg;
324   (*context)();
325   context->stop();
326 }
327
328 RawContext::RawContext(std::function<void()> code,
329     void_pfn_smxprocess_t cleanup, smx_process_t process)
330   : Context(std::move(code), cleanup, process)
331 {
332    if (has_code()) {
333      this->stack_ = SIMIX_context_stack_new();
334      this->stack_top_ = raw_makecontext(this->stack_,
335                          smx_context_usable_stack_size,
336                          RawContext::wrapper,
337                          this);
338    } else {
339      if(process != nullptr && raw_maestro_context == nullptr)
340        raw_maestro_context = this;
341      if (MC_is_active())
342        MC_ignore_heap(
343          &raw_maestro_context->stack_top_,
344          sizeof(raw_maestro_context->stack_top_));
345    }
346 }
347
348 RawContext::~RawContext()
349 {
350   SIMIX_context_stack_delete(this->stack_);
351 }
352
353 void RawContext::stop()
354 {
355   Context::stop();
356   this->suspend();
357 }
358
359 void RawContextFactory::run_all()
360 {
361 #ifdef ADAPTIVE_THRESHOLD
362   if (raw_context_adaptative)
363     run_all_adaptative();
364   else
365 #endif
366   if (raw_context_parallel)
367     run_all_parallel();
368   else
369     run_all_serial();
370 }
371
372 void RawContextFactory::run_all_serial()
373 {
374   smx_process_t first_process =
375       xbt_dynar_get_as(simix_global->process_to_run, 0, smx_process_t);
376   raw_process_index = 1;
377   static_cast<RawContext*>(first_process->context)->resume_serial();
378 }
379
380 void RawContextFactory::run_all_parallel()
381 {
382 #ifdef HAVE_THREAD_CONTEXTS
383   raw_threads_working = 0;
384   if (raw_parmap == nullptr)
385     raw_parmap = xbt_parmap_new(
386       SIMIX_context_get_nthreads(), SIMIX_context_get_parallel_mode());
387   xbt_parmap_apply(raw_parmap,
388       [](void* arg) {
389         smx_process_t process = static_cast<smx_process_t>(arg);
390         RawContext* context = static_cast<RawContext*>(process->context);
391         context->resume_parallel();
392       },
393       simix_global->process_to_run);
394 #else
395   xbt_die("You asked for a parallel execution, but you don't have any threads.");
396 #endif
397 }
398
399 void RawContext::suspend()
400 {
401   if (raw_context_parallel)
402     RawContext::suspend_parallel();
403   else
404     RawContext::suspend_serial();
405 }
406
407 void RawContext::suspend_serial()
408 {
409   /* determine the next context */
410   RawContext* next_context = nullptr;
411   unsigned long int i;
412   i = raw_process_index++;
413   if (i < xbt_dynar_length(simix_global->process_to_run)) {
414     /* execute the next process */
415     XBT_DEBUG("Run next process");
416     next_context = (RawContext*) xbt_dynar_get_as(
417         simix_global->process_to_run, i, smx_process_t)->context;
418   }
419   else {
420     /* all processes were run, return to maestro */
421     XBT_DEBUG("No more process to run");
422     next_context = (RawContext*) raw_maestro_context;
423   }
424   SIMIX_context_set_current(next_context);
425   raw_swapcontext(&this->stack_top_, next_context->stack_top_);
426 }
427
428 void RawContext::suspend_parallel()
429 {
430 #ifdef HAVE_THREAD_CONTEXTS
431   /* determine the next context */
432   smx_process_t next_work = (smx_process_t) xbt_parmap_next(raw_parmap);
433   RawContext* next_context = nullptr;
434
435   if (next_work != NULL) {
436     /* there is a next process to resume */
437     XBT_DEBUG("Run next process");
438     next_context = (RawContext*) next_work->context;
439   }
440   else {
441     /* all processes were run, go to the barrier */
442     XBT_DEBUG("No more processes to run");
443     unsigned long worker_id = (unsigned long)(uintptr_t)
444       xbt_os_thread_get_specific(raw_worker_id_key);
445     next_context = (RawContext*) raw_workers_context[worker_id];
446     XBT_DEBUG("Restoring worker stack %lu (working threads = %lu)",
447         worker_id, raw_threads_working);
448   }
449
450   SIMIX_context_set_current(next_context);
451   raw_swapcontext(&this->stack_top_, next_context->stack_top_);
452 #endif
453 }
454
455 void RawContext::resume()
456 {
457   if (raw_context_parallel)
458     resume_parallel();
459   else
460     resume_serial();
461 }
462
463 void RawContext::resume_serial()
464 {
465   SIMIX_context_set_current(this);
466   raw_swapcontext(&raw_maestro_context->stack_top_, this->stack_top_);
467 }
468
469 void RawContext::resume_parallel()
470 {
471 #ifdef HAVE_THREAD_CONTEXTS
472   unsigned long worker_id = __sync_fetch_and_add(&raw_threads_working, 1);
473   xbt_os_thread_set_specific(raw_worker_id_key, (void*)(uintptr_t) worker_id);
474   RawContext* worker_context = (RawContext*) SIMIX_context_self();
475   raw_workers_context[worker_id] = worker_context;
476   XBT_DEBUG("Saving worker stack %lu", worker_id);
477   SIMIX_context_set_current(this);
478   raw_swapcontext(&worker_context->stack_top_, this->stack_top_);
479 #else
480   xbt_die("Parallel execution disabled");
481 #endif
482 }
483
484 /**
485  * \brief Resumes all processes ready to run.
486  */
487 #ifdef ADAPTIVE_THRESHOLD
488 void RawContectFactory::run_all_adaptative()
489 {
490   unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
491   unsigned long threshold = SIMIX_context_get_parallel_threshold();
492   reached_seq_limit = (seq_sched_round % SCHED_ROUND_LIMIT == 0);
493   reached_par_limit = (par_sched_round % SCHED_ROUND_LIMIT == 0);
494
495   if(reached_seq_limit && reached_par_limit){
496     par_ratio = (par_proc_that_ran != 0) ? (par_time / (double)par_proc_that_ran) : 0;
497     seq_ratio = (seq_proc_that_ran != 0) ? (seq_time / (double)seq_proc_that_ran) : 0;
498     if(seq_ratio > par_ratio){
499        if(nb_processes < avg_par_proc) {
500           threshold = (threshold>2) ? threshold - 1 : threshold ;
501           SIMIX_context_set_parallel_threshold(threshold);
502         }
503     } else {
504         if(nb_processes > avg_seq_proc){
505           SIMIX_context_set_parallel_threshold(threshold+1);
506         }
507     }
508   }
509
510   if (nb_processes >= SIMIX_context_get_parallel_threshold()) {
511     simix_global->context_factory->suspend = smx_ctx_raw_suspend_parallel;
512     if (nb_processes < par_window){
513       par_sched_round++;
514       xbt_os_walltimer_start(round_time);
515       smx_ctx_raw_runall_parallel();
516       xbt_os_walltimer_stop(round_time);
517       par_time += xbt_os_timer_elapsed(round_time);
518
519       prev_avg_par_proc = avg_par_proc;
520       delta = nb_processes - avg_par_proc;
521       avg_par_proc = (par_sched_round==1) ? nb_processes : avg_par_proc + delta / (double) par_sched_round;
522
523       if(par_sched_round>=2){
524         s_par_proc = s_par_proc + (nb_processes - prev_avg_par_proc) * delta;
525         sd_par_proc = sqrt(s_par_proc / (par_sched_round-1));
526         par_window = (int) (avg_par_proc + sd_par_proc);
527       }else{
528         sd_par_proc = 0;
529       }
530
531       par_proc_that_ran += nb_processes;
532     } else{
533       smx_ctx_raw_runall_parallel();
534     }
535   } else {
536     simix_global->context_factory->suspend = smx_ctx_raw_suspend_serial;
537     if(nb_processes > seq_window){
538       seq_sched_round++;
539       xbt_os_walltimer_start(round_time);
540       smx_ctx_raw_runall_serial();
541       xbt_os_walltimer_stop(round_time);
542       seq_time += xbt_os_timer_elapsed(round_time);
543
544       prev_avg_seq_proc = avg_seq_proc;
545       delta = (nb_processes-avg_seq_proc);
546       avg_seq_proc = (seq_sched_round==1) ? nb_processes : avg_seq_proc + delta / (double) seq_sched_round;
547
548       if(seq_sched_round>=2){
549         s_seq_proc = s_seq_proc + (nb_processes - prev_avg_seq_proc)*delta;
550         sd_seq_proc = sqrt(s_seq_proc / (seq_sched_round-1));
551         seq_window = (int) (avg_seq_proc - sd_seq_proc);
552       } else {
553         sd_seq_proc = 0;
554       }
555
556       seq_proc_that_ran += nb_processes;
557     } else {
558       smx_ctx_raw_runall_serial();
559     }
560   }
561 }
562
563 #else
564
565 // TODO
566 void RawContextFactory::run_all_adaptative()
567 {
568   unsigned long nb_processes = xbt_dynar_length(simix_global->process_to_run);
569   if (SIMIX_context_is_parallel()
570     && (unsigned long) SIMIX_context_get_parallel_threshold() < nb_processes) {
571         raw_context_parallel = true;
572         XBT_DEBUG("Runall // %lu", nb_processes);
573         this->run_all_parallel();
574     } else {
575         XBT_DEBUG("Runall serial %lu", nb_processes);
576         raw_context_parallel = false;
577         this->run_all_serial();
578     }
579 }
580 #endif
581
582 }
583 }