Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Parmap review.
[simgrid.git] / src / xbt / parmap.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "gras_config.h"
7 #include <unistd.h>
8
9 #ifndef _XBT_WIN32
10 #include <sys/syscall.h>
11 #endif
12
13 #ifdef HAVE_FUTEX_H
14 #include <linux/futex.h>
15 #endif
16
17 #include "xbt/parmap.h"
18 #include "xbt/log.h"
19 #include "xbt/function_types.h"
20 #include "xbt/dynar.h"
21 #include "xbt/xbt_os_thread.h"
22 #include "xbt/sysdep.h"
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
25 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
26
27 typedef enum {
28   XBT_PARMAP_WORK,
29   XBT_PARMAP_DESTROY
30 } e_xbt_parmap_flag_t;
31
32 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
33 static void *xbt_parmap_worker_main(void *parmap);
34 static void xbt_parmap_work(xbt_parmap_t parmap);
35
36 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
37 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
39 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
40
41 #ifdef HAVE_FUTEX_H
42 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
43 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
46 static void futex_wait(unsigned *uaddr, unsigned val);
47 static void futex_wake(unsigned *uaddr, unsigned val);
48 #endif
49
50 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
51 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
53 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
54
55 /**
56  * \brief Parallel map structure
57  */
58 typedef struct s_xbt_parmap {
59   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
60   unsigned work;                   /**< index of the current round */
61   unsigned thread_counter;         /**< number of workers that have done the work */
62
63   unsigned int num_workers;        /**< total number of worker threads including the controller */
64   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
65   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
66   unsigned int index;              /**< index of the next element of data to pick */
67
68   /* posix only */
69   xbt_os_cond_t ready_cond;
70   xbt_os_mutex_t ready_mutex;
71   xbt_os_cond_t done_cond;
72   xbt_os_mutex_t done_mutex;
73
74   /* fields that depend on the synchronization mode */
75   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
76   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
77   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
78   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
79   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
80 } s_xbt_parmap_t;
81
82 /**
83  * \brief Creates a parallel map object
84  * \param num_workers number of worker threads to create
85  * \param mode how to synchronize the worker threads
86  * \return the parmap created
87  */
88 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
89 {
90   unsigned int i;
91   xbt_os_thread_t worker = NULL;
92
93   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
94
95   /* Initialize the thread pool data structure */
96   xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
97
98   parmap->num_workers = num_workers;
99   parmap->status = XBT_PARMAP_WORK;
100   xbt_parmap_set_mode(parmap, mode);
101
102   /* Create the pool of worker threads */
103   for (i = 1; i < num_workers; i++) {
104     worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
105     xbt_os_thread_detach(worker);
106   }
107   return parmap;
108 }
109
110 /**
111  * \brief Destroys a parmap
112  * \param parmap the parmap to destroy
113  */
114 void xbt_parmap_destroy(xbt_parmap_t parmap)
115 {
116   if (!parmap) {
117     return;
118   }
119
120   parmap->status = XBT_PARMAP_DESTROY;
121   parmap->master_signal_f(parmap);
122   parmap->master_wait_f(parmap);
123
124   xbt_os_cond_destroy(parmap->ready_cond);
125   xbt_os_mutex_destroy(parmap->ready_mutex);
126   xbt_os_cond_destroy(parmap->done_cond);
127   xbt_os_mutex_destroy(parmap->done_mutex);
128
129   xbt_free(parmap);
130 }
131
132 /**
133  * \brief Sets the synchronization mode of a parmap.
134  * \param parmap a parallel map object
135  * \param mode the synchronization mode
136  */
137 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
138 {
139   if (mode == XBT_PARMAP_DEFAULT) {
140 #ifdef HAVE_FUTEX_H
141     mode = XBT_PARMAP_FUTEX;
142 #else
143     mode = XBT_PARMAP_POSIX;
144 #endif
145   }
146   parmap->mode = mode;
147
148   switch (mode) {
149
150     case XBT_PARMAP_POSIX:
151       parmap->master_wait_f = xbt_parmap_posix_master_wait;
152       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
153       parmap->master_signal_f = xbt_parmap_posix_master_signal;
154       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
155
156       parmap->ready_cond = xbt_os_cond_init();
157       parmap->ready_mutex = xbt_os_mutex_init();
158       parmap->done_cond = xbt_os_cond_init();
159       parmap->done_mutex = xbt_os_mutex_init();
160       break;
161
162
163     case XBT_PARMAP_FUTEX:
164 #ifdef HAVE_FUTEX_H
165       parmap->master_wait_f = xbt_parmap_futex_master_wait;
166       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
167       parmap->master_signal_f = xbt_parmap_futex_master_signal;
168       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
169
170       xbt_os_cond_destroy(parmap->ready_cond);
171       xbt_os_mutex_destroy(parmap->ready_mutex);
172       xbt_os_cond_destroy(parmap->done_cond);
173       xbt_os_mutex_destroy(parmap->done_mutex);
174       break;
175 #else
176       xbt_die("Futex is not available on this OS.");
177 #endif
178
179     case XBT_PARMAP_BUSY_WAIT:
180       parmap->master_wait_f = xbt_parmap_busy_master_wait;
181       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
182       parmap->master_signal_f = xbt_parmap_busy_master_signal;
183       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
184
185       xbt_os_cond_destroy(parmap->ready_cond);
186       xbt_os_mutex_destroy(parmap->ready_mutex);
187       xbt_os_cond_destroy(parmap->done_cond);
188       xbt_os_mutex_destroy(parmap->done_mutex);
189       break;
190
191     case XBT_PARMAP_DEFAULT:
192       THROW_IMPOSSIBLE;
193       break;
194   }
195 }
196
197 /**
198  * \brief Applies a list of tasks in parallel.
199  * \param parmap a parallel map object
200  * \param fun the function to call in parallel
201  * \param data each element of this dynar will be passed as an argument to fun
202  */
203 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
204 {
205   /* Assign resources to worker threads */
206   parmap->fun = fun;
207   parmap->data = data;
208   parmap->index = 0;
209   parmap->master_signal_f(parmap);
210   xbt_parmap_work(parmap);
211   parmap->master_wait_f(parmap);
212   XBT_DEBUG("Job done");
213 }
214
215 /**
216  * \brief Returns a next task to process.
217  *
218  * Worker threads call this function to get more work.
219  *
220  * \return the next task to process, or NULL if there is no more work
221  */
222 void* xbt_parmap_next(xbt_parmap_t parmap)
223 {
224   unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
225   if (index < xbt_dynar_length(parmap->data)) {
226     return xbt_dynar_get_as(parmap->data, index, void*);
227   }
228   return NULL;
229 }
230
231 static void xbt_parmap_work(xbt_parmap_t parmap)
232 {
233   unsigned index;
234   while ((index = __sync_fetch_and_add(&parmap->index, 1))
235          < xbt_dynar_length(parmap->data))
236     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
237 }
238
239 /**
240  * \brief Main function of a worker thread.
241  * \param arg the parmap
242  */
243 static void *xbt_parmap_worker_main(void *arg)
244 {
245   xbt_parmap_t parmap = (xbt_parmap_t) arg;
246   unsigned round = 0;
247
248   XBT_DEBUG("New worker thread created");
249
250   /* Worker's main loop */
251   while (1) {
252     parmap->worker_wait_f(parmap, ++round);
253     if (parmap->status == XBT_PARMAP_WORK) {
254
255       XBT_DEBUG("Worker got a job");
256
257       xbt_parmap_work(parmap);
258       parmap->worker_signal_f(parmap);
259
260       XBT_DEBUG("Worker has finished");
261
262     /* We are destroying the parmap */
263     } else {
264       parmap->worker_signal_f(parmap);
265       return NULL;
266     }
267   }
268 }
269
270 #ifdef HAVE_FUTEX_H
271 static void futex_wait(unsigned *uaddr, unsigned val)
272 {
273   XBT_VERB("Waiting on futex %p", uaddr);
274   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
275 }
276
277 static void futex_wake(unsigned *uaddr, unsigned val)
278 {
279   XBT_VERB("Waking futex %p", uaddr);
280   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
281 }
282 #endif
283
284 /**
285  * \brief Starts the parmap: waits for all workers to be ready and returns.
286  *
287  * This function is called by the controller thread.
288  *
289  * \param parmap a parmap
290  */
291 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
292 {
293   xbt_os_mutex_acquire(parmap->done_mutex);
294   if (parmap->thread_counter < parmap->num_workers) {
295     /* wait for all workers to be ready */
296     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
297   }
298   xbt_os_mutex_release(parmap->done_mutex);
299 }
300
301 /**
302  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
303  *
304  * This function is called by all worker threads when they end (not including
305  * the controller).
306  *
307  * \param parmap a parmap
308  */
309 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
310 {
311   xbt_os_mutex_acquire(parmap->done_mutex);
312   if (++parmap->thread_counter == parmap->num_workers) {
313     /* all workers have finished, wake the controller */
314     xbt_os_cond_signal(parmap->done_cond);
315   }
316   xbt_os_mutex_release(parmap->done_mutex);
317 }
318
319 /**
320  * \brief Wakes all workers and waits for them to finish the tasks.
321  *
322  * This function is called by the controller thread.
323  *
324  * \param parmap a parmap
325  */
326 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
327 {
328   xbt_os_mutex_acquire(parmap->ready_mutex);
329   parmap->thread_counter = 1;
330   parmap->work++;
331   /* wake all workers */
332   xbt_os_cond_broadcast(parmap->ready_cond);
333   xbt_os_mutex_release(parmap->ready_mutex);
334 }
335
336 /**
337  * \brief Waits for some work to process.
338  *
339  * This function is called by each worker thread (not including the controller)
340  * when it has no more work to do.
341  *
342  * \param parmap a parmap
343  * \param round  the expected round number
344  */
345 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
346 {
347   xbt_os_mutex_acquire(parmap->ready_mutex);
348   /* wait for more work */
349   if (parmap->work < round) {
350     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
351   }
352   xbt_os_mutex_release(parmap->ready_mutex);
353 }
354
355 #ifdef HAVE_FUTEX_H
356 /**
357  * \brief Starts the parmap: waits for all workers to be ready and returns.
358  *
359  * This function is called by the controller thread.
360  *
361  * \param parmap a parmap
362  */
363 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
364 {
365   unsigned count = parmap->thread_counter;
366   while (count < parmap->num_workers) {
367     /* wait for all workers to be ready */
368     futex_wait(&parmap->thread_counter, count);
369     count = parmap->thread_counter;
370   }
371 }
372
373 /**
374  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
375  *
376  * This function is called by all worker threads when they end (not including
377  * the controller).
378  *
379  * \param parmap a parmap
380  */
381 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
382 {
383   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
384   if (count == parmap->num_workers) {
385     /* all workers have finished, wake the controller */
386     futex_wake(&parmap->thread_counter, 1);
387   }
388 }
389
390 /**
391  * \brief Wakes all workers and waits for them to finish the tasks.
392  *
393  * This function is called by the controller thread.
394  *
395  * \param parmap a parmap
396  */
397 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
398 {
399   parmap->thread_counter = 1;
400   __sync_add_and_fetch(&parmap->work, 1);
401   /* wake all workers */
402   futex_wake(&parmap->work, parmap->num_workers - 1);
403 }
404
405 /**
406  * \brief Waits for some work to process.
407  *
408  * This function is called by each worker thread (not including the controller)
409  * when it has no more work to do.
410  *
411  * \param parmap a parmap
412  * \param round  the expected round number
413  */
414 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
415 {
416   unsigned work = parmap->work;
417   /* wait for more work */
418   if (work < round)
419     futex_wait(&parmap->work, work);
420 }
421 #endif
422
423 /**
424  * \brief Starts the parmap: waits for all workers to be ready and returns.
425  *
426  * This function is called by the controller thread.
427  *
428  * \param parmap a parmap
429  */
430 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
431 {
432   while (parmap->thread_counter < parmap->num_workers) {
433     xbt_os_thread_yield();
434   }
435 }
436
437 /**
438  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
439  *
440  * This function is called by all worker threads when they end.
441  *
442  * \param parmap a parmap
443  */
444 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
445 {
446   __sync_add_and_fetch(&parmap->thread_counter, 1);
447 }
448
449 /**
450  * \brief Wakes all workers and waits for them to finish the tasks.
451  *
452  * This function is called by the controller thread.
453  *
454  * \param parmap a parmap
455  */
456 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
457 {
458   parmap->thread_counter = 1;
459   __sync_add_and_fetch(&parmap->work, 1);
460 }
461
462 /**
463  * \brief Waits for some work to process.
464  *
465  * This function is called by each worker thread (not including the controller)
466  * when it has no more work to do.
467  *
468  * \param parmap a parmap
469  * \param round  the expected round number
470  */
471 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
472 {
473   /* wait for more work */
474   while (parmap->work < round) {
475     xbt_os_thread_yield();
476   }
477 }
478
479 #ifdef SIMGRID_TEST
480 #include "xbt.h"
481 #include "xbt/ex.h"
482
483 XBT_TEST_SUITE("parmap", "Parallel Map");
484 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
485
486 xbt_parmap_t parmap;
487
488 void fun(void *arg);
489
490 void fun(void *arg)
491 {
492   //XBT_INFO("I'm job %lu", (unsigned long)arg);
493 }
494
495 XBT_TEST_UNIT("basic", test_parmap_basic, "Basic usage")
496 {
497   xbt_test_add("Create the parmap");
498
499   unsigned long i, j;
500   xbt_dynar_t data = xbt_dynar_new(sizeof(void *), NULL);
501
502   /* Create the parallel map */
503 #ifdef HAVE_FUTEX_H
504   parmap = xbt_parmap_new(10, XBT_PARMAP_FUTEX);
505 #else
506   parmap = xbt_parmap_new(10, XBT_PARMAP_BUSY_WAIT);
507 #endif
508   for (j = 0; j < 100; j++) {
509     xbt_dynar_push_as(data, void *, (void *)j);
510   }
511
512   for (i = 0; i < 5; i++) {
513     xbt_parmap_apply(parmap, fun, data);
514   }
515
516   /* Destroy the parmap */
517   xbt_parmap_destroy(parmap);
518   xbt_dynar_free(&data);
519 }
520
521 #endif /* SIMGRID_TEST */