Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
remove a couple of MSC_VER stuff
[simgrid.git] / src / xbt / parmap.cpp
1 /* Copyright (c) 2004-2005, 2007, 2009-2014. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include <atomic>
8
9 #include "src/internal_config.h"
10 #if HAVE_UNISTD_H
11 #include <unistd.h>
12 #endif
13
14 #ifndef _WIN32
15 #include <sys/syscall.h>
16 #endif
17
18 #if HAVE_FUTEX_H
19 #include <linux/futex.h>
20 #include <limits.h>
21 #endif
22
23 #include "xbt/parmap.h"
24 #include "xbt/log.h"
25 #include "xbt/function_types.h"
26 #include "xbt/dynar.h"
27 #include "xbt/xbt_os_thread.h"
28 #include "xbt/sysdep.h"
29 #include "src/simix/smx_private.h"
30 #include "src/simix/smx_private.hpp"
31
32 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_parmap, xbt, "parmap: parallel map");
33
34 typedef enum {
35   XBT_PARMAP_WORK,
36   XBT_PARMAP_DESTROY
37 } e_xbt_parmap_flag_t;
38
39 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode);
40 static void *xbt_parmap_worker_main(void *parmap);
41 static void xbt_parmap_work(xbt_parmap_t parmap);
42
43 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap);
44 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap);
45 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap);
46 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round);
47
48 #if HAVE_FUTEX_H
49 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap);
50 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap);
51 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap);
52 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round);
53 static void futex_wait(unsigned *uaddr, unsigned val);
54 static void futex_wake(unsigned *uaddr, unsigned val);
55 #endif
56
57 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap);
58 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap);
59 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap);
60 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round);
61
62 #if HAVE_MC
63 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id);
64 static void *xbt_parmap_mc_worker_main(void *arg);
65 #endif
66
67 /**
68  * \brief Parallel map structure
69  */
70 typedef struct s_xbt_parmap {
71   e_xbt_parmap_flag_t status;      /**< is the parmap active or being destroyed? */
72   unsigned work;                   /**< index of the current round */
73   unsigned thread_counter;         /**< number of workers that have done the work */
74
75   unsigned int num_workers;        /**< total number of worker threads including the controller */
76   xbt_os_thread_t *workers;        /**< worker thread handlers */
77   void_f_pvoid_t fun;              /**< function to run in parallel on each element of data */
78   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
79   std::atomic<unsigned int> index; /**< index of the next element of data to pick */
80
81 #if HAVE_MC
82   int finish;
83   void* ref_snapshot;
84   int_f_pvoid_pvoid_t snapshot_compare;
85   unsigned int length;
86   void* mc_data;
87 #endif
88
89   /* posix only */
90   xbt_os_cond_t ready_cond;
91   xbt_os_mutex_t ready_mutex;
92   xbt_os_cond_t done_cond;
93   xbt_os_mutex_t done_mutex;
94
95   /* fields that depend on the synchronization mode */
96   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
97   void (*master_wait_f)(xbt_parmap_t);    /**< wait for the workers to have done the work */
98   void (*worker_signal_f)(xbt_parmap_t);  /**< signal the master that a worker has done the work */
99   void (*master_signal_f)(xbt_parmap_t);  /**< wakes the workers threads to process tasks */
100   void (*worker_wait_f)(xbt_parmap_t, unsigned); /**< waits for more work */
101 } s_xbt_parmap_t;
102
103 /**
104  * \brief Thread data transmission structure
105  */
106 typedef struct s_xbt_parmap_thread_data{
107   xbt_parmap_t parmap;
108   int worker_id;
109 } s_xbt_parmap_thread_data_t;
110
111 typedef s_xbt_parmap_thread_data_t *xbt_parmap_thread_data_t;
112
113 /**
114  * \brief Creates a parallel map object
115  * \param num_workers number of worker threads to create
116  * \param mode how to synchronize the worker threads
117  * \return the parmap created
118  */
119 xbt_parmap_t xbt_parmap_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
120 {
121   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
122
123   /* Initialize the thread pool data structure */
124   xbt_parmap_t parmap = new s_xbt_parmap_t();
125   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
126
127   parmap->num_workers = num_workers;
128   parmap->status = XBT_PARMAP_WORK;
129   xbt_parmap_set_mode(parmap, mode);
130
131   /* Create the pool of worker threads */
132   xbt_parmap_thread_data_t data;
133   parmap->workers[0] = NULL;
134 #if HAVE_PTHREAD_SETAFFINITY
135   int core_bind = 0;
136 #endif  
137   for (unsigned int i = 1; i < num_workers; i++) {
138     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
139     data->parmap = parmap;
140     data->worker_id = i;
141     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_worker_main, data, NULL);
142 #if HAVE_PTHREAD_SETAFFINITY
143     xbt_os_thread_bind(parmap->workers[i], core_bind); 
144     if (core_bind != xbt_os_get_numcores())
145       core_bind++;
146     else
147       core_bind = 0; 
148 #endif
149   }
150   return parmap;
151 }
152
153 #if HAVE_MC
154 /**
155  * \brief Creates a parallel map object
156  * \param num_workers number of worker threads to create
157  * \param mode how to synchronize the worker threads
158  * \return the parmap created
159  */
160 xbt_parmap_t xbt_parmap_mc_new(unsigned int num_workers, e_xbt_parmap_mode_t mode)
161 {
162   unsigned int i;
163
164   XBT_DEBUG("Create new parmap (%u workers)", num_workers);
165
166   /* Initialize the thread pool data structure */
167   xbt_parmap_t parmap = new s_xbt_parmap_t();
168   parmap->workers = xbt_new(xbt_os_thread_t, num_workers);
169
170   parmap->num_workers = num_workers;
171   parmap->status = XBT_PARMAP_WORK;
172   xbt_parmap_set_mode(parmap, mode);
173
174   /* Create the pool of worker threads */
175   xbt_parmap_thread_data_t data;
176   parmap->workers[0] = NULL;
177   for (i = 1; i < num_workers; i++) {
178     data = xbt_new0(s_xbt_parmap_thread_data_t, 1);
179     data->parmap = parmap;
180     data->worker_id = i;
181     parmap->workers[i] = xbt_os_thread_create(NULL, xbt_parmap_mc_worker_main, data, NULL);
182   }
183   return parmap;
184 }
185 #endif
186
187 /**
188  * \brief Destroys a parmap
189  * \param parmap the parmap to destroy
190  */
191 void xbt_parmap_destroy(xbt_parmap_t parmap)
192 {
193   if (!parmap) {
194     return;
195   }
196
197   parmap->status = XBT_PARMAP_DESTROY;
198   parmap->master_signal_f(parmap);
199
200   unsigned int i;
201   for (i = 1; i < parmap->num_workers; i++)
202     xbt_os_thread_join(parmap->workers[i], NULL);
203
204   xbt_os_cond_destroy(parmap->ready_cond);
205   xbt_os_mutex_destroy(parmap->ready_mutex);
206   xbt_os_cond_destroy(parmap->done_cond);
207   xbt_os_mutex_destroy(parmap->done_mutex);
208
209   xbt_free(parmap->workers);
210   delete parmap;
211 }
212
213 /**
214  * \brief Sets the synchronization mode of a parmap.
215  * \param parmap a parallel map object
216  * \param mode the synchronization mode
217  */
218 static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
219 {
220   if (mode == XBT_PARMAP_DEFAULT) {
221 #if HAVE_FUTEX_H
222     mode = XBT_PARMAP_FUTEX;
223 #else
224     mode = XBT_PARMAP_POSIX;
225 #endif
226   }
227   parmap->mode = mode;
228
229   switch (mode) {
230     case XBT_PARMAP_POSIX:
231       parmap->master_wait_f = xbt_parmap_posix_master_wait;
232       parmap->worker_signal_f = xbt_parmap_posix_worker_signal;
233       parmap->master_signal_f = xbt_parmap_posix_master_signal;
234       parmap->worker_wait_f = xbt_parmap_posix_worker_wait;
235
236       parmap->ready_cond = xbt_os_cond_init();
237       parmap->ready_mutex = xbt_os_mutex_init();
238       parmap->done_cond = xbt_os_cond_init();
239       parmap->done_mutex = xbt_os_mutex_init();
240       break;
241     case XBT_PARMAP_FUTEX:
242 #if HAVE_FUTEX_H
243       parmap->master_wait_f = xbt_parmap_futex_master_wait;
244       parmap->worker_signal_f = xbt_parmap_futex_worker_signal;
245       parmap->master_signal_f = xbt_parmap_futex_master_signal;
246       parmap->worker_wait_f = xbt_parmap_futex_worker_wait;
247
248       xbt_os_cond_destroy(parmap->ready_cond);
249       xbt_os_mutex_destroy(parmap->ready_mutex);
250       xbt_os_cond_destroy(parmap->done_cond);
251       xbt_os_mutex_destroy(parmap->done_mutex);
252       break;
253 #else
254       xbt_die("Futex is not available on this OS.");
255 #endif
256     case XBT_PARMAP_BUSY_WAIT:
257       parmap->master_wait_f = xbt_parmap_busy_master_wait;
258       parmap->worker_signal_f = xbt_parmap_busy_worker_signal;
259       parmap->master_signal_f = xbt_parmap_busy_master_signal;
260       parmap->worker_wait_f = xbt_parmap_busy_worker_wait;
261
262       xbt_os_cond_destroy(parmap->ready_cond);
263       xbt_os_mutex_destroy(parmap->ready_mutex);
264       xbt_os_cond_destroy(parmap->done_cond);
265       xbt_os_mutex_destroy(parmap->done_mutex);
266       break;
267     case XBT_PARMAP_DEFAULT:
268       THROW_IMPOSSIBLE;
269       break;
270   }
271 }
272
273 /**
274  * \brief Applies a list of tasks in parallel.
275  * \param parmap a parallel map object
276  * \param fun the function to call in parallel
277  * \param data each element of this dynar will be passed as an argument to fun
278  */
279 void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
280 {
281   /* Assign resources to worker threads (we are maestro here)*/
282   parmap->fun = fun;
283   parmap->data = data;
284   parmap->index = 0;
285   parmap->master_signal_f(parmap); // maestro runs futex_wait to wake all the minions (the working threads)
286   xbt_parmap_work(parmap);         // maestro works with its minions
287   parmap->master_wait_f(parmap);   // When there is no more work to do, then maestro waits for the last minion to stop
288   XBT_DEBUG("Job done");           //   ... and proceeds
289 }
290
291 /**
292  * \brief Returns a next task to process.
293  *
294  * Worker threads call this function to get more work.
295  *
296  * \return the next task to process, or NULL if there is no more work
297  */
298 void* xbt_parmap_next(xbt_parmap_t parmap)
299 {
300   unsigned int index = parmap->index++;
301   if (index < xbt_dynar_length(parmap->data)) {
302     return xbt_dynar_get_as(parmap->data, index, void*);
303   }
304   return NULL;
305 }
306
307 static void xbt_parmap_work(xbt_parmap_t parmap)
308 {
309   unsigned index;
310   while ((index = parmap->index++) < xbt_dynar_length(parmap->data))
311     parmap->fun(xbt_dynar_get_as(parmap->data, index, void*));
312 }
313
314 /**
315  * \brief Main function of a worker thread.
316  * \param arg the parmap
317  */
318 static void *xbt_parmap_worker_main(void *arg)
319 {
320   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
321   xbt_parmap_t parmap = data->parmap;
322   unsigned round = 0;
323   smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL);
324   SIMIX_context_set_current(context);
325
326   XBT_DEBUG("New worker thread created");
327
328   /* Worker's main loop */
329   while (1) {
330     parmap->worker_wait_f(parmap, ++round);
331     if (parmap->status == XBT_PARMAP_WORK) {
332       XBT_DEBUG("Worker %d got a job", data->worker_id);
333
334       xbt_parmap_work(parmap);
335       parmap->worker_signal_f(parmap);
336
337       XBT_DEBUG("Worker %d has finished", data->worker_id);
338     /* We are destroying the parmap */
339     } else {
340       SIMIX_context_free(context);
341       xbt_free(data);
342       return NULL;
343     }
344   }
345 }
346
347 #if HAVE_MC
348 /**
349  * \brief Applies a list of tasks in parallel.
350  * \param parmap a parallel map object
351  * \param fun the function to call in parallel
352  * \param data each element of this dynar will be passed as an argument to fun
353  */
354 int xbt_parmap_mc_apply(xbt_parmap_t parmap, int_f_pvoid_pvoid_t fun, 
355                          void* data, unsigned int length,  void* ref_snapshot)
356 {
357   /* Assign resources to worker threads */
358   parmap->snapshot_compare = fun;
359   parmap->mc_data = data;
360   parmap->index = 0;
361   parmap->finish = -1;
362   parmap->length = length;
363   parmap->ref_snapshot = ref_snapshot;
364   parmap->master_signal_f(parmap);
365   xbt_parmap_mc_work(parmap, 0);
366   parmap->master_wait_f(parmap);
367   XBT_DEBUG("Job done");
368   return parmap->finish;
369 }
370
371 static void xbt_parmap_mc_work(xbt_parmap_t parmap, int worker_id)
372 {
373   unsigned int data_size = (parmap->length / parmap->num_workers) + ((parmap->length % parmap->num_workers) ? 1 :0);
374   void* start = (char*)parmap->mc_data + (data_size*worker_id*sizeof(void*));
375   void* end = MIN((char *)start + data_size* sizeof(void*), (char*)parmap->mc_data + parmap->length*sizeof(void*));
376
377   //XBT_CRITICAL("Worker %d : %p -> %p (%d)", worker_id, start, end, data_size);
378
379   while ( start < end && parmap->finish == -1) {
380     //XBT_CRITICAL("Starting with %p", start);
381     int res = parmap->snapshot_compare(*(void**)start, parmap->ref_snapshot);
382     start = (char *)start + sizeof(start);
383     if (!res){
384       parmap->finish = ((char*)start - (char*)parmap->mc_data) / sizeof(void*);
385       //XBT_CRITICAL("Find good one %p (%p)", start, parmap->mc_data);
386       break;
387     }
388   }
389 }
390
391 /**
392  * \brief Main function of a worker thread.
393  * \param arg the parmap
394  */
395 static void *xbt_parmap_mc_worker_main(void *arg)
396 {
397   xbt_parmap_thread_data_t data = (xbt_parmap_thread_data_t) arg;
398   xbt_parmap_t parmap = data->parmap;
399   unsigned round = 0;
400   /* smx_context_t context = SIMIX_context_new(NULL, 0, NULL, NULL, NULL); */
401   /* SIMIX_context_set_current(context); */
402
403   XBT_DEBUG("New worker thread created");
404
405   /* Worker's main loop */
406   while (1) {
407     parmap->worker_wait_f(parmap, ++round);
408     if (parmap->status == XBT_PARMAP_WORK) {
409       XBT_DEBUG("Worker %d got a job", data->worker_id);
410
411       xbt_parmap_mc_work(parmap, data->worker_id);
412       parmap->worker_signal_f(parmap);
413
414       XBT_DEBUG("Worker %d has finished", data->worker_id);
415     /* We are destroying the parmap */
416     } else {
417       xbt_free(data);
418       return NULL;
419     }
420   }
421 }
422 #endif
423
424 #if HAVE_FUTEX_H
425 static void futex_wait(unsigned *uaddr, unsigned val)
426 {
427   XBT_VERB("Waiting on futex %p", uaddr);
428   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, NULL, NULL, 0);
429 }
430
431 static void futex_wake(unsigned *uaddr, unsigned val)
432 {
433   XBT_VERB("Waking futex %p", uaddr);
434   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
435 }
436 #endif
437
438 /**
439  * \brief Starts the parmap: waits for all workers to be ready and returns.
440  *
441  * This function is called by the controller thread.
442  *
443  * \param parmap a parmap
444  */
445 static void xbt_parmap_posix_master_wait(xbt_parmap_t parmap)
446 {
447   xbt_os_mutex_acquire(parmap->done_mutex);
448   if (parmap->thread_counter < parmap->num_workers) {
449     /* wait for all workers to be ready */
450     xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
451   }
452   xbt_os_mutex_release(parmap->done_mutex);
453 }
454
455 /**
456  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
457  *
458  * This function is called by all worker threads when they end (not including
459  * the controller).
460  *
461  * \param parmap a parmap
462  */
463 static void xbt_parmap_posix_worker_signal(xbt_parmap_t parmap)
464 {
465   xbt_os_mutex_acquire(parmap->done_mutex);
466   if (++parmap->thread_counter == parmap->num_workers) {
467     /* all workers have finished, wake the controller */
468     xbt_os_cond_signal(parmap->done_cond);
469   }
470   xbt_os_mutex_release(parmap->done_mutex);
471 }
472
473 /**
474  * \brief Wakes all workers and waits for them to finish the tasks.
475  *
476  * This function is called by the controller thread.
477  *
478  * \param parmap a parmap
479  */
480 static void xbt_parmap_posix_master_signal(xbt_parmap_t parmap)
481 {
482   xbt_os_mutex_acquire(parmap->ready_mutex);
483   parmap->thread_counter = 1;
484   parmap->work++;
485   /* wake all workers */
486   xbt_os_cond_broadcast(parmap->ready_cond);
487   xbt_os_mutex_release(parmap->ready_mutex);
488 }
489
490 /**
491  * \brief Waits for some work to process.
492  *
493  * This function is called by each worker thread (not including the controller)
494  * when it has no more work to do.
495  *
496  * \param parmap a parmap
497  * \param round  the expected round number
498  */
499 static void xbt_parmap_posix_worker_wait(xbt_parmap_t parmap, unsigned round)
500 {
501   xbt_os_mutex_acquire(parmap->ready_mutex);
502   /* wait for more work */
503   if (parmap->work != round) {
504     xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
505   }
506   xbt_os_mutex_release(parmap->ready_mutex);
507 }
508
509 #if HAVE_FUTEX_H
510 /**
511  * \brief Starts the parmap: waits for all workers to be ready and returns.
512  *
513  * This function is called by the controller thread.
514  *
515  * \param parmap a parmap
516  */
517 static void xbt_parmap_futex_master_wait(xbt_parmap_t parmap)
518 {
519   unsigned count = parmap->thread_counter;
520   while (count < parmap->num_workers) {
521     /* wait for all workers to be ready */
522     futex_wait(&parmap->thread_counter, count);
523     count = parmap->thread_counter;
524   }
525 }
526
527 /**
528  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
529  *
530  * This function is called by all worker threads when they end (not including
531  * the controller).
532  *
533  * \param parmap a parmap
534  */
535 static void xbt_parmap_futex_worker_signal(xbt_parmap_t parmap)
536 {
537   unsigned count = __sync_add_and_fetch(&parmap->thread_counter, 1);
538   if (count == parmap->num_workers) {
539     /* all workers have finished, wake the controller */
540     futex_wake(&parmap->thread_counter, INT_MAX);
541   }
542 }
543
544 /**
545  * \brief Wakes all workers and waits for them to finish the tasks.
546  *
547  * This function is called by the controller thread.
548  *
549  * \param parmap a parmap
550  */
551 static void xbt_parmap_futex_master_signal(xbt_parmap_t parmap)
552 {
553   parmap->thread_counter = 1;
554   __sync_add_and_fetch(&parmap->work, 1);
555   /* wake all workers */
556   futex_wake(&parmap->work, INT_MAX);
557 }
558
559 /**
560  * \brief Waits for some work to process.
561  *
562  * This function is called by each worker thread (not including the controller)
563  * when it has no more work to do.
564  *
565  * \param parmap a parmap
566  * \param round  the expected round number
567  */
568 static void xbt_parmap_futex_worker_wait(xbt_parmap_t parmap, unsigned round)
569 {
570   unsigned work = parmap->work;
571   /* wait for more work */
572   while (work != round) {
573     futex_wait(&parmap->work, work);
574     work = parmap->work;
575   }
576 }
577 #endif
578
579 /**
580  * \brief Starts the parmap: waits for all workers to be ready and returns.
581  *
582  * This function is called by the controller thread.
583  *
584  * \param parmap a parmap
585  */
586 static void xbt_parmap_busy_master_wait(xbt_parmap_t parmap)
587 {
588   while (parmap->thread_counter < parmap->num_workers) {
589     xbt_os_thread_yield();
590   }
591 }
592
593 /**
594  * \brief Ends the parmap: wakes the controller thread when all workers terminate.
595  *
596  * This function is called by all worker threads when they end.
597  *
598  * \param parmap a parmap
599  */
600 static void xbt_parmap_busy_worker_signal(xbt_parmap_t parmap)
601 {
602   __sync_add_and_fetch(&parmap->thread_counter, 1);
603 }
604
605 /**
606  * \brief Wakes all workers and waits for them to finish the tasks.
607  *
608  * This function is called by the controller thread.
609  *
610  * \param parmap a parmap
611  */
612 static void xbt_parmap_busy_master_signal(xbt_parmap_t parmap)
613 {
614   parmap->thread_counter = 1;
615   __sync_add_and_fetch(&parmap->work, 1);
616 }
617
618 /**
619  * \brief Waits for some work to process.
620  *
621  * This function is called by each worker thread (not including the controller)
622  * when it has no more work to do.
623  *
624  * \param parmap a parmap
625  * \param round  the expected round number
626  */
627 static void xbt_parmap_busy_worker_wait(xbt_parmap_t parmap, unsigned round)
628 {
629   /* wait for more work */
630   while (parmap->work != round) {
631     xbt_os_thread_yield();
632   }
633 }