Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Fix compilation error on windows.
[simgrid.git] / src / xbt / parmap.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "gras_config.h"
7 #include <unistd.h>
8
9 #ifndef _XBT_WIN32
10 #include <sys/syscall.h>
11 #endif
12
13 #ifdef HAVE_FUTEX_H
14 #include <linux/futex.h>
15 #include <limits.h>
16 #endif
17
18 #include "xbt/parmap.h"
19 #include "xbt/log.h"
20 #include "xbt/function_types.h"
21 #include "xbt/dynar.h"
22 #include "xbt/xbt_os_thread.h"
23 #include "xbt/sysdep.h"
24
25 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
26 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
27
28 typedef enum {
29   XBT_PARMAP_WORK,
30   XBT_PARMAP_DESTROY
31 } e_xbt_parmap_flag_t;
32
33 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
34 static void *xbt_parmap_worker_main(void *parmap);
35 static void xbt_parmap_work(xbt_parmap_t parmap);
36
37 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
39 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
40 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
41
42 #ifdef HAVE_FUTEX_H
43 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
47 static void futex_wait(unsigned *uaddr, unsigned val);
48 static void futex_wake(unsigned *uaddr, unsigned val);
49 #endif
50
51 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
53 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
54 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
55
56 /**
57  * \brief Parallel map structure
58  */
59 typedef struct s_xbt_parmap {
60   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
61   unsigned work;                   /**< index of the current round */
62   unsigned thread_counter;         /**< number of workers that have done the work */
63
64   unsigned int num_workers;        /**< total number of worker threads including the controller */
65   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
66   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
67   unsigned int index;              /**< index of the next element of data to pick */
68
69   /* posix only */
70   xbt_os_cond_t ready_cond;
71   xbt_os_mutex_t ready_mutex;
72   xbt_os_cond_t done_cond;
73   xbt_os_mutex_t done_mutex;
74
75   /* fields that depend on the synchronization mode */
76   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
77   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
78   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
79   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
80   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
81 } s_xbt_parmap_t;
82
83 /**
84  * \brief Creates a parallel map object
85  * \param num_workers number of worker threads to create
86  * \param mode how to synchronize the worker threads
87  * \return the parmap created
88  */
89 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
90 {
91   unsigned int i;
92   xbt_os_thread_t worker = NULL;
93
94   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
95
96   /* Initialize the thread pool data structure */
97   xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
98
99   parmap->num_workers = num_workers;
100   parmap->status = XBT_PARMAP_WORK;
101   xbt_parmap_set_mode(parmap, mode);
102
103   /* Create the pool of worker threads */
104   for (i = 1; i < num_workers; i++) {
105     worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
106     xbt_os_thread_detach(worker);
107   }
108   return parmap;
109 }
110
111 /**
112  * \brief Destroys a parmap
113  * \param parmap the parmap to destroy
114  */
115 void xbt_parmap_destroy(xbt_parmap_t parmap)
116 {
117   if (!parmap) {
118     return;
119   }
120
121   parmap->status = XBT_PARMAP_DESTROY;
122   parmap->master_signal_f(parmap);
123   parmap->master_wait_f(parmap);
124
125   xbt_os_cond_destroy(parmap->ready_cond);
126   xbt_os_mutex_destroy(parmap->ready_mutex);
127   xbt_os_cond_destroy(parmap->done_cond);
128   xbt_os_mutex_destroy(parmap->done_mutex);
129
130   xbt_free(parmap);
131 }
132
133 /**
134  * \brief Sets the synchronization mode of a parmap.
135  * \param parmap a parallel map object
136  * \param mode the synchronization mode
137  */
138 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
139 {
140   if (mode == XBT_PARMAP_DEFAULT) {
141 #ifdef HAVE_FUTEX_H
142     mode = XBT_PARMAP_FUTEX;
143 #else
144     mode = XBT_PARMAP_POSIX;
145 #endif
146   }
147   parmap->mode = mode;
148
149   switch (mode) {
150
151     case XBT_PARMAP_POSIX:
152       parmap->master_wait_f = xbt_parmap_posix_master_wait;
153       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
154       parmap->master_signal_f = xbt_parmap_posix_master_signal;
155       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
156
157       parmap->ready_cond = xbt_os_cond_init();
158       parmap->ready_mutex = xbt_os_mutex_init();
159       parmap->done_cond = xbt_os_cond_init();
160       parmap->done_mutex = xbt_os_mutex_init();
161       break;
162
163
164     case XBT_PARMAP_FUTEX:
165 #ifdef HAVE_FUTEX_H
166       parmap->master_wait_f = xbt_parmap_futex_master_wait;
167       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
168       parmap->master_signal_f = xbt_parmap_futex_master_signal;
169       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
170
171       xbt_os_cond_destroy(parmap->ready_cond);
172       xbt_os_mutex_destroy(parmap->ready_mutex);
173       xbt_os_cond_destroy(parmap->done_cond);
174       xbt_os_mutex_destroy(parmap->done_mutex);
175       break;
176 #else
177       xbt_die("Futex is not available on this OS.");
178 #endif
179
180     case XBT_PARMAP_BUSY_WAIT:
181       parmap->master_wait_f = xbt_parmap_busy_master_wait;
182       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
183       parmap->master_signal_f = xbt_parmap_busy_master_signal;
184       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
185
186       xbt_os_cond_destroy(parmap->ready_cond);
187       xbt_os_mutex_destroy(parmap->ready_mutex);
188       xbt_os_cond_destroy(parmap->done_cond);
189       xbt_os_mutex_destroy(parmap->done_mutex);
190       break;
191
192     case XBT_PARMAP_DEFAULT:
193       THROW_IMPOSSIBLE;
194       break;
195   }
196 }
197
198 /**
199  * \brief Applies a list of tasks in parallel.
200  * \param parmap a parallel map object
201  * \param fun the function to call in parallel
202  * \param data each element of this dynar will be passed as an argument to fun
203  */
204 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
205 {
206   /* Assign resources to worker threads */
207   parmap->fun = fun;
208   parmap->data = data;
209   parmap->index = 0;
210   parmap->master_signal_f(parmap);
211   xbt_parmap_work(parmap);
212   parmap->master_wait_f(parmap);
213   XBT_DEBUG("Job done");
214 }
215
216 /**
217  * \brief Returns a next task to process.
218  *
219  * Worker threads call this function to get more work.
220  *
221  * \return the next task to process, or NULL if there is no more work
222  */
223 void* xbt_parmap_next(xbt_parmap_t parmap)
224 {
225   unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
226   if (index < xbt_dynar_length(parmap->data)) {
227     return xbt_dynar_get_as(parmap->data, index, void*);
228   }
229   return NULL;
230 }
231
232 static void xbt_parmap_work(xbt_parmap_t parmap)
233 {
234   unsigned index;
235   while ((index = __sync_fetch_and_add(&parmap->index, 1))
236          < xbt_dynar_length(parmap->data))
237     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
238 }
239
240 /**
241  * \brief Main function of a worker thread.
242  * \param arg the parmap
243  */
244 static void *xbt_parmap_worker_main(void *arg)
245 {
246   xbt_parmap_t parmap = (xbt_parmap_t) arg;
247   unsigned round = 0;
248
249   XBT_DEBUG("New worker thread created");
250
251   /* Worker's main loop */
252   while (1) {
253     parmap->worker_wait_f(parmap, ++round);
254     if (parmap->status == XBT_PARMAP_WORK) {
255
256       XBT_DEBUG("Worker got a job");
257
258       xbt_parmap_work(parmap);
259       parmap->worker_signal_f(parmap);
260
261       XBT_DEBUG("Worker has finished");
262
263     /* We are destroying the parmap */
264     } else {
265       parmap->worker_signal_f(parmap);
266       return NULL;
267     }
268   }
269 }
270
271 #ifdef HAVE_FUTEX_H
272 static void futex_wait(unsigned *uaddr, unsigned val)
273 {
274   XBT_VERB("Waiting on futex %p", uaddr);
275   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
276 }
277
278 static void futex_wake(unsigned *uaddr, unsigned val)
279 {
280   XBT_VERB("Waking futex %p", uaddr);
281   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
282 }
283 #endif
284
285 /**
286  * \brief Starts the parmap: waits for all workers to be ready and returns.
287  *
288  * This function is called by the controller thread.
289  *
290  * \param parmap a parmap
291  */
292 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
293 {
294   xbt_os_mutex_acquire(parmap->done_mutex);
295   if (parmap->thread_counter < parmap->num_workers) {
296     /* wait for all workers to be ready */
297     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
298   }
299   xbt_os_mutex_release(parmap->done_mutex);
300 }
301
302 /**
303  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
304  *
305  * This function is called by all worker threads when they end (not including
306  * the controller).
307  *
308  * \param parmap a parmap
309  */
310 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
311 {
312   xbt_os_mutex_acquire(parmap->done_mutex);
313   if (++parmap->thread_counter == parmap->num_workers) {
314     /* all workers have finished, wake the controller */
315     xbt_os_cond_signal(parmap->done_cond);
316   }
317   xbt_os_mutex_release(parmap->done_mutex);
318 }
319
320 /**
321  * \brief Wakes all workers and waits for them to finish the tasks.
322  *
323  * This function is called by the controller thread.
324  *
325  * \param parmap a parmap
326  */
327 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
328 {
329   xbt_os_mutex_acquire(parmap->ready_mutex);
330   parmap->thread_counter = 1;
331   parmap->work++;
332   /* wake all workers */
333   xbt_os_cond_broadcast(parmap->ready_cond);
334   xbt_os_mutex_release(parmap->ready_mutex);
335 }
336
337 /**
338  * \brief Waits for some work to process.
339  *
340  * This function is called by each worker thread (not including the controller)
341  * when it has no more work to do.
342  *
343  * \param parmap a parmap
344  * \param round  the expected round number
345  */
346 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
347 {
348   xbt_os_mutex_acquire(parmap->ready_mutex);
349   /* wait for more work */
350   if (parmap->work != round) {
351     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
352   }
353   xbt_os_mutex_release(parmap->ready_mutex);
354 }
355
356 #ifdef HAVE_FUTEX_H
357 /**
358  * \brief Starts the parmap: waits for all workers to be ready and returns.
359  *
360  * This function is called by the controller thread.
361  *
362  * \param parmap a parmap
363  */
364 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
365 {
366   unsigned count = parmap->thread_counter;
367   while (count < parmap->num_workers) {
368     /* wait for all workers to be ready */
369     futex_wait(&parmap->thread_counter, count);
370     count = parmap->thread_counter;
371   }
372 }
373
374 /**
375  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
376  *
377  * This function is called by all worker threads when they end (not including
378  * the controller).
379  *
380  * \param parmap a parmap
381  */
382 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
383 {
384   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
385   if (count == parmap->num_workers) {
386     /* all workers have finished, wake the controller */
387     futex_wake(&parmap->thread_counter, INT_MAX);
388   }
389 }
390
391 /**
392  * \brief Wakes all workers and waits for them to finish the tasks.
393  *
394  * This function is called by the controller thread.
395  *
396  * \param parmap a parmap
397  */
398 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
399 {
400   parmap->thread_counter = 1;
401   __sync_add_and_fetch(&parmap->work, 1);
402   /* wake all workers */
403   futex_wake(&parmap->work, INT_MAX);
404 }
405
406 /**
407  * \brief Waits for some work to process.
408  *
409  * This function is called by each worker thread (not including the controller)
410  * when it has no more work to do.
411  *
412  * \param parmap a parmap
413  * \param round  the expected round number
414  */
415 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
416 {
417   unsigned work = parmap->work;
418   /* wait for more work */
419   while (work != round) {
420     futex_wait(&parmap->work, work);
421     work = parmap->work;
422   }
423 }
424 #endif
425
426 /**
427  * \brief Starts the parmap: waits for all workers to be ready and returns.
428  *
429  * This function is called by the controller thread.
430  *
431  * \param parmap a parmap
432  */
433 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
434 {
435   while (parmap->thread_counter < parmap->num_workers) {
436     xbt_os_thread_yield();
437   }
438 }
439
440 /**
441  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
442  *
443  * This function is called by all worker threads when they end.
444  *
445  * \param parmap a parmap
446  */
447 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
448 {
449   __sync_add_and_fetch(&parmap->thread_counter, 1);
450 }
451
452 /**
453  * \brief Wakes all workers and waits for them to finish the tasks.
454  *
455  * This function is called by the controller thread.
456  *
457  * \param parmap a parmap
458  */
459 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
460 {
461   parmap->thread_counter = 1;
462   __sync_add_and_fetch(&parmap->work, 1);
463 }
464
465 /**
466  * \brief Waits for some work to process.
467  *
468  * This function is called by each worker thread (not including the controller)
469  * when it has no more work to do.
470  *
471  * \param parmap a parmap
472  * \param round  the expected round number
473  */
474 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
475 {
476   /* wait for more work */
477   while (parmap->work != round) {
478     xbt_os_thread_yield();
479   }
480 }
481
482 #ifdef SIMGRID_TEST
483 #include "xbt.h"
484 #include "xbt/ex.h"
485 #include "xbt/xbt_os_thread.h"
486 #include "xbt/xbt_os_time.h"
487 #include "gras_config.h"        /* HAVE_FUTEX_H */
488
489 XBT_TEST_SUITE("parmap", "Parallel Map");
490 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
491
492 #ifdef HAVE_FUTEX_H
493 #define TEST_PARMAP_SKIP_TEST(mode) 0
494 #else
495 #define TEST_PARMAP_SKIP_TEST(mode) ((mode) == XBT_PARMAP_FUTEX)
496 #endif
497
498 #define TEST_PARMAP_VALIDATE_MODE(mode) \
499   if (TEST_PARMAP_SKIP_TEST(mode)) { xbt_test_skip(); return; } else ((void)0)
500
501 static void fun_double(void *arg)
502 {
503   unsigned *u = arg;
504   *u = 2 * *u + 1;
505 }
506
507 /* Check that the computations are correctly done. */
508 static void test_parmap_basic(e_xbt_parmap_mode_t mode)
509 {
510   unsigned num_workers;
511
512   for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
513     const unsigned len = 1033;
514     const unsigned num = 5;
515     unsigned *a;
516     xbt_dynar_t data;
517     xbt_parmap_t parmap;
518     unsigned i;
519
520     xbt_test_add("Basic parmap usage (%u workers)", num_workers);
521
522     TEST_PARMAP_VALIDATE_MODE(mode);
523     parmap = xbt_parmap_new(num_workers, mode);
524
525     a = xbt_malloc(len * sizeof *a);
526     data = xbt_dynar_new(sizeof a, NULL);
527     for (i = 0; i < len; i++) {
528       a[i] = i;
529       xbt_dynar_push_as(data, void *, &a[i]);
530     }
531
532     for (i = 0; i < num; i++)
533       xbt_parmap_apply(parmap, fun_double, data);
534
535     for (i = 0; i < len; i++) {
536       unsigned expected = (1U << num) * (i + 1) - 1;
537       xbt_test_assert(a[i] == expected,
538                       "a[%u]: expected %u, got %u", i, expected, a[i]);
539     }
540
541     xbt_dynar_free(&data);
542     xbt_free(a);
543     xbt_parmap_destroy(parmap);
544   }
545 }
546
547 XBT_TEST_UNIT("basic_posix", test_parmap_basic_posix, "Basic usage: posix")
548 {
549   test_parmap_basic(XBT_PARMAP_POSIX);
550 }
551
552 XBT_TEST_UNIT("basic_futex", test_parmap_basic_futex, "Basic usage: futex")
553 {
554   test_parmap_basic(XBT_PARMAP_FUTEX);
555 }
556
557 XBT_TEST_UNIT("basic_busy_wait", test_parmap_basic_busy_wait, "Basic usage: busy_wait")
558 {
559   test_parmap_basic(XBT_PARMAP_BUSY_WAIT);
560 }
561
562 static void fun_get_id(void *arg)
563 {
564   *(uintptr_t *)arg = (uintptr_t)xbt_os_thread_self();
565   xbt_os_sleep(0.5);
566 }
567
568 static int fun_compare(const void *pa, const void *pb)
569 {
570   uintptr_t a = *(uintptr_t *)pa;
571   uintptr_t b = *(uintptr_t *)pb;
572   return a < b ? -1 : a > b ? 1 : 0;
573 }
574
575 /* Check that all threads are working. */
576 static void test_parmap_extended(e_xbt_parmap_mode_t mode)
577 {
578   unsigned num_workers;
579
580   for (num_workers = 1 ; num_workers <= 16 ; num_workers *= 2) {
581     const unsigned len = 2 * num_workers;
582     uintptr_t *a;
583     xbt_parmap_t parmap;
584     xbt_dynar_t data;
585     unsigned i;
586     unsigned count;
587
588     xbt_test_add("Extended parmap usage (%u workers)", num_workers);
589
590     TEST_PARMAP_VALIDATE_MODE(mode);
591     parmap = xbt_parmap_new(num_workers, mode);
592
593     a = xbt_malloc(len * sizeof *a);
594     data = xbt_dynar_new(sizeof a, NULL);
595     for (i = 0; i < len; i++)
596       xbt_dynar_push_as(data, void *, &a[i]);
597
598     xbt_parmap_apply(parmap, fun_get_id, data);
599
600     qsort(a, len, sizeof a[0], fun_compare);
601     count = 1;
602     for (i = 1; i < len; i++)
603       if (a[i] != a[i - 1])
604         count++;
605     xbt_test_assert(count == num_workers,
606                     "only %u/%u threads did some work", count, num_workers);
607
608     xbt_dynar_free(&data);
609     xbt_free(a);
610     xbt_parmap_destroy(parmap);
611   }
612 }
613
614 XBT_TEST_UNIT("extended_posix", test_parmap_extended_posix, "Extended usage: posix")
615 {
616   test_parmap_extended(XBT_PARMAP_POSIX);
617 }
618
619 XBT_TEST_UNIT("extended_futex", test_parmap_extended_futex, "Extended usage: futex")
620 {
621   test_parmap_extended(XBT_PARMAP_FUTEX);
622 }
623
624 XBT_TEST_UNIT("extended_busy_wait", test_parmap_extended_busy_wait, "Extended usage: busy_wait")
625 {
626   test_parmap_extended(XBT_PARMAP_BUSY_WAIT);
627 }
628
629 #endif /* SIMGRID_TEST */