Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Allow xbt_parmap_destroy to be called with a NULL parameter
[simgrid.git] / src / xbt / parmap.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6 #include "gras_config.h"
7 #include <unistd.h>
8
9 #ifndef _XBT_WIN32
10 #include <sys/syscall.h>
11 #endif
12
13 #ifdef HAVE_FUTEX_H
14 #include <linux/futex.h>
15 #endif
16
17 #include "xbt/parmap.h"
18 #include "xbt/log.h"
19 #include "xbt/function_types.h"
20 #include "xbt/dynar.h"
21 #include "xbt/xbt_os_thread.h"
22 #include "xbt/sysdep.h"
23
24 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
25 XBT_LOG_NEW_SUBCATEGORY(xbt_parmap_unit, xbt_parmap, "parmap unit testing");
26
27 typedef enum {
28   XBT_PARMAP_WORK = 0,
29   XBT_PARMAP_DESTROY
30 } e_xbt_parmap_flag_t;
31
32 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
33 static void *xbt_parmap_worker_main(void *parmap);
34
35 static void xbt_parmap_posix_start(xbt_parmap_t parmap);
36 static void xbt_parmap_posix_end(xbt_parmap_t parmap);
37 static void xbt_parmap_posix_signal(xbt_parmap_t parmap);
38 static void xbt_parmap_posix_wait(xbt_parmap_t parmap);
39
40 #ifdef HAVE_FUTEX_H
41 static void xbt_parmap_futex_start(xbt_parmap_t parmap);
42 static void xbt_parmap_futex_end(xbt_parmap_t parmap);
43 static void xbt_parmap_futex_signal(xbt_parmap_t parmap);
44 static void xbt_parmap_futex_wait(xbt_parmap_t parmap);
45 static void futex_wait(int *uaddr, int val);
46 static void futex_wake(int *uaddr, int val);
47 #endif
48
49 static void xbt_parmap_busy_start(xbt_parmap_t parmap);
50 static void xbt_parmap_busy_end(xbt_parmap_t parmap);
51 static void xbt_parmap_busy_signal(xbt_parmap_t parmap);
52 static void xbt_parmap_busy_wait(xbt_parmap_t parmap);
53
54
55 /**
56  * \brief Parallel map structure
57  */
58 typedef struct s_xbt_parmap {
59   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
60   int work;                        /**< index of the current round (1 is the first) */
61   int done;                        /**< number of rounds already done (futexes only) */
62   unsigned int thread_counter;     /**< number of threads currently working */
63   unsigned int num_workers;        /**< total number of worker threads including the controller */
64   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
65   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
66   unsigned int index;              /**< index of the next element of data to pick */
67
68   /* fields that depend on the synchronization mode */
69   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
70   void (*start_f)(xbt_parmap_t);   /**< initializes the worker threads */
71   void (*end_f)(xbt_parmap_t);     /**< finalizes the worker threads */
72   void (*signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
73   void (*wait_f)(xbt_parmap_t);    /**< waits for more work */
74 } s_xbt_parmap_t;
75
76 /**
77  * \brief Creates a parallel map object
78  * \param num_workers number of worker threads to create
79  * \param mode how to synchronize the worker threads
80  * \return the parmap created
81  */
82 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
83 {
84   unsigned int i;
85   xbt_os_thread_t worker = NULL;
86
87   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
88
89   /* Initialize the thread pool data structure */
90   xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
91
92   parmap->num_workers = num_workers;
93   parmap->status = XBT_PARMAP_WORK;
94   xbt_parmap_set_mode(parmap, mode);
95
96   /* Create the pool of worker threads */
97   for (i = 0; i < num_workers - 1; i++) {
98     worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
99     xbt_os_thread_detach(worker);
100   }
101   parmap->start_f(parmap);
102   return parmap;
103 }
104
105 /**
106  * \brief Destroys a parmap
107  * \param parmap the parmap to destroy
108  */
109 void xbt_parmap_destroy(xbt_parmap_t parmap)
110 {
111   if (!parmap) {
112     return;
113   }
114
115   parmap->status = XBT_PARMAP_DESTROY;
116   parmap->signal_f(parmap);
117   xbt_free(parmap);
118 }
119
120 /**
121  * \brief Sets the synchronization mode of a parmap.
122  * \param parmap a parallel map object
123  * \param mode the synchronization mode
124  */
125 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
126 {
127   if (mode == XBT_PARMAP_DEFAULT) {
128 #ifdef HAVE_FUTEX_H
129     mode = XBT_PARMAP_FUTEX;
130 #else
131     //For now use busy wait because posix is unimplemented
132     mode = XBT_PARMAP_BUSY_WAIT;
133 #endif
134   }
135   parmap->mode = mode;
136
137   switch (mode) {
138
139     case XBT_PARMAP_POSIX:
140       parmap->start_f = xbt_parmap_posix_start;
141       parmap->end_f = xbt_parmap_posix_end;
142       parmap->signal_f = xbt_parmap_posix_signal;
143       parmap->wait_f = xbt_parmap_posix_wait;
144       break;
145
146
147     case XBT_PARMAP_FUTEX:
148 #ifdef HAVE_FUTEX_H
149       parmap->start_f = xbt_parmap_futex_start;
150       parmap->end_f = xbt_parmap_futex_end;
151       parmap->signal_f = xbt_parmap_futex_signal;
152       parmap->wait_f = xbt_parmap_futex_wait;
153       break;
154 #else
155       xbt_die("Futex is not available on this OS (maybe you are on a Mac).");
156 #endif
157
158     case XBT_PARMAP_BUSY_WAIT:
159       parmap->start_f = xbt_parmap_busy_start;
160       parmap->end_f = xbt_parmap_busy_end;
161       parmap->signal_f = xbt_parmap_busy_signal;
162       parmap->wait_f = xbt_parmap_busy_wait;
163       break;
164
165     case XBT_PARMAP_DEFAULT:
166       THROW_IMPOSSIBLE;
167       break;
168   }
169 }
170
171 /**
172  * \brief Applies a list of tasks in parallel.
173  * \param parmap a parallel map object
174  * \param fun the function to call in parallel
175  * \param data each element of this dynar will be passed as an argument to fun
176  */
177 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
178 {
179   /* Assign resources to worker threads */
180   parmap->fun = fun;
181   parmap->data = data;
182   parmap->index = 0;
183   parmap->signal_f(parmap);
184   XBT_DEBUG("Job done");
185 }
186
187 /**
188  * \brief Returns a next task to process.
189  *
190  * Worker threads call this function to get more work.
191  *
192  * \return the next task to process, or NULL if there is no more work
193  */
194 void* xbt_parmap_next(xbt_parmap_t parmap)
195 {
196   unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
197   if (index < xbt_dynar_length(parmap->data)) {
198     return xbt_dynar_get_as(parmap->data, index, void*);
199   }
200   return NULL;
201 }
202
203 /**
204  * \brief Main function of a worker thread.
205  * \param arg the parmap
206  */
207 static void *xbt_parmap_worker_main(void *arg)
208 {
209   xbt_parmap_t parmap = (xbt_parmap_t) arg;
210
211   XBT_DEBUG("New worker thread created");
212
213   /* Worker's main loop */
214   while (1) {
215     parmap->wait_f(parmap);
216     if (parmap->status == XBT_PARMAP_WORK) {
217
218       XBT_DEBUG("Worker got a job");
219
220       void* work = xbt_parmap_next(parmap);
221       while (work != NULL) {
222         parmap->fun(work);
223         work = xbt_parmap_next(parmap);
224       }
225
226       XBT_DEBUG("Worker has finished");
227
228     /* We are destroying the parmap */
229     } else {
230       parmap->end_f(parmap);
231       XBT_DEBUG("Shutting down worker");
232       return NULL;
233     }
234   }
235 }
236
237 #ifdef HAVE_FUTEX_H
238 static void futex_wait(int *uaddr, int val)
239 {
240   XBT_VERB("Waiting on futex %p", uaddr);
241   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
242 }
243
244 static void futex_wake(int *uaddr, int val)
245 {
246   XBT_VERB("Waking futex %p", uaddr);
247   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
248 }
249 #endif
250
251 static void xbt_parmap_posix_start(xbt_parmap_t parmap)
252 {
253   THROW_UNIMPLEMENTED;
254 }
255
256 static void xbt_parmap_posix_end(xbt_parmap_t parmap)
257 {
258   THROW_UNIMPLEMENTED;
259 }
260
261 static void xbt_parmap_posix_signal(xbt_parmap_t parmap)
262 {
263   THROW_UNIMPLEMENTED;
264 }
265
266 static void xbt_parmap_posix_wait(xbt_parmap_t parmap)
267 {
268   THROW_UNIMPLEMENTED;
269 }
270
271 #ifdef HAVE_FUTEX_H
272 /**
273  * \brief Starts the parmap: waits for all workers to be ready and returns.
274  *
275  * This function is called by the controller thread.
276  *
277  * \param parmap a parmap
278  */
279 static void xbt_parmap_futex_start(xbt_parmap_t parmap)
280 {
281   int myflag = parmap->done;
282   __sync_fetch_and_add(&parmap->thread_counter, 1);
283   if (parmap->thread_counter < parmap->num_workers) {
284     /* wait for all workers to be ready */
285     futex_wait(&parmap->done, myflag);
286   }
287 }
288
289 /**
290  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
291  *
292  * This function is called by all worker threads when they end (not including
293  * the controller).
294  *
295  * \param parmap a parmap
296  */
297 static void xbt_parmap_futex_end(xbt_parmap_t parmap)
298 {
299   unsigned int mycount;
300
301   mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
302   if (mycount == parmap->num_workers) {
303     /* all workers have finished, wake the controller */
304     parmap->done++;
305     futex_wake(&parmap->done, 1);
306   }
307 }
308
309 /**
310  * \brief Wakes all workers and waits for them to finish the tasks.
311  *
312  * This function is called by the controller thread.
313  *
314  * \param parmap a parmap
315  */
316 static void xbt_parmap_futex_signal(xbt_parmap_t parmap)
317 {
318   int myflag = parmap->done;
319   parmap->thread_counter = 0;
320   parmap->work++;
321
322   /* wake all workers */
323   futex_wake(&parmap->work, parmap->num_workers);
324
325   if (parmap->status == XBT_PARMAP_WORK) {
326     /* also work myself */
327     void* work = xbt_parmap_next(parmap);
328     while (work != NULL) {
329       parmap->fun(work);
330       work = xbt_parmap_next(parmap);
331     }
332   }
333
334   unsigned int mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
335   if (mycount < parmap->num_workers) {
336     /* some workers have not finished yet */
337     futex_wait(&parmap->done, myflag);
338   }
339 }
340
341 /**
342  * \brief Waits for some work to process.
343  *
344  * This function is called by each worker thread (not including the controller)
345  * when it has no more work to do.
346  *
347  * \param parmap a parmap
348  */
349 static void xbt_parmap_futex_wait(xbt_parmap_t parmap)
350 {
351   int myflag;
352   unsigned int mycount;
353
354   myflag = parmap->work;
355   mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
356   if (mycount == parmap->num_workers) {
357     /* all workers have finished, wake the controller */
358     parmap->done++;
359     futex_wake(&parmap->done, 1);
360   }
361
362   /* wait for more work */
363   futex_wait(&parmap->work, myflag);
364 }
365 #endif
366
367 /**
368  * \brief Starts the parmap: waits for all workers to be ready and returns.
369  *
370  * This function is called by the controller thread.
371  *
372  * \param parmap a parmap
373  */
374 static void xbt_parmap_busy_start(xbt_parmap_t parmap)
375 {
376   __sync_fetch_and_add(&parmap->thread_counter, 1);
377   while (parmap->thread_counter < parmap->num_workers) {
378     xbt_os_thread_yield();
379   }
380 }
381
382 /**
383  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
384  *
385  * This function is called by all worker threads when they end.
386  *
387  * \param parmap a parmap
388  */
389 static void xbt_parmap_busy_end(xbt_parmap_t parmap)
390 {
391   __sync_add_and_fetch(&parmap->thread_counter, 1);
392 }
393
394 /**
395  * \brief Wakes all workers and waits for them to finish the tasks.
396  *
397  * This function is called by the controller thread.
398  *
399  * \param parmap a parmap
400  */
401 static void xbt_parmap_busy_signal(xbt_parmap_t parmap)
402 {
403   parmap->thread_counter = 0;
404   parmap->work++;
405
406   if (parmap->status == XBT_PARMAP_WORK) {
407     /* also work myself */
408     void* work = xbt_parmap_next(parmap);
409     while (work != NULL) {
410       parmap->fun(work);
411       work = xbt_parmap_next(parmap);
412     }
413   }
414
415   /* I have finished, wait for the others */
416   __sync_add_and_fetch(&parmap->thread_counter, 1);
417   while (parmap->thread_counter < parmap->num_workers) {
418     xbt_os_thread_yield();
419   }
420 }
421
422 /**
423  * \brief Waits for some work to process.
424  *
425  * This function is called by each worker thread (not including the controller)
426  * when it has no more work to do.
427  *
428  * \param parmap a parmap
429  */
430 static void xbt_parmap_busy_wait(xbt_parmap_t parmap)
431 {
432   int work = parmap->work;
433   __sync_add_and_fetch(&parmap->thread_counter, 1);
434
435   /* wait for more work */
436   while (parmap->work == work) {
437     xbt_os_thread_yield();
438   }
439 }
440
441 #ifdef SIMGRID_TEST
442 #include "xbt.h"
443 #include "xbt/ex.h"
444
445 XBT_TEST_SUITE("parmap", "Parallel Map");
446 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(xbt_parmap_unit);
447
448 xbt_parmap_t parmap;
449
450 void fun(void *arg);
451
452 void fun(void *arg)
453 {
454   //XBT_INFO("I'm job %lu", (unsigned long)arg);
455 }
456
457 XBT_TEST_UNIT("basic", test_parmap_basic, "Basic usage")
458 {
459   xbt_test_add("Create the parmap");
460
461   unsigned long i, j;
462   xbt_dynar_t data = xbt_dynar_new(sizeof(void *), NULL);
463
464   /* Create the parallel map */
465 #ifdef HAVE_FUTEX_H
466   parmap = xbt_parmap_new(10, XBT_PARMAP_FUTEX);
467 #else
468   parmap = xbt_parmap_new(10, XBT_PARMAP_BUSY_WAIT);
469 #endif
470   for (j = 0; j < 100; j++) {
471     xbt_dynar_push_as(data, void *, (void *)j);
472   }
473
474   for (i = 0; i < 5; i++) {
475     xbt_parmap_apply(parmap, fun, data);
476   }
477
478   /* Destroy the parmap */
479   xbt_parmap_destroy(parmap);
480   xbt_dynar_free(&data);
481 }
482
483 #endif /* SIMGRID_TEST */