Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Parmap: use INT_MAX as number of threads to wake by futex_wake.
[simgrid.git] / src / xbt / parmap.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "gras_config.h"
7 #include <unistd.h>
8
9 #ifndef _XBT_WIN32
10 #include <sys/syscall.h>
11 #endif
12
13 #ifdef HAVE_FUTEX_H
14 #include <linux/futex.h>
15 #include <limits.h>
16 #endif
17
18 #include "xbt/parmap.h"
19 #include "xbt/log.h"
20 #include "xbt/function_types.h"
21 #include "xbt/dynar.h"
22 #include "xbt/xbt_os_thread.h"
23 #include "xbt/sysdep.h"
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
26 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
27
28 typedef enum {
29   XBT_PARMAP_WORK,
30   XBT_PARMAP_DESTROY
31 } e_xbt_parmap_flag_t;
32
33 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
34 static void *xbt_parmap_worker_main(void *parmap);
35 static void xbt_parmap_work(xbt_parmap_t parmap);
36
37 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
39 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
40 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
41
42 #ifdef HAVE_FUTEX_H
43 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
47 static void futex_wait(unsigned *uaddr, unsigned val);
48 static void futex_wake(unsigned *uaddr, unsigned val);
49 #endif
50
51 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
53 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
54 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
55
56 /**
57  * \brief Parallel map structure
58  */
59 typedef struct s_xbt_parmap {
60   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
61   unsigned work;                   /**< index of the current round */
62   unsigned thread_counter;         /**< number of workers that have done the work */
63
64   unsigned int num_workers;        /**< total number of worker threads including the controller */
65   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
66   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
67   unsigned int index;              /**< index of the next element of data to pick */
68
69   /* posix only */
70   xbt_os_cond_t ready_cond;
71   xbt_os_mutex_t ready_mutex;
72   xbt_os_cond_t done_cond;
73   xbt_os_mutex_t done_mutex;
74
75   /* fields that depend on the synchronization mode */
76   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
77   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
78   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
79   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
80   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
81 } s_xbt_parmap_t;
82
83 /**
84  * \brief Creates a parallel map object
85  * \param num_workers number of worker threads to create
86  * \param mode how to synchronize the worker threads
87  * \return the parmap created
88  */
89 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
90 {
91   unsigned int i;
92   xbt_os_thread_t worker = NULL;
93
94   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
95
96   /* Initialize the thread pool data structure */
97   xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
98
99   parmap->num_workers = num_workers;
100   parmap->status = XBT_PARMAP_WORK;
101   xbt_parmap_set_mode(parmap, mode);
102
103   /* Create the pool of worker threads */
104   for (i = 1; i < num_workers; i++) {
105     worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
106     xbt_os_thread_detach(worker);
107   }
108   return parmap;
109 }
110
111 /**
112  * \brief Destroys a parmap
113  * \param parmap the parmap to destroy
114  */
115 void xbt_parmap_destroy(xbt_parmap_t parmap)
116 {
117   if (!parmap) {
118     return;
119   }
120
121   parmap->status = XBT_PARMAP_DESTROY;
122   parmap->master_signal_f(parmap);
123   parmap->master_wait_f(parmap);
124
125   xbt_os_cond_destroy(parmap->ready_cond);
126   xbt_os_mutex_destroy(parmap->ready_mutex);
127   xbt_os_cond_destroy(parmap->done_cond);
128   xbt_os_mutex_destroy(parmap->done_mutex);
129
130   xbt_free(parmap);
131 }
132
133 /**
134  * \brief Sets the synchronization mode of a parmap.
135  * \param parmap a parallel map object
136  * \param mode the synchronization mode
137  */
138 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
139 {
140   if (mode == XBT_PARMAP_DEFAULT) {
141 #ifdef HAVE_FUTEX_H
142     mode = XBT_PARMAP_FUTEX;
143 #else
144     mode = XBT_PARMAP_POSIX;
145 #endif
146   }
147   parmap->mode = mode;
148
149   switch (mode) {
150
151     case XBT_PARMAP_POSIX:
152       parmap->master_wait_f = xbt_parmap_posix_master_wait;
153       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
154       parmap->master_signal_f = xbt_parmap_posix_master_signal;
155       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
156
157       parmap->ready_cond = xbt_os_cond_init();
158       parmap->ready_mutex = xbt_os_mutex_init();
159       parmap->done_cond = xbt_os_cond_init();
160       parmap->done_mutex = xbt_os_mutex_init();
161       break;
162
163
164     case XBT_PARMAP_FUTEX:
165 #ifdef HAVE_FUTEX_H
166       parmap->master_wait_f = xbt_parmap_futex_master_wait;
167       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
168       parmap->master_signal_f = xbt_parmap_futex_master_signal;
169       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
170
171       xbt_os_cond_destroy(parmap->ready_cond);
172       xbt_os_mutex_destroy(parmap->ready_mutex);
173       xbt_os_cond_destroy(parmap->done_cond);
174       xbt_os_mutex_destroy(parmap->done_mutex);
175       break;
176 #else
177       xbt_die("Futex is not available on this OS.");
178 #endif
179
180     case XBT_PARMAP_BUSY_WAIT:
181       parmap->master_wait_f = xbt_parmap_busy_master_wait;
182       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
183       parmap->master_signal_f = xbt_parmap_busy_master_signal;
184       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
185
186       xbt_os_cond_destroy(parmap->ready_cond);
187       xbt_os_mutex_destroy(parmap->ready_mutex);
188       xbt_os_cond_destroy(parmap->done_cond);
189       xbt_os_mutex_destroy(parmap->done_mutex);
190       break;
191
192     case XBT_PARMAP_DEFAULT:
193       THROW_IMPOSSIBLE;
194       break;
195   }
196 }
197
198 /**
199  * \brief Applies a list of tasks in parallel.
200  * \param parmap a parallel map object
201  * \param fun the function to call in parallel
202  * \param data each element of this dynar will be passed as an argument to fun
203  */
204 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
205 {
206   /* Assign resources to worker threads */
207   parmap->fun = fun;
208   parmap->data = data;
209   parmap->index = 0;
210   parmap->master_signal_f(parmap);
211   xbt_parmap_work(parmap);
212   parmap->master_wait_f(parmap);
213   XBT_DEBUG("Job done");
214 }
215
216 /**
217  * \brief Returns a next task to process.
218  *
219  * Worker threads call this function to get more work.
220  *
221  * \return the next task to process, or NULL if there is no more work
222  */
223 void* xbt_parmap_next(xbt_parmap_t parmap)
224 {
225   unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
226   if (index < xbt_dynar_length(parmap->data)) {
227     return xbt_dynar_get_as(parmap->data, index, void*);
228   }
229   return NULL;
230 }
231
232 static void xbt_parmap_work(xbt_parmap_t parmap)
233 {
234   unsigned index;
235   while ((index = __sync_fetch_and_add(&parmap->index, 1))
236          < xbt_dynar_length(parmap->data))
237     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
238 }
239
240 /**
241  * \brief Main function of a worker thread.
242  * \param arg the parmap
243  */
244 static void *xbt_parmap_worker_main(void *arg)
245 {
246   xbt_parmap_t parmap = (xbt_parmap_t) arg;
247   unsigned round = 0;
248
249   XBT_DEBUG("New worker thread created");
250
251   /* Worker's main loop */
252   while (1) {
253     parmap->worker_wait_f(parmap, ++round);
254     if (parmap->status == XBT_PARMAP_WORK) {
255
256       XBT_DEBUG("Worker got a job");
257
258       xbt_parmap_work(parmap);
259       parmap->worker_signal_f(parmap);
260
261       XBT_DEBUG("Worker has finished");
262
263     /* We are destroying the parmap */
264     } else {
265       parmap->worker_signal_f(parmap);
266       return NULL;
267     }
268   }
269 }
270
271 #ifdef HAVE_FUTEX_H
272 static void futex_wait(unsigned *uaddr, unsigned val)
273 {
274   XBT_VERB("Waiting on futex %p", uaddr);
275   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
276 }
277
278 static void futex_wake(unsigned *uaddr, unsigned val)
279 {
280   XBT_VERB("Waking futex %p", uaddr);
281   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
282 }
283 #endif
284
285 /**
286  * \brief Starts the parmap: waits for all workers to be ready and returns.
287  *
288  * This function is called by the controller thread.
289  *
290  * \param parmap a parmap
291  */
292 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
293 {
294   xbt_os_mutex_acquire(parmap->done_mutex);
295   if (parmap->thread_counter < parmap->num_workers) {
296     /* wait for all workers to be ready */
297     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
298   }
299   xbt_os_mutex_release(parmap->done_mutex);
300 }
301
302 /**
303  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
304  *
305  * This function is called by all worker threads when they end (not including
306  * the controller).
307  *
308  * \param parmap a parmap
309  */
310 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
311 {
312   xbt_os_mutex_acquire(parmap->done_mutex);
313   if (++parmap->thread_counter == parmap->num_workers) {
314     /* all workers have finished, wake the controller */
315     xbt_os_cond_signal(parmap->done_cond);
316   }
317   xbt_os_mutex_release(parmap->done_mutex);
318 }
319
320 /**
321  * \brief Wakes all workers and waits for them to finish the tasks.
322  *
323  * This function is called by the controller thread.
324  *
325  * \param parmap a parmap
326  */
327 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
328 {
329   xbt_os_mutex_acquire(parmap->ready_mutex);
330   parmap->thread_counter = 1;
331   parmap->work++;
332   /* wake all workers */
333   xbt_os_cond_broadcast(parmap->ready_cond);
334   xbt_os_mutex_release(parmap->ready_mutex);
335 }
336
337 /**
338  * \brief Waits for some work to process.
339  *
340  * This function is called by each worker thread (not including the controller)
341  * when it has no more work to do.
342  *
343  * \param parmap a parmap
344  * \param round  the expected round number
345  */
346 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
347 {
348   xbt_os_mutex_acquire(parmap->ready_mutex);
349   /* wait for more work */
350   if (parmap->work < round) {
351     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
352   }
353   xbt_os_mutex_release(parmap->ready_mutex);
354 }
355
356 #ifdef HAVE_FUTEX_H
357 /**
358  * \brief Starts the parmap: waits for all workers to be ready and returns.
359  *
360  * This function is called by the controller thread.
361  *
362  * \param parmap a parmap
363  */
364 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
365 {
366   unsigned count = parmap->thread_counter;
367   while (count < parmap->num_workers) {
368     /* wait for all workers to be ready */
369     futex_wait(&parmap->thread_counter, count);
370     count = parmap->thread_counter;
371   }
372 }
373
374 /**
375  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
376  *
377  * This function is called by all worker threads when they end (not including
378  * the controller).
379  *
380  * \param parmap a parmap
381  */
382 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
383 {
384   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
385   if (count == parmap->num_workers) {
386     /* all workers have finished, wake the controller */
387     futex_wake(&parmap->thread_counter, INT_MAX);
388   }
389 }
390
391 /**
392  * \brief Wakes all workers and waits for them to finish the tasks.
393  *
394  * This function is called by the controller thread.
395  *
396  * \param parmap a parmap
397  */
398 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
399 {
400   parmap->thread_counter = 1;
401   __sync_add_and_fetch(&parmap->work, 1);
402   /* wake all workers */
403   futex_wake(&parmap->work, INT_MAX);
404 }
405
406 /**
407  * \brief Waits for some work to process.
408  *
409  * This function is called by each worker thread (not including the controller)
410  * when it has no more work to do.
411  *
412  * \param parmap a parmap
413  * \param round  the expected round number
414  */
415 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
416 {
417   unsigned work = parmap->work;
418   /* wait for more work */
419   if (work < round)
420     futex_wait(&parmap->work, work);
421 }
422 #endif
423
424 /**
425  * \brief Starts the parmap: waits for all workers to be ready and returns.
426  *
427  * This function is called by the controller thread.
428  *
429  * \param parmap a parmap
430  */
431 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
432 {
433   while (parmap->thread_counter < parmap->num_workers) {
434     xbt_os_thread_yield();
435   }
436 }
437
438 /**
439  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
440  *
441  * This function is called by all worker threads when they end.
442  *
443  * \param parmap a parmap
444  */
445 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
446 {
447   __sync_add_and_fetch(&parmap->thread_counter, 1);
448 }
449
450 /**
451  * \brief Wakes all workers and waits for them to finish the tasks.
452  *
453  * This function is called by the controller thread.
454  *
455  * \param parmap a parmap
456  */
457 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
458 {
459   parmap->thread_counter = 1;
460   __sync_add_and_fetch(&parmap->work, 1);
461 }
462
463 /**
464  * \brief Waits for some work to process.
465  *
466  * This function is called by each worker thread (not including the controller)
467  * when it has no more work to do.
468  *
469  * \param parmap a parmap
470  * \param round  the expected round number
471  */
472 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
473 {
474   /* wait for more work */
475   while (parmap->work < round) {
476     xbt_os_thread_yield();
477   }
478 }
479
480 #ifdef SIMGRID_TEST
481 #include "xbt.h"
482 #include "xbt/ex.h"
483 #include "xbt/xbt_os_thread.h"
484 #include "xbt/xbt_os_time.h"
485 #include "gras_config.h"        /* HAVE_FUTEX_H */
486
487 XBT_TEST_SUITE("parmap", "Parallel Map");
488 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
489
490 #ifdef HAVE_FUTEX_H
491 #define TEST_PARMAP_SKIP_TEST(mode) 0
492 #else
493 #define TEST_PARMAP_SKIP_TEST(mode) ((mode) == XBT_PARMAP_FUTEX)
494 #endif
495
496 #define TEST_PARMAP_VALIDATE_MODE(mode) \
497   if (TEST_PARMAP_SKIP_TEST(mode)) { xbt_test_skip(); return; } else ((void)0)
498
499 static void fun_double(void *arg)
500 {
501   unsigned *u = arg;
502   *u = 2 * *u + 1;
503 }
504
505 /* Check that the computations are correctly done. */
506 static void test_parmap_basic(e_xbt_parmap_mode_t mode)
507 {
508   unsigned num_workers;
509
510   for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
511     const unsigned len = 1033;
512     const unsigned num = 5;
513     unsigned *a;
514     xbt_dynar_t data;
515     xbt_parmap_t parmap;
516     unsigned i;
517
518     xbt_test_add("Basic parmap usage (%u workers)", num_workers);
519
520     TEST_PARMAP_VALIDATE_MODE(mode);
521     parmap = xbt_parmap_new(num_workers, mode);
522
523     a = xbt_malloc(len * sizeof *a);
524     data = xbt_dynar_new(sizeof a, NULL);
525     for (i = 0; i < len; i++) {
526       a[i] = i;
527       xbt_dynar_push_as(data, void *, &a[i]);
528     }
529
530     for (i = 0; i < num; i++)
531       xbt_parmap_apply(parmap, fun_double, data);
532
533     for (i = 0; i < len; i++) {
534       unsigned expected = (1U << num) * (i + 1) - 1;
535       xbt_test_assert(a[i] == expected,
536                       "a[%u]: expected %u, got %u", i, expected, a[i]);
537     }
538
539     xbt_dynar_free(&data);
540     xbt_free(a);
541     xbt_parmap_destroy(parmap);
542   }
543 }
544
545 XBT_TEST_UNIT("basic_posix", test_parmap_basic_posix, "Basic usage: posix")
546 {
547   test_parmap_basic(XBT_PARMAP_POSIX);
548 }
549
550 XBT_TEST_UNIT("basic_futex", test_parmap_basic_futex, "Basic usage: futex")
551 {
552   test_parmap_basic(XBT_PARMAP_FUTEX);
553 }
554
555 XBT_TEST_UNIT("basic_busy_wait", test_parmap_basic_busy_wait, "Basic usage: busy_wait")
556 {
557   test_parmap_basic(XBT_PARMAP_BUSY_WAIT);
558 }
559
560 static void fun_get_id(void *arg)
561 {
562   *(uintptr_t *)arg = (uintptr_t)xbt_os_thread_self();
563   xbt_os_sleep(0.5);
564 }
565
566 static int fun_compare(const void *pa, const void *pb)
567 {
568   uintptr_t a = *(uintptr_t *)pa;
569   uintptr_t b = *(uintptr_t *)pb;
570   return a < b ? -1 : a > b ? 1 : 0;
571 }
572
573 /* Check that all threads are working. */
574 static void test_parmap_extended(e_xbt_parmap_mode_t mode)
575 {
576   unsigned num_workers;
577
578   for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
579     const unsigned len = 2 * num_workers;
580     uintptr_t *a;
581     xbt_parmap_t parmap;
582     xbt_dynar_t data;
583     unsigned i;
584     unsigned count;
585
586     xbt_test_add("Extended parmap usage (%u workers)", num_workers);
587
588     TEST_PARMAP_VALIDATE_MODE(mode);
589     parmap = xbt_parmap_new(num_workers, mode);
590
591     a = xbt_malloc(len * sizeof *a);
592     data = xbt_dynar_new(sizeof a, NULL);
593     for (i = 0; i < len; i++)
594       xbt_dynar_push_as(data, void *, &a[i]);
595
596     xbt_parmap_apply(parmap, fun_get_id, data);
597
598     qsort(a, len, sizeof a[0], fun_compare);
599     count = 1;
600     for (i = 1; i < len; i++)
601       if (a[i] != a[i - 1])
602         count++;
603     xbt_test_assert(count == num_workers,
604                     "only %u/%u threads did some work", count, num_workers);
605
606     xbt_dynar_free(&data);
607     xbt_free(a);
608     xbt_parmap_destroy(parmap);
609   }
610 }
611
612 XBT_TEST_UNIT("extended_posix", test_parmap_extended_posix, "Extended usage: posix")
613 {
614   test_parmap_extended(XBT_PARMAP_POSIX);
615 }
616
617 XBT_TEST_UNIT("extended_futex", test_parmap_extended_futex, "Extended usage: futex")
618 {
619   test_parmap_extended(XBT_PARMAP_FUTEX);
620 }
621
622 XBT_TEST_UNIT("extended_busy_wait", test_parmap_extended_busy_wait, "Extended usage: busy_wait")
623 {
624   test_parmap_extended(XBT_PARMAP_BUSY_WAIT);
625 }
626
627 #endif /* SIMGRID_TEST */