Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of github.com:simgrid/simgrid into s_type_cleanup
[simgrid.git] / src / include / xbt / parmap.hpp
1 /* A thread pool (C++ version).                                             */
2
3 /* Copyright (c) 2004-2017 The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #ifndef XBT_PARMAP_HPP
10 #define XBT_PARMAP_HPP
11
12 #include "src/internal_config.h" // HAVE_FUTEX_H
13 #include "src/kernel/context/Context.hpp"
14 #include <atomic>
15 #include <boost/optional.hpp>
16 #include <simgrid/simix.h>
17 #include <vector>
18 #include <xbt/log.h>
19 #include <xbt/parmap.h>
20 #include <xbt/xbt_os_thread.h>
21
22 #if HAVE_FUTEX_H
23 #include <limits>
24 #include <linux/futex.h>
25 #include <sys/syscall.h>
26 #endif
27
28 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
29
30 namespace simgrid {
31 namespace xbt {
32
33 /** \addtogroup XBT_parmap
34   * \ingroup XBT_misc
35   * \brief Parallel map class
36   * \{
37   */
38 template <typename T> class Parmap {
39 public:
40   Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
41   Parmap(const Parmap&) = delete;
42   ~Parmap();
43   void apply(void (*fun)(T), const std::vector<T>& data);
44   boost::optional<T> next();
45
46 private:
47   enum Flag { PARMAP_WORK, PARMAP_DESTROY };
48
49   /**
50    * \brief Thread data transmission structure
51    */
52   class ThreadData {
53   public:
54     ThreadData(Parmap<T>& parmap, int id) : parmap(parmap), worker_id(id) {}
55     Parmap<T>& parmap;
56     int worker_id;
57   };
58
59   /**
60    * \brief Synchronization object (different specializations).
61    */
62   class Synchro {
63   public:
64     explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
65     virtual ~Synchro() {}
66     /**
67      * \brief Wakes all workers and waits for them to finish the tasks.
68      *
69      * This function is called by the controller thread.
70      */
71     virtual void master_signal()       = 0;
72     /**
73      * \brief Starts the parmap: waits for all workers to be ready and returns.
74      *
75      * This function is called by the controller thread.
76      */
77     virtual void master_wait()         = 0;
78     /**
79      * \brief Ends the parmap: wakes the controller thread when all workers terminate.
80      *
81      * This function is called by all worker threads when they end (not including the controller).
82      */
83     virtual void worker_signal()       = 0;
84     /**
85      * \brief Waits for some work to process.
86      *
87      * This function is called by each worker thread (not including the controller) when it has no more work to do.
88      *
89      * \param round  the expected round number
90      */
91     virtual void worker_wait(unsigned) = 0;
92
93   protected:
94     Parmap<T>& parmap;
95   };
96
97   class PosixSynchro : public Synchro {
98   public:
99     explicit PosixSynchro(Parmap<T>& parmap);
100     ~PosixSynchro();
101     void master_signal();
102     void master_wait();
103     void worker_signal();
104     void worker_wait(unsigned round);
105
106   private:
107     xbt_os_cond_t ready_cond;
108     xbt_os_mutex_t ready_mutex;
109     xbt_os_cond_t done_cond;
110     xbt_os_mutex_t done_mutex;
111   };
112
113 #if HAVE_FUTEX_H
114   class FutexSynchro : public Synchro {
115   public:
116     explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
117     void master_signal();
118     void master_wait();
119     void worker_signal();
120     void worker_wait(unsigned);
121
122   private:
123     static void futex_wait(unsigned* uaddr, unsigned val);
124     static void futex_wake(unsigned* uaddr, unsigned val);
125   };
126 #endif
127
128   class BusyWaitSynchro : public Synchro {
129   public:
130     explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
131     void master_signal();
132     void master_wait();
133     void worker_signal();
134     void worker_wait(unsigned);
135   };
136
137   static void* worker_main(void* arg);
138   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
139   void work();
140
141   Flag status;              /**< is the parmap active or being destroyed? */
142   unsigned work_round;      /**< index of the current round */
143   xbt_os_thread_t* workers; /**< worker thread handlers */
144   unsigned num_workers;     /**< total number of worker threads including the controller */
145   Synchro* synchro;         /**< synchronization object */
146
147   unsigned thread_counter    = 0;       /**< number of workers that have done the work */
148   void (*fun)(const T)       = nullptr; /**< function to run in parallel on each element of data */
149   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
150   std::atomic<unsigned> index;          /**< index of the next element of data to pick */
151 };
152
153 /**
154  * \brief Creates a parallel map object
155  * \param num_workers number of worker threads to create
156  * \param mode how to synchronize the worker threads
157  */
158 template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
159 {
160   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
161
162   /* Initialize the thread pool data structure */
163   this->status      = PARMAP_WORK;
164   this->work_round  = 0;
165   this->workers     = new xbt_os_thread_t[num_workers];
166   this->num_workers = num_workers;
167   this->synchro     = new_synchro(mode);
168
169   /* Create the pool of worker threads */
170   this->workers[0] = nullptr;
171 #if HAVE_PTHREAD_SETAFFINITY
172   int core_bind = 0;
173 #endif
174   for (unsigned i = 1; i < num_workers; i++) {
175     ThreadData* data = new ThreadData(*this, i);
176     this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
177 #if HAVE_PTHREAD_SETAFFINITY
178     xbt_os_thread_bind(this->workers[i], core_bind);
179     if (core_bind != xbt_os_get_numcores() - 1)
180       core_bind++;
181     else
182       core_bind = 0;
183 #endif
184   }
185 }
186
187 /**
188  * \brief Destroys a parmap
189  */
190 template <typename T> Parmap<T>::~Parmap()
191 {
192   status = PARMAP_DESTROY;
193   synchro->master_signal();
194
195   for (unsigned i = 1; i < num_workers; i++)
196     xbt_os_thread_join(workers[i], nullptr);
197
198   delete[] workers;
199   delete synchro;
200 }
201
202 /**
203  * \brief Applies a list of tasks in parallel.
204  * \param fun the function to call in parallel
205  * \param data each element of this vector will be passed as an argument to fun
206  */
207 template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
208 {
209   /* Assign resources to worker threads (we are maestro here)*/
210   this->fun   = fun;
211   this->data  = &data;
212   this->index = 0;
213   this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads)
214   this->work();                   // maestro works with its minions
215   this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
216   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
217 }
218
219 /**
220  * \brief Returns a next task to process.
221  *
222  * Worker threads call this function to get more work.
223  *
224  * \return the next task to process, or throws a std::out_of_range exception if there is no more work
225  */
226 template <typename T> boost::optional<T> Parmap<T>::next()
227 {
228   unsigned index = this->index++;
229   if (index < this->data->size())
230     return (*this->data)[index];
231   else
232     return boost::none;
233 }
234
235 /**
236  * \brief Main work loop: applies fun to elements in turn.
237  */
238 template <typename T> void Parmap<T>::work()
239 {
240   unsigned index = this->index++;
241   unsigned length = this->data->size();
242   while (index < length) {
243     this->fun((*this->data)[index]);
244     index = this->index++;
245   }
246 }
247
248 /**
249  * Get a synchronization object for given mode.
250  * \param mode the synchronization mode
251  */
252 template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_parmap_mode_t mode)
253 {
254   if (mode == XBT_PARMAP_DEFAULT) {
255 #if HAVE_FUTEX_H
256     mode = XBT_PARMAP_FUTEX;
257 #else
258     mode = XBT_PARMAP_POSIX;
259 #endif
260   }
261   Synchro* res;
262   switch (mode) {
263     case XBT_PARMAP_POSIX:
264       res = new PosixSynchro(*this);
265       break;
266     case XBT_PARMAP_FUTEX:
267 #if HAVE_FUTEX_H
268       res = new FutexSynchro(*this);
269 #else
270       xbt_die("Fute is not available on this OS.");
271 #endif
272       break;
273     case XBT_PARMAP_BUSY_WAIT:
274       res = new BusyWaitSynchro(*this);
275       break;
276     default:
277       THROW_IMPOSSIBLE;
278   }
279   return res;
280 }
281
282 /**
283  * \brief Main function of a worker thread.
284  */
285 template <typename T> void* Parmap<T>::worker_main(void* arg)
286 {
287   ThreadData* data      = static_cast<ThreadData*>(arg);
288   Parmap<T>& parmap     = data->parmap;
289   unsigned round        = 0;
290   smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
291   SIMIX_context_set_current(context);
292
293   XBT_CDEBUG(xbt_parmap, "New worker thread created");
294
295   /* Worker's main loop */
296   while (1) {
297     round++;
298     parmap.synchro->worker_wait(round);
299     if (parmap.status == PARMAP_DESTROY)
300       break;
301
302     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
303     parmap.work();
304     parmap.synchro->worker_signal();
305     XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id);
306   }
307   /* We are destroying the parmap */
308   delete context;
309   delete data;
310   return nullptr;
311 }
312
313 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
314 {
315   ready_cond  = xbt_os_cond_init();
316   ready_mutex = xbt_os_mutex_init();
317   done_cond   = xbt_os_cond_init();
318   done_mutex  = xbt_os_mutex_init();
319 }
320
321 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
322 {
323   xbt_os_cond_destroy(ready_cond);
324   xbt_os_mutex_destroy(ready_mutex);
325   xbt_os_cond_destroy(done_cond);
326   xbt_os_mutex_destroy(done_mutex);
327 }
328
329 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
330 {
331   xbt_os_mutex_acquire(ready_mutex);
332   this->parmap.thread_counter = 1;
333   this->parmap.work_round++;
334   /* wake all workers */
335   xbt_os_cond_broadcast(ready_cond);
336   xbt_os_mutex_release(ready_mutex);
337 }
338
339 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
340 {
341   xbt_os_mutex_acquire(done_mutex);
342   if (this->parmap.thread_counter < this->parmap.num_workers) {
343     /* wait for all workers to be ready */
344     xbt_os_cond_wait(done_cond, done_mutex);
345   }
346   xbt_os_mutex_release(done_mutex);
347 }
348
349 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
350 {
351   xbt_os_mutex_acquire(done_mutex);
352   this->parmap.thread_counter++;
353   if (this->parmap.thread_counter == this->parmap.num_workers) {
354     /* all workers have finished, wake the controller */
355     xbt_os_cond_signal(done_cond);
356   }
357   xbt_os_mutex_release(done_mutex);
358 }
359
360 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
361 {
362   xbt_os_mutex_acquire(ready_mutex);
363   /* wait for more work */
364   if (this->parmap.work_round != round) {
365     xbt_os_cond_wait(ready_cond, ready_mutex);
366   }
367   xbt_os_mutex_release(ready_mutex);
368 }
369
370 #if HAVE_FUTEX_H
371 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
372 {
373   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
374   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
375 }
376
377 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
378 {
379   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
380   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
381 }
382
383 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
384 {
385   this->parmap.thread_counter = 1;
386   __sync_add_and_fetch(&this->parmap.work_round, 1);
387   /* wake all workers */
388   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
389 }
390
391 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
392 {
393   unsigned count = this->parmap.thread_counter;
394   while (count < this->parmap.num_workers) {
395     /* wait for all workers to be ready */
396     futex_wait(&this->parmap.thread_counter, count);
397     count = this->parmap.thread_counter;
398   }
399 }
400
401 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
402 {
403   unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1);
404   if (count == this->parmap.num_workers) {
405     /* all workers have finished, wake the controller */
406     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
407   }
408 }
409
410 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
411 {
412   unsigned work_round = this->parmap.work_round;
413   /* wait for more work */
414   while (work_round != round) {
415     futex_wait(&this->parmap.work_round, work_round);
416     work_round = this->parmap.work_round;
417   }
418 }
419 #endif
420
421 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
422 {
423   this->parmap.thread_counter = 1;
424   __sync_add_and_fetch(&this->parmap.work_round, 1);
425 }
426
427 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
428 {
429   while (this->parmap.thread_counter < this->parmap.num_workers) {
430     xbt_os_thread_yield();
431   }
432 }
433
434 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
435 {
436   __sync_add_and_fetch(&this->parmap.thread_counter, 1);
437 }
438
439 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
440 {
441   /* wait for more work */
442   while (this->parmap.work_round != round) {
443     xbt_os_thread_yield();
444   }
445 }
446
447 /** \} */
448 }
449 }
450
451 #endif