Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f5748846b6f9c04c5686add1c92a087f3e2b4238
[simgrid.git] / src / xbt / parmap.cpp
1 /* Copyright (c) 2004-2005, 2007, 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <atomic>
8
9 #include "src/internal_config.h"
10 #if HAVE_UNISTD_H
11 #include <unistd.h>
12 #endif
13
14 #ifndef _WIN32
15 #include <sys/syscall.h>
16 #endif
17
18 #if HAVE_FUTEX_H
19 #include <linux/futex.h>
20 #include <limits.h>
21 #endif
22
23 #include "xbt/parmap.h"
24 #include "xbt/log.h"
25 #include "xbt/function_types.h"
26 #include "xbt/dynar.h"
27 #include "xbt/xbt_os_thread.h"
28 #include "xbt/sysdep.h"
29 #include "src/simix/smx_private.h"
30 #include "src/simix/smx_private.hpp"
31
32 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
33
34 typedef enum {
35   XBT_PARMAP_WORK,
36   XBT_PARMAP_DESTROY
37 } e_xbt_parmap_flag_t;
38
39 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
40 static void *xbt_parmap_worker_main(void *parmap);
41 static void xbt_parmap_work(xbt_parmap_t parmap);
42
43 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
47
48 #if HAVE_FUTEX_H
49 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
50 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
51 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
52 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
53 static void futex_wait(unsigned *uaddr, unsigned val);
54 static void futex_wake(unsigned *uaddr, unsigned val);
55 #endif
56
57 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
58 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
59 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
60 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
61
62 #if HAVE_MC
63 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id);
64 static void *xbt_parmap_mc_worker_main(void *arg);
65 #endif
66
67 /**
68  * \brief Parallel map structure
69  */
70 typedef struct s_xbt_parmap {
71   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
72   unsigned work;                   /**< index of the current round */
73   unsigned thread_counter;         /**< number of workers that have done the work */
74
75   unsigned int num_workers;        /**< total number of worker threads including the controller */
76   xbt_os_thread_t *workers;        /**< worker thread handlers */
77   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
78   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
79   std::atomic<unsigned int> index; /**< index of the next element of data to pick */
80
81 #if HAVE_MC
82   int finish;
83   void* ref_snapshot;
84   int_f_pvoid_pvoid_t snapshot_compare;
85   unsigned int length;
86   void* mc_data;
87 #endif
88
89   /* posix only */
90   xbt_os_cond_t ready_cond;
91   xbt_os_mutex_t ready_mutex;
92   xbt_os_cond_t done_cond;
93   xbt_os_mutex_t done_mutex;
94
95   /* fields that depend on the synchronization mode */
96   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
97   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
98   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
99   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
100   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
101 } s_xbt_parmap_t;
102
103 /**
104  * \brief Thread data transmission structure
105  */
106 typedef struct s_xbt_parmap_thread_data{
107   xbt_parmap_t parmap;
108   int worker_id;
109 } s_xbt_parmap_thread_data_t;
110
111 typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t;
112
113 /**
114  * \brief Creates a parallel map object
115  * \param num_workers number of worker threads to create
116  * \param mode how to synchronize the worker threads
117  * \return the parmap created
118  */
119 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
120 {
121   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
122
123   /* Initialize the thread pool data structure */
124   xbt_parmap_t parmap = new s_xbt_parmap_t();
125   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
126
127   parmap->num_workers = num_workers;
128   parmap->status = XBT_PARMAP_WORK;
129   xbt_parmap_set_mode(parmap, mode);
130
131   /* Create the pool of worker threads */
132   xbt_parmap_thread_data_t data;
133   parmap->workers[0] = NULL;
134 #if HAVE_PTHREAD_SETAFFINITY
135   int core_bind = 0;
136 #endif  
137   for (unsigned int i = 1; i < num_workers; i++) {
138     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
139     data->parmap = parmap;
140     data->worker_id = i;
141     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL);
142 #if HAVE_PTHREAD_SETAFFINITY
143     xbt_os_thread_bind(parmap->workers[i], core_bind); 
144     if (core_bind != xbt_os_get_numcores())
145       core_bind++;
146     else
147       core_bind = 0; 
148 #endif    
149   }
150   return parmap;
151 }
152
153 #if HAVE_MC
154 /**
155  * \brief Creates a parallel map object
156  * \param num_workers number of worker threads to create
157  * \param mode how to synchronize the worker threads
158  * \return the parmap created
159  */
160 xbt_parmap_t xbt_parmap_mc_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
161 {
162   unsigned int i;
163
164   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
165
166   /* Initialize the thread pool data structure */
167   xbt_parmap_t parmap = new s_xbt_parmap_t();
168   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
169
170   parmap->num_workers = num_workers;
171   parmap->status = XBT_PARMAP_WORK;
172   xbt_parmap_set_mode(parmap, mode);
173
174   /* Create the pool of worker threads */
175   xbt_parmap_thread_data_t data;
176   parmap->workers[0] = NULL;
177   for (i = 1; i < num_workers; i++) {
178     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
179     data->parmap = parmap;
180     data->worker_id = i;
181     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_mc_worker_main,
182                                               data, NULL);
183   }
184   return parmap;
185 }
186 #endif
187
188 /**
189  * \brief Destroys a parmap
190  * \param parmap the parmap to destroy
191  */
192 void xbt_parmap_destroy(xbt_parmap_t parmap)
193 {
194   if (!parmap) {
195     return;
196   }
197
198   parmap->status = XBT_PARMAP_DESTROY;
199   parmap->master_signal_f(parmap);
200
201   unsigned int i;
202   for (i = 1; i < parmap->num_workers; i++)
203     xbt_os_thread_join(parmap->workers[i], NULL);
204
205   xbt_os_cond_destroy(parmap->ready_cond);
206   xbt_os_mutex_destroy(parmap->ready_mutex);
207   xbt_os_cond_destroy(parmap->done_cond);
208   xbt_os_mutex_destroy(parmap->done_mutex);
209
210   xbt_free(parmap->workers);
211   delete parmap;
212 }
213
214 /**
215  * \brief Sets the synchronization mode of a parmap.
216  * \param parmap a parallel map object
217  * \param mode the synchronization mode
218  */
219 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
220 {
221   if (mode == XBT_PARMAP_DEFAULT) {
222 #if HAVE_FUTEX_H
223     mode = XBT_PARMAP_FUTEX;
224 #else
225     mode = XBT_PARMAP_POSIX;
226 #endif
227   }
228   parmap->mode = mode;
229
230   switch (mode) {
231
232     case XBT_PARMAP_POSIX:
233       parmap->master_wait_f = xbt_parmap_posix_master_wait;
234       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
235       parmap->master_signal_f = xbt_parmap_posix_master_signal;
236       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
237
238       parmap->ready_cond = xbt_os_cond_init();
239       parmap->ready_mutex = xbt_os_mutex_init();
240       parmap->done_cond = xbt_os_cond_init();
241       parmap->done_mutex = xbt_os_mutex_init();
242       break;
243
244
245     case XBT_PARMAP_FUTEX:
246 #if HAVE_FUTEX_H
247       parmap->master_wait_f = xbt_parmap_futex_master_wait;
248       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
249       parmap->master_signal_f = xbt_parmap_futex_master_signal;
250       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
251
252       xbt_os_cond_destroy(parmap->ready_cond);
253       xbt_os_mutex_destroy(parmap->ready_mutex);
254       xbt_os_cond_destroy(parmap->done_cond);
255       xbt_os_mutex_destroy(parmap->done_mutex);
256       break;
257 #else
258       xbt_die("Futex is not available on this OS.");
259 #endif
260
261     case XBT_PARMAP_BUSY_WAIT:
262 #ifndef _MSC_VER
263       parmap->master_wait_f = xbt_parmap_busy_master_wait;
264       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
265       parmap->master_signal_f = xbt_parmap_busy_master_signal;
266       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
267
268       xbt_os_cond_destroy(parmap->ready_cond);
269       xbt_os_mutex_destroy(parmap->ready_mutex);
270       xbt_os_cond_destroy(parmap->done_cond);
271       xbt_os_mutex_destroy(parmap->done_mutex);
272       break;
273 #else
274       xbt_die("Busy waiting not implemented on Windows yet.");
275 #endif
276
277     case XBT_PARMAP_DEFAULT:
278       THROW_IMPOSSIBLE;
279       break;
280   }
281 }
282
283 /**
284  * \brief Applies a list of tasks in parallel.
285  * \param parmap a parallel map object
286  * \param fun the function to call in parallel
287  * \param data each element of this dynar will be passed as an argument to fun
288  */
289 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
290 {
291   /* Assign resources to worker threads (we are maestro here)*/
292   parmap->fun = fun;
293   parmap->data = data;
294   parmap->index = 0;
295   parmap->master_signal_f(parmap); // maestro runs futex_wait to wake all the minions (the working threads)
296   xbt_parmap_work(parmap);         // maestro works with its minions
297   parmap->master_wait_f(parmap);   // When there is no more work to do, then maestro waits for the last minion to stop
298   XBT_DEBUG("Job done");           //   ... and proceeds
299 }
300
301 /**
302  * \brief Returns a next task to process.
303  *
304  * Worker threads call this function to get more work.
305  *
306  * \return the next task to process, or NULL if there is no more work
307  */
308 void* xbt_parmap_next(xbt_parmap_t parmap)
309 {
310   unsigned int index = parmap->index++;
311   if (index < xbt_dynar_length(parmap->data)) {
312     return xbt_dynar_get_as(parmap->data, index, void*);
313   }
314   return NULL;
315 }
316
317 static void xbt_parmap_work(xbt_parmap_t parmap)
318 {
319   unsigned index;
320   while ((index = parmap->index++)
321          < xbt_dynar_length(parmap->data))
322     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
323 }
324
325 /**
326  * \brief Main function of a worker thread.
327  * \param arg the parmap
328  */
329 static void *xbt_parmap_worker_main(void *arg)
330 {
331   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
332   xbt_parmap_t parmap = data->parmap;
333   unsigned round = 0;
334   smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL);
335   SIMIX_context_set_current(context);
336
337   XBT_DEBUG("New worker thread created");
338
339   /* Worker's main loop */
340   while (1) {
341     parmap->worker_wait_f(parmap, ++round);
342     if (parmap->status == XBT_PARMAP_WORK) {
343
344       XBT_DEBUG("Worker %d got a job", data->worker_id);
345
346       xbt_parmap_work(parmap);
347       parmap->worker_signal_f(parmap);
348
349       XBT_DEBUG("Worker %d has finished", data->worker_id);
350
351     /* We are destroying the parmap */
352     } else {
353       SIMIX_context_free(context);
354       xbt_free(data);
355       return NULL;
356     }
357   }
358 }
359
360 #if HAVE_MC
361
362 /**
363  * \brief Applies a list of tasks in parallel.
364  * \param parmap a parallel map object
365  * \param fun the function to call in parallel
366  * \param data each element of this dynar will be passed as an argument to fun
367  */
368 int xbt_parmap_mc_apply(xbt_parmap_t parmap, int_f_pvoid_pvoid_t fun, 
369                          void* data, unsigned int length,  void* ref_snapshot)
370 {
371   /* Assign resources to worker threads */
372   parmap->snapshot_compare = fun;
373   parmap->mc_data = data;
374   parmap->index = 0;
375   parmap->finish = -1;
376   parmap->length = length;
377   parmap->ref_snapshot = ref_snapshot;
378   parmap->master_signal_f(parmap);
379   xbt_parmap_mc_work(parmap, 0);
380   parmap->master_wait_f(parmap);
381   XBT_DEBUG("Job done");
382   return parmap->finish;
383 }
384
385 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id)
386 {
387   unsigned int data_size = (parmap->length / parmap->num_workers) +
388     ((parmap->length % parmap->num_workers) ? 1 :0);
389   void* start = (char*)parmap->mc_data + (data_size*worker_id*sizeof(void*));
390   void* end = MIN((char *)start + data_size* sizeof(void*), (char*)parmap->mc_data + parmap->length*sizeof(void*));
391   
392   //XBT_CRITICAL("Worker %d : %p -> %p (%d)", worker_id, start, end, data_size);
393
394   while ( start < end && parmap->finish == -1) {
395     //XBT_CRITICAL("Starting with %p", start);
396     int res = parmap->snapshot_compare(*(void**)start, parmap->ref_snapshot);
397     start = (char *)start + sizeof(start);
398     if (!res){
399     
400       parmap->finish = ((char*)start - (char*)parmap->mc_data) / sizeof(void*);
401       //XBT_CRITICAL("Find good one %p (%p)", start, parmap->mc_data);
402       break;
403     }
404   }
405 }
406
407 /**
408  * \brief Main function of a worker thread.
409  * \param arg the parmap
410  */
411 static void *xbt_parmap_mc_worker_main(void *arg)
412 {
413   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
414   xbt_parmap_t parmap = data->parmap;
415   unsigned round = 0;
416   /* smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL); */
417   /* SIMIX_context_set_current(context); */
418
419   XBT_DEBUG("New worker thread created");
420
421   /* Worker's main loop */
422   while (1) {
423     parmap->worker_wait_f(parmap, ++round);
424     if (parmap->status == XBT_PARMAP_WORK) {
425
426       XBT_DEBUG("Worker %d got a job", data->worker_id);
427
428       xbt_parmap_mc_work(parmap, data->worker_id);
429       parmap->worker_signal_f(parmap);
430
431       XBT_DEBUG("Worker %d has finished", data->worker_id);
432
433     /* We are destroying the parmap */
434     } else {
435       xbt_free(data);
436       return NULL;
437     }
438   }
439 }
440 #endif
441
442 #if HAVE_FUTEX_H
443 static void futex_wait(unsigned *uaddr, unsigned val)
444 {
445   XBT_VERB("Waiting on futex %p", uaddr);
446   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
447 }
448
449 static void futex_wake(unsigned *uaddr, unsigned val)
450 {
451   XBT_VERB("Waking futex %p", uaddr);
452   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
453 }
454 #endif
455
456 /**
457  * \brief Starts the parmap: waits for all workers to be ready and returns.
458  *
459  * This function is called by the controller thread.
460  *
461  * \param parmap a parmap
462  */
463 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
464 {
465   xbt_os_mutex_acquire(parmap->done_mutex);
466   if (parmap->thread_counter < parmap->num_workers) {
467     /* wait for all workers to be ready */
468     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
469   }
470   xbt_os_mutex_release(parmap->done_mutex);
471 }
472
473 /**
474  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
475  *
476  * This function is called by all worker threads when they end (not including
477  * the controller).
478  *
479  * \param parmap a parmap
480  */
481 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
482 {
483   xbt_os_mutex_acquire(parmap->done_mutex);
484   if (++parmap->thread_counter == parmap->num_workers) {
485     /* all workers have finished, wake the controller */
486     xbt_os_cond_signal(parmap->done_cond);
487   }
488   xbt_os_mutex_release(parmap->done_mutex);
489 }
490
491 /**
492  * \brief Wakes all workers and waits for them to finish the tasks.
493  *
494  * This function is called by the controller thread.
495  *
496  * \param parmap a parmap
497  */
498 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
499 {
500   xbt_os_mutex_acquire(parmap->ready_mutex);
501   parmap->thread_counter = 1;
502   parmap->work++;
503   /* wake all workers */
504   xbt_os_cond_broadcast(parmap->ready_cond);
505   xbt_os_mutex_release(parmap->ready_mutex);
506 }
507
508 /**
509  * \brief Waits for some work to process.
510  *
511  * This function is called by each worker thread (not including the controller)
512  * when it has no more work to do.
513  *
514  * \param parmap a parmap
515  * \param round  the expected round number
516  */
517 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
518 {
519   xbt_os_mutex_acquire(parmap->ready_mutex);
520   /* wait for more work */
521   if (parmap->work != round) {
522     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
523   }
524   xbt_os_mutex_release(parmap->ready_mutex);
525 }
526
527 #if HAVE_FUTEX_H
528 /**
529  * \brief Starts the parmap: waits for all workers to be ready and returns.
530  *
531  * This function is called by the controller thread.
532  *
533  * \param parmap a parmap
534  */
535 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
536 {
537   unsigned count = parmap->thread_counter;
538   while (count < parmap->num_workers) {
539     /* wait for all workers to be ready */
540     futex_wait(&parmap->thread_counter, count);
541     count = parmap->thread_counter;
542   }
543 }
544
545 /**
546  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
547  *
548  * This function is called by all worker threads when they end (not including
549  * the controller).
550  *
551  * \param parmap a parmap
552  */
553 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
554 {
555   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
556   if (count == parmap->num_workers) {
557     /* all workers have finished, wake the controller */
558     futex_wake(&parmap->thread_counter, INT_MAX);
559   }
560 }
561
562 /**
563  * \brief Wakes all workers and waits for them to finish the tasks.
564  *
565  * This function is called by the controller thread.
566  *
567  * \param parmap a parmap
568  */
569 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
570 {
571   parmap->thread_counter = 1;
572   __sync_add_and_fetch(&parmap->work, 1);
573   /* wake all workers */
574   futex_wake(&parmap->work, INT_MAX);
575 }
576
577 /**
578  * \brief Waits for some work to process.
579  *
580  * This function is called by each worker thread (not including the controller)
581  * when it has no more work to do.
582  *
583  * \param parmap a parmap
584  * \param round  the expected round number
585  */
586 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
587 {
588   unsigned work = parmap->work;
589   /* wait for more work */
590   while (work != round) {
591     futex_wait(&parmap->work, work);
592     work = parmap->work;
593   }
594 }
595 #endif
596
597 #ifndef _MSC_VER
598 /**
599  * \brief Starts the parmap: waits for all workers to be ready and returns.
600  *
601  * This function is called by the controller thread.
602  *
603  * \param parmap a parmap
604  */
605 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
606 {
607   while (parmap->thread_counter < parmap->num_workers) {
608     xbt_os_thread_yield();
609   }
610 }
611
612 /**
613  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
614  *
615  * This function is called by all worker threads when they end.
616  *
617  * \param parmap a parmap
618  */
619 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
620 {
621   __sync_add_and_fetch(&parmap->thread_counter, 1);
622 }
623
624 /**
625  * \brief Wakes all workers and waits for them to finish the tasks.
626  *
627  * This function is called by the controller thread.
628  *
629  * \param parmap a parmap
630  */
631 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
632 {
633   parmap->thread_counter = 1;
634   __sync_add_and_fetch(&parmap->work, 1);
635 }
636
637 /**
638  * \brief Waits for some work to process.
639  *
640  * This function is called by each worker thread (not including the controller)
641  * when it has no more work to do.
642  *
643  * \param parmap a parmap
644  * \param round  the expected round number
645  */
646 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
647 {
648   /* wait for more work */
649   while (parmap->work != round) {
650     xbt_os_thread_yield();
651   }
652 }
653 #endif /* ! _MSC_VER */