Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
de1dc57c1a2ef32ece249115d1ccb7c78e4a977a
[simgrid.git] / src / xbt / parmap.cpp
1 /* Copyright (c) 2004-2005, 2007, 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <atomic>
8
9 #include "src/internal_config.h"
10 #if HAVE_UNISTD_H
11 #include <unistd.h>
12 #endif
13
14 #ifndef _WIN32
15 #include <sys/syscall.h>
16 #endif
17
18 #if HAVE_FUTEX_H
19 #include <linux/futex.h>
20 #include <limits.h>
21 #endif
22
23 #include "xbt/parmap.h"
24 #include "xbt/log.h"
25 #include "xbt/function_types.h"
26 #include "xbt/dynar.h"
27 #include "xbt/xbt_os_thread.h"
28 #include "xbt/sysdep.h"
29 #include "src/simix/smx_private.h"
30 #include "src/simix/smx_private.hpp"
31
32 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
33
34 typedef enum {
35   XBT_PARMAP_WORK,
36   XBT_PARMAP_DESTROY
37 } e_xbt_parmap_flag_t;
38
39 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
40 static void *xbt_parmap_worker_main(void *parmap);
41 static void xbt_parmap_work(xbt_parmap_t parmap);
42
43 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
47
48 #if HAVE_FUTEX_H
49 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
50 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
51 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
52 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
53 static void futex_wait(unsigned *uaddr, unsigned val);
54 static void futex_wake(unsigned *uaddr, unsigned val);
55 #endif
56
57 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
58 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
59 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
60 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
61
62 #if HAVE_MC
63 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id);
64 static void *xbt_parmap_mc_worker_main(void *arg);
65 #endif
66
67 /**
68  * \brief Parallel map structure
69  */
70 typedef struct s_xbt_parmap {
71   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
72   unsigned work;                   /**< index of the current round */
73   unsigned thread_counter;         /**< number of workers that have done the work */
74
75   unsigned int num_workers;        /**< total number of worker threads including the controller */
76   xbt_os_thread_t *workers;        /**< worker thread handlers */
77   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
78   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
79   std::atomic<unsigned int> index; /**< index of the next element of data to pick */
80
81 #if HAVE_MC
82   int finish;
83   void* ref_snapshot;
84   int_f_pvoid_pvoid_t snapshot_compare;
85   unsigned int length;
86   void* mc_data;
87 #endif
88
89   /* posix only */
90   xbt_os_cond_t ready_cond;
91   xbt_os_mutex_t ready_mutex;
92   xbt_os_cond_t done_cond;
93   xbt_os_mutex_t done_mutex;
94
95   /* fields that depend on the synchronization mode */
96   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
97   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
98   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
99   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
100   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
101 } s_xbt_parmap_t;
102
103 /**
104  * \brief Thread data transmission structure
105  */
106 typedef struct s_xbt_parmap_thread_data{
107   xbt_parmap_t parmap;
108   int worker_id;
109 } s_xbt_parmap_thread_data_t;
110
111 typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t;
112
113 /**
114  * \brief Creates a parallel map object
115  * \param num_workers number of worker threads to create
116  * \param mode how to synchronize the worker threads
117  * \return the parmap created
118  */
119 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
120 {
121   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
122
123   /* Initialize the thread pool data structure */
124   xbt_parmap_t parmap = new s_xbt_parmap_t();
125   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
126
127   parmap->num_workers = num_workers;
128   parmap->status = XBT_PARMAP_WORK;
129   xbt_parmap_set_mode(parmap, mode);
130
131   /* Create the pool of worker threads */
132   xbt_parmap_thread_data_t data;
133   parmap->workers[0] = NULL;
134 #if HAVE_PTHREAD_SETAFFINITY
135   int core_bind = 0;
136 #endif  
137   for (unsigned int i = 1; i < num_workers; i++) {
138     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
139     data->parmap = parmap;
140     data->worker_id = i;
141     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL);
142 #if HAVE_PTHREAD_SETAFFINITY
143     xbt_os_thread_bind(parmap->workers[i], core_bind); 
144     if (core_bind != xbt_os_get_numcores())
145       core_bind++;
146     else
147       core_bind = 0; 
148 #endif
149   }
150   return parmap;
151 }
152
153 #if HAVE_MC
154 /**
155  * \brief Creates a parallel map object
156  * \param num_workers number of worker threads to create
157  * \param mode how to synchronize the worker threads
158  * \return the parmap created
159  */
160 xbt_parmap_t xbt_parmap_mc_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
161 {
162   unsigned int i;
163
164   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
165
166   /* Initialize the thread pool data structure */
167   xbt_parmap_t parmap = new s_xbt_parmap_t();
168   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
169
170   parmap->num_workers = num_workers;
171   parmap->status = XBT_PARMAP_WORK;
172   xbt_parmap_set_mode(parmap, mode);
173
174   /* Create the pool of worker threads */
175   xbt_parmap_thread_data_t data;
176   parmap->workers[0] = NULL;
177   for (i = 1; i < num_workers; i++) {
178     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
179     data->parmap = parmap;
180     data->worker_id = i;
181     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_mc_worker_main, data, NULL);
182   }
183   return parmap;
184 }
185 #endif
186
187 /**
188  * \brief Destroys a parmap
189  * \param parmap the parmap to destroy
190  */
191 void xbt_parmap_destroy(xbt_parmap_t parmap)
192 {
193   if (!parmap) {
194     return;
195   }
196
197   parmap->status = XBT_PARMAP_DESTROY;
198   parmap->master_signal_f(parmap);
199
200   unsigned int i;
201   for (i = 1; i < parmap->num_workers; i++)
202     xbt_os_thread_join(parmap->workers[i], NULL);
203
204   xbt_os_cond_destroy(parmap->ready_cond);
205   xbt_os_mutex_destroy(parmap->ready_mutex);
206   xbt_os_cond_destroy(parmap->done_cond);
207   xbt_os_mutex_destroy(parmap->done_mutex);
208
209   xbt_free(parmap->workers);
210   delete parmap;
211 }
212
213 /**
214  * \brief Sets the synchronization mode of a parmap.
215  * \param parmap a parallel map object
216  * \param mode the synchronization mode
217  */
218 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
219 {
220   if (mode == XBT_PARMAP_DEFAULT) {
221 #if HAVE_FUTEX_H
222     mode = XBT_PARMAP_FUTEX;
223 #else
224     mode = XBT_PARMAP_POSIX;
225 #endif
226   }
227   parmap->mode = mode;
228
229   switch (mode) {
230     case XBT_PARMAP_POSIX:
231       parmap->master_wait_f = xbt_parmap_posix_master_wait;
232       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
233       parmap->master_signal_f = xbt_parmap_posix_master_signal;
234       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
235
236       parmap->ready_cond = xbt_os_cond_init();
237       parmap->ready_mutex = xbt_os_mutex_init();
238       parmap->done_cond = xbt_os_cond_init();
239       parmap->done_mutex = xbt_os_mutex_init();
240       break;
241     case XBT_PARMAP_FUTEX:
242 #if HAVE_FUTEX_H
243       parmap->master_wait_f = xbt_parmap_futex_master_wait;
244       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
245       parmap->master_signal_f = xbt_parmap_futex_master_signal;
246       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
247
248       xbt_os_cond_destroy(parmap->ready_cond);
249       xbt_os_mutex_destroy(parmap->ready_mutex);
250       xbt_os_cond_destroy(parmap->done_cond);
251       xbt_os_mutex_destroy(parmap->done_mutex);
252       break;
253 #else
254       xbt_die("Futex is not available on this OS.");
255 #endif
256     case XBT_PARMAP_BUSY_WAIT:
257       parmap->master_wait_f = xbt_parmap_busy_master_wait;
258       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
259       parmap->master_signal_f = xbt_parmap_busy_master_signal;
260       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
261
262       xbt_os_cond_destroy(parmap->ready_cond);
263       xbt_os_mutex_destroy(parmap->ready_mutex);
264       xbt_os_cond_destroy(parmap->done_cond);
265       xbt_os_mutex_destroy(parmap->done_mutex);
266       break;
267     case XBT_PARMAP_DEFAULT:
268       THROW_IMPOSSIBLE;
269       break;
270   }
271 }
272
273 /**
274  * \brief Applies a list of tasks in parallel.
275  * \param parmap a parallel map object
276  * \param fun the function to call in parallel
277  * \param data each element of this dynar will be passed as an argument to fun
278  */
279 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
280 {
281   /* Assign resources to worker threads (we are maestro here)*/
282   parmap->fun = fun;
283   parmap->data = data;
284   parmap->index = 0;
285   parmap->master_signal_f(parmap); // maestro runs futex_wait to wake all the minions (the working threads)
286   xbt_parmap_work(parmap);         // maestro works with its minions
287   parmap->master_wait_f(parmap);   // When there is no more work to do, then maestro waits for the last minion to stop
288   XBT_DEBUG("Job done");           //   ... and proceeds
289 }
290
291 /**
292  * \brief Returns a next task to process.
293  *
294  * Worker threads call this function to get more work.
295  *
296  * \return the next task to process, or NULL if there is no more work
297  */
298 void* xbt_parmap_next(xbt_parmap_t parmap)
299 {
300   unsigned int index = parmap->index++;
301   if (index < xbt_dynar_length(parmap->data)) {
302     return xbt_dynar_get_as(parmap->data, index, void*);
303   }
304   return NULL;
305 }
306
307 static void xbt_parmap_work(xbt_parmap_t parmap)
308 {
309   unsigned index;
310   while ((index = parmap->index++) < xbt_dynar_length(parmap->data))
311     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
312 }
313
314 /**
315  * \brief Main function of a worker thread.
316  * \param arg the parmap
317  */
318 static void *xbt_parmap_worker_main(void *arg)
319 {
320   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
321   xbt_parmap_t parmap = data->parmap;
322   unsigned round = 0;
323   smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL);
324   SIMIX_context_set_current(context);
325
326   XBT_DEBUG("New worker thread created");
327
328   /* Worker's main loop */
329   while (1) {
330     parmap->worker_wait_f(parmap, ++round);
331     if (parmap->status == XBT_PARMAP_WORK) {
332       XBT_DEBUG("Worker %d got a job", data->worker_id);
333
334       xbt_parmap_work(parmap);
335       parmap->worker_signal_f(parmap);
336
337       XBT_DEBUG("Worker %d has finished", data->worker_id);
338     /* We are destroying the parmap */
339     } else {
340       SIMIX_context_free(context);
341       xbt_free(data);
342       return NULL;
343     }
344   }
345 }
346
347 #if HAVE_MC
348 /**
349  * \brief Applies a list of tasks in parallel.
350  * \param parmap a parallel map object
351  * \param fun the function to call in parallel
352  * \param data each element of this dynar will be passed as an argument to fun
353  */
354 int xbt_parmap_mc_apply(xbt_parmap_t parmap, int_f_pvoid_pvoid_t fun, void* data, unsigned int length,
355                         void* ref_snapshot)
356 {
357   /* Assign resources to worker threads */
358   parmap->snapshot_compare = fun;
359   parmap->mc_data = data;
360   parmap->index = 0;
361   parmap->finish = -1;
362   parmap->length = length;
363   parmap->ref_snapshot = ref_snapshot;
364   parmap->master_signal_f(parmap);
365   xbt_parmap_mc_work(parmap, 0);
366   parmap->master_wait_f(parmap);
367   XBT_DEBUG("Job done");
368   return parmap->finish;
369 }
370
371 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id)
372 {
373   unsigned int data_size = (parmap->length / parmap->num_workers) + ((parmap->length % parmap->num_workers) ? 1 :0);
374   void* start = (char*)parmap->mc_data + (data_size*worker_id*sizeof(void*));
375   void* end = MIN((char *)start + data_size* sizeof(void*), (char*)parmap->mc_data + parmap->length*sizeof(void*));
376
377   //XBT_CRITICAL("Worker %d : %p -> %p (%d)", worker_id, start, end, data_size);
378
379   while ( start < end && parmap->finish == -1) {
380     //XBT_CRITICAL("Starting with %p", start);
381     int res = parmap->snapshot_compare(*(void**)start, parmap->ref_snapshot);
382     start = (char *)start + sizeof(start);
383     if (!res){
384       parmap->finish = ((char*)start - (char*)parmap->mc_data) / sizeof(void*);
385       //XBT_CRITICAL("Find good one %p (%p)", start, parmap->mc_data);
386       break;
387     }
388   }
389 }
390
391 /**
392  * \brief Main function of a worker thread.
393  * \param arg the parmap
394  */
395 static void *xbt_parmap_mc_worker_main(void *arg)
396 {
397   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
398   xbt_parmap_t parmap = data->parmap;
399   unsigned round = 0;
400   /* smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL); */
401   /* SIMIX_context_set_current(context); */
402
403   XBT_DEBUG("New worker thread created");
404
405   /* Worker's main loop */
406   while (1) {
407     parmap->worker_wait_f(parmap, ++round);
408     if (parmap->status == XBT_PARMAP_WORK) {
409       XBT_DEBUG("Worker %d got a job", data->worker_id);
410
411       xbt_parmap_mc_work(parmap, data->worker_id);
412       parmap->worker_signal_f(parmap);
413
414       XBT_DEBUG("Worker %d has finished", data->worker_id);
415     /* We are destroying the parmap */
416     } else {
417       xbt_free(data);
418       return NULL;
419     }
420   }
421 }
422 #endif
423
424 #if HAVE_FUTEX_H
425 static void futex_wait(unsigned *uaddr, unsigned val)
426 {
427   XBT_VERB("Waiting on futex %p", uaddr);
428   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
429 }
430
431 static void futex_wake(unsigned *uaddr, unsigned val)
432 {
433   XBT_VERB("Waking futex %p", uaddr);
434   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
435 }
436 #endif
437
438 /**
439  * \brief Starts the parmap: waits for all workers to be ready and returns.
440  *
441  * This function is called by the controller thread.
442  *
443  * \param parmap a parmap
444  */
445 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
446 {
447   xbt_os_mutex_acquire(parmap->done_mutex);
448   if (parmap->thread_counter < parmap->num_workers) {
449     /* wait for all workers to be ready */
450     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
451   }
452   xbt_os_mutex_release(parmap->done_mutex);
453 }
454
455 /**
456  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
457  *
458  * This function is called by all worker threads when they end (not including the controller).
459  *
460  * \param parmap a parmap
461  */
462 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
463 {
464   xbt_os_mutex_acquire(parmap->done_mutex);
465   if (++parmap->thread_counter == parmap->num_workers) {
466     /* all workers have finished, wake the controller */
467     xbt_os_cond_signal(parmap->done_cond);
468   }
469   xbt_os_mutex_release(parmap->done_mutex);
470 }
471
472 /**
473  * \brief Wakes all workers and waits for them to finish the tasks.
474  *
475  * This function is called by the controller thread.
476  *
477  * \param parmap a parmap
478  */
479 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
480 {
481   xbt_os_mutex_acquire(parmap->ready_mutex);
482   parmap->thread_counter = 1;
483   parmap->work++;
484   /* wake all workers */
485   xbt_os_cond_broadcast(parmap->ready_cond);
486   xbt_os_mutex_release(parmap->ready_mutex);
487 }
488
489 /**
490  * \brief Waits for some work to process.
491  *
492  * This function is called by each worker thread (not including the controller) when it has no more work to do.
493  *
494  * \param parmap a parmap
495  * \param round  the expected round number
496  */
497 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
498 {
499   xbt_os_mutex_acquire(parmap->ready_mutex);
500   /* wait for more work */
501   if (parmap->work != round) {
502     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
503   }
504   xbt_os_mutex_release(parmap->ready_mutex);
505 }
506
507 #if HAVE_FUTEX_H
508 /**
509  * \brief Starts the parmap: waits for all workers to be ready and returns.
510  *
511  * This function is called by the controller thread.
512  *
513  * \param parmap a parmap
514  */
515 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
516 {
517   unsigned count = parmap->thread_counter;
518   while (count < parmap->num_workers) {
519     /* wait for all workers to be ready */
520     futex_wait(&parmap->thread_counter, count);
521     count = parmap->thread_counter;
522   }
523 }
524
525 /**
526  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
527  *
528  * This function is called by all worker threads when they end (not including the controller).
529  *
530  * \param parmap a parmap
531  */
532 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
533 {
534   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
535   if (count == parmap->num_workers) {
536     /* all workers have finished, wake the controller */
537     futex_wake(&parmap->thread_counter, INT_MAX);
538   }
539 }
540
541 /**
542  * \brief Wakes all workers and waits for them to finish the tasks.
543  *
544  * This function is called by the controller thread.
545  *
546  * \param parmap a parmap
547  */
548 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
549 {
550   parmap->thread_counter = 1;
551   __sync_add_and_fetch(&parmap->work, 1);
552   /* wake all workers */
553   futex_wake(&parmap->work, INT_MAX);
554 }
555
556 /**
557  * \brief Waits for some work to process.
558  *
559  * This function is called by each worker thread (not including the controller) when it has no more work to do.
560  *
561  * \param parmap a parmap
562  * \param round  the expected round number
563  */
564 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
565 {
566   unsigned work = parmap->work;
567   /* wait for more work */
568   while (work != round) {
569     futex_wait(&parmap->work, work);
570     work = parmap->work;
571   }
572 }
573 #endif
574
575 /**
576  * \brief Starts the parmap: waits for all workers to be ready and returns.
577  *
578  * This function is called by the controller thread.
579  *
580  * \param parmap a parmap
581  */
582 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
583 {
584   while (parmap->thread_counter < parmap->num_workers) {
585     xbt_os_thread_yield();
586   }
587 }
588
589 /**
590  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
591  *
592  * This function is called by all worker threads when they end.
593  *
594  * \param parmap a parmap
595  */
596 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
597 {
598   __sync_add_and_fetch(&parmap->thread_counter, 1);
599 }
600
601 /**
602  * \brief Wakes all workers and waits for them to finish the tasks.
603  *
604  * This function is called by the controller thread.
605  *
606  * \param parmap a parmap
607  */
608 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
609 {
610   parmap->thread_counter = 1;
611   __sync_add_and_fetch(&parmap->work, 1);
612 }
613
614 /**
615  * \brief Waits for some work to process.
616  *
617  * This function is called by each worker thread (not including the controller) when it has no more work to do.
618  *
619  * \param parmap a parmap
620  * \param round  the expected round number
621  */
622 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
623 {
624   /* wait for more work */
625   while (parmap->work != round) {
626     xbt_os_thread_yield();
627   }
628 }