Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'hypervisor' of scm.gforge.inria.fr:/gitroot/simgrid/simgrid into hypervisor
[simgrid.git] / src / xbt / parmap.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "internal_config.h"
7 #include <unistd.h>
8
9 #ifndef _XBT_WIN32
10 #include <sys/syscall.h>
11 #endif
12
13 #ifdef HAVE_FUTEX_H
14 #include <linux/futex.h>
15 #include <limits.h>
16 #endif
17
18 #include "xbt/parmap.h"
19 #include "xbt/log.h"
20 #include "xbt/function_types.h"
21 #include "xbt/dynar.h"
22 #include "xbt/xbt_os_thread.h"
23 #include "xbt/sysdep.h"
24 #include "simix/smx_private.h"
25
26 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
27
28 typedef enum {
29   XBT_PARMAP_WORK,
30   XBT_PARMAP_DESTROY
31 } e_xbt_parmap_flag_t;
32
33 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
34 static void *xbt_parmap_worker_main(void *parmap);
35 static void xbt_parmap_work(xbt_parmap_t parmap);
36
37 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
39 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
40 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
41
42 #ifdef HAVE_FUTEX_H
43 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
47 static void futex_wait(unsigned *uaddr, unsigned val);
48 static void futex_wake(unsigned *uaddr, unsigned val);
49 #endif
50
51 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
53 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
54 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
55
56
57 /**
58  * \brief Parallel map structure
59  */
60 typedef struct s_xbt_parmap {
61   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
62   unsigned work;                   /**< index of the current round */
63   unsigned thread_counter;         /**< number of workers that have done the work */
64
65   unsigned int num_workers;        /**< total number of worker threads including the controller */
66   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
67   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
68   unsigned int index;              /**< index of the next element of data to pick */
69
70   /* posix only */
71   xbt_os_cond_t ready_cond;
72   xbt_os_mutex_t ready_mutex;
73   xbt_os_cond_t done_cond;
74   xbt_os_mutex_t done_mutex;
75
76   /* fields that depend on the synchronization mode */
77   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
78   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
79   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
80   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
81   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
82 } s_xbt_parmap_t;
83
84 /**
85  * \brief Thread data transmission structure
86  */
87 typedef struct s_xbt_parmap_thread_data{
88   xbt_parmap_t parmap;
89   int worker_id;
90 } s_xbt_parmap_thread_data_t;
91
92 typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t;
93
94 /**
95  * \brief Creates a parallel map object
96  * \param num_workers number of worker threads to create
97  * \param mode how to synchronize the worker threads
98  * \return the parmap created
99  */
100 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
101 {
102   unsigned int i;
103   xbt_os_thread_t worker = NULL;
104
105   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
106
107   /* Initialize the thread pool data structure */
108   xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
109
110   parmap->num_workers = num_workers;
111   parmap->status = XBT_PARMAP_WORK;
112   xbt_parmap_set_mode(parmap, mode);
113
114   /* Create the pool of worker threads */
115   xbt_parmap_thread_data_t data;
116   for (i = 1; i < num_workers; i++) {
117     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
118     data->parmap = parmap;
119     data->worker_id = i;
120     worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL);
121     xbt_os_thread_detach(worker);
122   }
123   return parmap;
124 }
125
126 /**
127  * \brief Destroys a parmap
128  * \param parmap the parmap to destroy
129  */
130 void xbt_parmap_destroy(xbt_parmap_t parmap)
131 {
132   if (!parmap) {
133     return;
134   }
135
136   parmap->status = XBT_PARMAP_DESTROY;
137   parmap->master_signal_f(parmap);
138   parmap->master_wait_f(parmap);
139
140   xbt_os_cond_destroy(parmap->ready_cond);
141   xbt_os_mutex_destroy(parmap->ready_mutex);
142   xbt_os_cond_destroy(parmap->done_cond);
143   xbt_os_mutex_destroy(parmap->done_mutex);
144
145   xbt_free(parmap);
146 }
147
148 /**
149  * \brief Sets the synchronization mode of a parmap.
150  * \param parmap a parallel map object
151  * \param mode the synchronization mode
152  */
153 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
154 {
155   if (mode == XBT_PARMAP_DEFAULT) {
156 #ifdef HAVE_FUTEX_H
157     mode = XBT_PARMAP_FUTEX;
158 #else
159     mode = XBT_PARMAP_POSIX;
160 #endif
161   }
162   parmap->mode = mode;
163
164   switch (mode) {
165
166     case XBT_PARMAP_POSIX:
167       parmap->master_wait_f = xbt_parmap_posix_master_wait;
168       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
169       parmap->master_signal_f = xbt_parmap_posix_master_signal;
170       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
171
172       parmap->ready_cond = xbt_os_cond_init();
173       parmap->ready_mutex = xbt_os_mutex_init();
174       parmap->done_cond = xbt_os_cond_init();
175       parmap->done_mutex = xbt_os_mutex_init();
176       break;
177
178
179     case XBT_PARMAP_FUTEX:
180 #ifdef HAVE_FUTEX_H
181       parmap->master_wait_f = xbt_parmap_futex_master_wait;
182       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
183       parmap->master_signal_f = xbt_parmap_futex_master_signal;
184       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
185
186       xbt_os_cond_destroy(parmap->ready_cond);
187       xbt_os_mutex_destroy(parmap->ready_mutex);
188       xbt_os_cond_destroy(parmap->done_cond);
189       xbt_os_mutex_destroy(parmap->done_mutex);
190       break;
191 #else
192       xbt_die("Futex is not available on this OS.");
193 #endif
194
195     case XBT_PARMAP_BUSY_WAIT:
196       parmap->master_wait_f = xbt_parmap_busy_master_wait;
197       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
198       parmap->master_signal_f = xbt_parmap_busy_master_signal;
199       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
200
201       xbt_os_cond_destroy(parmap->ready_cond);
202       xbt_os_mutex_destroy(parmap->ready_mutex);
203       xbt_os_cond_destroy(parmap->done_cond);
204       xbt_os_mutex_destroy(parmap->done_mutex);
205       break;
206
207     case XBT_PARMAP_DEFAULT:
208       THROW_IMPOSSIBLE;
209       break;
210   }
211 }
212
213 /**
214  * \brief Applies a list of tasks in parallel.
215  * \param parmap a parallel map object
216  * \param fun the function to call in parallel
217  * \param data each element of this dynar will be passed as an argument to fun
218  */
219 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
220 {
221   /* Assign resources to worker threads */
222   parmap->fun = fun;
223   parmap->data = data;
224   parmap->index = 0;
225   parmap->master_signal_f(parmap);
226   xbt_parmap_work(parmap);
227   parmap->master_wait_f(parmap);
228   XBT_DEBUG("Job done");
229 }
230
231 /**
232  * \brief Returns a next task to process.
233  *
234  * Worker threads call this function to get more work.
235  *
236  * \return the next task to process, or NULL if there is no more work
237  */
238 void* xbt_parmap_next(xbt_parmap_t parmap)
239 {
240   unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
241   if (index < xbt_dynar_length(parmap->data)) {
242     return xbt_dynar_get_as(parmap->data, index, void*);
243   }
244   return NULL;
245 }
246
247 static void xbt_parmap_work(xbt_parmap_t parmap)
248 {
249   unsigned index;
250   while ((index = __sync_fetch_and_add(&parmap->index, 1))
251          < xbt_dynar_length(parmap->data))
252     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
253 }
254
255 /**
256  * \brief Main function of a worker thread.
257  * \param arg the parmap
258  */
259 static void *xbt_parmap_worker_main(void *arg)
260 {
261   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
262   xbt_parmap_t parmap = data->parmap;
263   unsigned round = 0;
264   smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL);
265   SIMIX_context_set_current(context);
266
267   XBT_DEBUG("New worker thread created");
268
269   /* Worker's main loop */
270   while (1) {
271     parmap->worker_wait_f(parmap, ++round);
272     if (parmap->status == XBT_PARMAP_WORK) {
273
274       XBT_DEBUG("Worker %d got a job", data->worker_id);
275
276       xbt_parmap_work(parmap);
277       parmap->worker_signal_f(parmap);
278
279       XBT_DEBUG("Worker %d has finished", data->worker_id);
280
281     /* We are destroying the parmap */
282     } else {
283       xbt_free(data);
284       parmap->worker_signal_f(parmap);
285       return NULL;
286     }
287   }
288 }
289
290 #ifdef HAVE_FUTEX_H
291 static void futex_wait(unsigned *uaddr, unsigned val)
292 {
293   XBT_VERB("Waiting on futex %p", uaddr);
294   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
295 }
296
297 static void futex_wake(unsigned *uaddr, unsigned val)
298 {
299   XBT_VERB("Waking futex %p", uaddr);
300   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
301 }
302 #endif
303
304 /**
305  * \brief Starts the parmap: waits for all workers to be ready and returns.
306  *
307  * This function is called by the controller thread.
308  *
309  * \param parmap a parmap
310  */
311 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
312 {
313   xbt_os_mutex_acquire(parmap->done_mutex);
314   if (parmap->thread_counter < parmap->num_workers) {
315     /* wait for all workers to be ready */
316     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
317   }
318   xbt_os_mutex_release(parmap->done_mutex);
319 }
320
321 /**
322  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
323  *
324  * This function is called by all worker threads when they end (not including
325  * the controller).
326  *
327  * \param parmap a parmap
328  */
329 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
330 {
331   xbt_os_mutex_acquire(parmap->done_mutex);
332   if (++parmap->thread_counter == parmap->num_workers) {
333     /* all workers have finished, wake the controller */
334     xbt_os_cond_signal(parmap->done_cond);
335   }
336   xbt_os_mutex_release(parmap->done_mutex);
337 }
338
339 /**
340  * \brief Wakes all workers and waits for them to finish the tasks.
341  *
342  * This function is called by the controller thread.
343  *
344  * \param parmap a parmap
345  */
346 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
347 {
348   xbt_os_mutex_acquire(parmap->ready_mutex);
349   parmap->thread_counter = 1;
350   parmap->work++;
351   /* wake all workers */
352   xbt_os_cond_broadcast(parmap->ready_cond);
353   xbt_os_mutex_release(parmap->ready_mutex);
354 }
355
356 /**
357  * \brief Waits for some work to process.
358  *
359  * This function is called by each worker thread (not including the controller)
360  * when it has no more work to do.
361  *
362  * \param parmap a parmap
363  * \param round  the expected round number
364  */
365 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
366 {
367   xbt_os_mutex_acquire(parmap->ready_mutex);
368   /* wait for more work */
369   if (parmap->work != round) {
370     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
371   }
372   xbt_os_mutex_release(parmap->ready_mutex);
373 }
374
375 #ifdef HAVE_FUTEX_H
376 /**
377  * \brief Starts the parmap: waits for all workers to be ready and returns.
378  *
379  * This function is called by the controller thread.
380  *
381  * \param parmap a parmap
382  */
383 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
384 {
385   unsigned count = parmap->thread_counter;
386   while (count < parmap->num_workers) {
387     /* wait for all workers to be ready */
388     futex_wait(&parmap->thread_counter, count);
389     count = parmap->thread_counter;
390   }
391 }
392
393 /**
394  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
395  *
396  * This function is called by all worker threads when they end (not including
397  * the controller).
398  *
399  * \param parmap a parmap
400  */
401 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
402 {
403   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
404   if (count == parmap->num_workers) {
405     /* all workers have finished, wake the controller */
406     futex_wake(&parmap->thread_counter, INT_MAX);
407   }
408 }
409
410 /**
411  * \brief Wakes all workers and waits for them to finish the tasks.
412  *
413  * This function is called by the controller thread.
414  *
415  * \param parmap a parmap
416  */
417 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
418 {
419   parmap->thread_counter = 1;
420   __sync_add_and_fetch(&parmap->work, 1);
421   /* wake all workers */
422   futex_wake(&parmap->work, INT_MAX);
423 }
424
425 /**
426  * \brief Waits for some work to process.
427  *
428  * This function is called by each worker thread (not including the controller)
429  * when it has no more work to do.
430  *
431  * \param parmap a parmap
432  * \param round  the expected round number
433  */
434 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
435 {
436   unsigned work = parmap->work;
437   /* wait for more work */
438   while (work != round) {
439     futex_wait(&parmap->work, work);
440     work = parmap->work;
441   }
442 }
443 #endif
444
445 /**
446  * \brief Starts the parmap: waits for all workers to be ready and returns.
447  *
448  * This function is called by the controller thread.
449  *
450  * \param parmap a parmap
451  */
452 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
453 {
454   while (parmap->thread_counter < parmap->num_workers) {
455     xbt_os_thread_yield();
456   }
457 }
458
459 /**
460  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
461  *
462  * This function is called by all worker threads when they end.
463  *
464  * \param parmap a parmap
465  */
466 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
467 {
468   __sync_add_and_fetch(&parmap->thread_counter, 1);
469 }
470
471 /**
472  * \brief Wakes all workers and waits for them to finish the tasks.
473  *
474  * This function is called by the controller thread.
475  *
476  * \param parmap a parmap
477  */
478 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
479 {
480   parmap->thread_counter = 1;
481   __sync_add_and_fetch(&parmap->work, 1);
482 }
483
484 /**
485  * \brief Waits for some work to process.
486  *
487  * This function is called by each worker thread (not including the controller)
488  * when it has no more work to do.
489  *
490  * \param parmap a parmap
491  * \param round  the expected round number
492  */
493 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
494 {
495   /* wait for more work */
496   while (parmap->work != round) {
497     xbt_os_thread_yield();
498   }
499 }