Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Please codacy (explicit constructor).
[simgrid.git] / src / include / xbt / parmap.hpp
1 /* A thread pool (C++ version).                                             */
2
3 /* Copyright (c) 2004-2017 The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #ifndef XBT_PARMAP_HPP
10 #define XBT_PARMAP_HPP
11
12 #include "src/internal_config.h" // HAVE_FUTEX_H
13 #include "src/kernel/context/Context.hpp"
14 #include <atomic>
15 #include <boost/optional.hpp>
16 #include <simgrid/simix.h>
17 #include <vector>
18 #include <xbt/log.h>
19 #include <xbt/parmap.h>
20 #include <xbt/xbt_os_thread.h>
21
22 #if HAVE_FUTEX_H
23 #include <limits>
24 #include <linux/futex.h>
25 #include <sys/syscall.h>
26 #endif
27
28 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
29
30 namespace simgrid {
31 namespace xbt {
32
33 /** \addtogroup XBT_parmap
34   * \ingroup XBT_misc
35   * \brief Parallel map class
36   * \{
37   */
38 template <typename T> class Parmap {
39 public:
40   Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
41   ~Parmap();
42   void apply(void (*fun)(T), const std::vector<T>& data);
43   boost::optional<T> next();
44
45 private:
46   enum Flag { PARMAP_WORK, PARMAP_DESTROY };
47
48   /**
49    * \brief Thread data transmission structure
50    */
51   class ThreadData {
52   public:
53     ThreadData(Parmap<T>& parmap, int id) : parmap(parmap), worker_id(id) {}
54     Parmap<T>& parmap;
55     int worker_id;
56   };
57
58   /**
59    * \brief Synchronization object (different specializations).
60    */
61   class Synchro {
62   public:
63     explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
64     virtual ~Synchro() {}
65     /**
66      * \brief Wakes all workers and waits for them to finish the tasks.
67      *
68      * This function is called by the controller thread.
69      */
70     virtual void master_signal()       = 0;
71     /**
72      * \brief Starts the parmap: waits for all workers to be ready and returns.
73      *
74      * This function is called by the controller thread.
75      */
76     virtual void master_wait()         = 0;
77     /**
78      * \brief Ends the parmap: wakes the controller thread when all workers terminate.
79      *
80      * This function is called by all worker threads when they end (not including the controller).
81      */
82     virtual void worker_signal()       = 0;
83     /**
84      * \brief Waits for some work to process.
85      *
86      * This function is called by each worker thread (not including the controller) when it has no more work to do.
87      *
88      * \param round  the expected round number
89      */
90     virtual void worker_wait(unsigned) = 0;
91
92   protected:
93     Parmap<T>& parmap;
94   };
95
96   class PosixSynchro : public Synchro {
97   public:
98     explicit PosixSynchro(Parmap<T>& parmap);
99     ~PosixSynchro();
100     void master_signal();
101     void master_wait();
102     void worker_signal();
103     void worker_wait(unsigned round);
104
105   private:
106     xbt_os_cond_t ready_cond;
107     xbt_os_mutex_t ready_mutex;
108     xbt_os_cond_t done_cond;
109     xbt_os_mutex_t done_mutex;
110   };
111
112 #if HAVE_FUTEX_H
113   class FutexSynchro : public Synchro {
114   public:
115     explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
116     void master_signal();
117     void master_wait();
118     void worker_signal();
119     void worker_wait(unsigned);
120
121   private:
122     static void futex_wait(unsigned* uaddr, unsigned val);
123     static void futex_wake(unsigned* uaddr, unsigned val);
124   };
125 #endif
126
127   class BusyWaitSynchro : public Synchro {
128   public:
129     explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
130     void master_signal();
131     void master_wait();
132     void worker_signal();
133     void worker_wait(unsigned);
134   };
135
136   static void* worker_main(void* arg);
137   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
138   void work();
139
140   Flag status;                 /**< is the parmap active or being destroyed? */
141   unsigned work_round;         /**< index of the current round */
142   unsigned thread_counter;     /**< number of workers that have done the work */
143   unsigned num_workers;        /**< total number of worker threads including the controller */
144   xbt_os_thread_t* workers;    /**< worker thread handlers */
145   void (*fun)(const T);        /**< function to run in parallel on each element of data */
146   const std::vector<T>* data;  /**< parameters to pass to fun in parallel */
147   std::atomic<unsigned> index; /**< index of the next element of data to pick */
148   Synchro* synchro;            /**< synchronization object */
149 };
150
151 /**
152  * \brief Creates a parallel map object
153  * \param num_workers number of worker threads to create
154  * \param mode how to synchronize the worker threads
155  */
156 template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
157 {
158   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
159
160   /* Initialize the thread pool data structure */
161   this->status      = PARMAP_WORK;
162   this->work_round  = 0;
163   this->workers     = new xbt_os_thread_t[num_workers];
164   this->num_workers = num_workers;
165   this->synchro     = new_synchro(mode);
166
167   /* Create the pool of worker threads */
168   this->workers[0] = nullptr;
169 #if HAVE_PTHREAD_SETAFFINITY
170   int core_bind = 0;
171 #endif
172   for (unsigned i = 1; i < num_workers; i++) {
173     ThreadData* data = new ThreadData(*this, i);
174     this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
175 #if HAVE_PTHREAD_SETAFFINITY
176     xbt_os_thread_bind(this->workers[i], core_bind);
177     if (core_bind != xbt_os_get_numcores() - 1)
178       core_bind++;
179     else
180       core_bind = 0;
181 #endif
182   }
183 }
184
185 /**
186  * \brief Destroys a parmap
187  */
188 template <typename T> Parmap<T>::~Parmap()
189 {
190   status = PARMAP_DESTROY;
191   synchro->master_signal();
192
193   for (unsigned i = 1; i < num_workers; i++)
194     xbt_os_thread_join(workers[i], nullptr);
195
196   delete[] workers;
197   delete synchro;
198 }
199
200 /**
201  * \brief Applies a list of tasks in parallel.
202  * \param fun the function to call in parallel
203  * \param data each element of this vector will be passed as an argument to fun
204  */
205 template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
206 {
207   /* Assign resources to worker threads (we are maestro here)*/
208   this->fun   = fun;
209   this->data  = &data;
210   this->index = 0;
211   this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads)
212   this->work();                   // maestro works with its minions
213   this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
214   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
215 }
216
217 /**
218  * \brief Returns a next task to process.
219  *
220  * Worker threads call this function to get more work.
221  *
222  * \return the next task to process, or throws a std::out_of_range exception if there is no more work
223  */
224 template <typename T> boost::optional<T> Parmap<T>::next()
225 {
226   unsigned index = this->index++;
227   if (index < this->data->size())
228     return (*this->data)[index];
229   else
230     return boost::none;
231 }
232
233 /**
234  * \brief Main work loop: applies fun to elements in turn.
235  */
236 template <typename T> void Parmap<T>::work()
237 {
238   unsigned index = this->index++;
239   unsigned length = this->data->size();
240   while (index < length) {
241     this->fun((*this->data)[index]);
242     index = this->index++;
243   }
244 }
245
246 /**
247  * Get a synchronization object for given mode.
248  * \param mode the synchronization mode
249  */
250 template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_parmap_mode_t mode)
251 {
252   if (mode == XBT_PARMAP_DEFAULT) {
253 #if HAVE_FUTEX_H
254     mode = XBT_PARMAP_FUTEX;
255 #else
256     mode = XBT_PARMAP_POSIX;
257 #endif
258   }
259   Synchro* res;
260   switch (mode) {
261     case XBT_PARMAP_POSIX:
262       res = new PosixSynchro(*this);
263       break;
264     case XBT_PARMAP_FUTEX:
265 #if HAVE_FUTEX_H
266       res = new FutexSynchro(*this);
267 #else
268       xbt_die("Fute is not available on this OS.");
269 #endif
270       break;
271     case XBT_PARMAP_BUSY_WAIT:
272       res = new BusyWaitSynchro(*this);
273       break;
274     default:
275       THROW_IMPOSSIBLE;
276   }
277   return res;
278 }
279
280 /**
281  * \brief Main function of a worker thread.
282  */
283 template <typename T> void* Parmap<T>::worker_main(void* arg)
284 {
285   ThreadData* data      = static_cast<ThreadData*>(arg);
286   Parmap<T>& parmap     = data->parmap;
287   unsigned round        = 0;
288   smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
289   SIMIX_context_set_current(context);
290
291   XBT_CDEBUG(xbt_parmap, "New worker thread created");
292
293   /* Worker's main loop */
294   while (1) {
295     round++;
296     parmap.synchro->worker_wait(round);
297     if (parmap.status == PARMAP_DESTROY)
298       break;
299
300     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
301     parmap.work();
302     parmap.synchro->worker_signal();
303     XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id);
304   }
305   /* We are destroying the parmap */
306   delete context;
307   delete data;
308   return nullptr;
309 }
310
311 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
312 {
313   ready_cond  = xbt_os_cond_init();
314   ready_mutex = xbt_os_mutex_init();
315   done_cond   = xbt_os_cond_init();
316   done_mutex  = xbt_os_mutex_init();
317 }
318
319 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
320 {
321   xbt_os_cond_destroy(ready_cond);
322   xbt_os_mutex_destroy(ready_mutex);
323   xbt_os_cond_destroy(done_cond);
324   xbt_os_mutex_destroy(done_mutex);
325 }
326
327 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
328 {
329   xbt_os_mutex_acquire(ready_mutex);
330   this->parmap.thread_counter = 1;
331   this->parmap.work_round++;
332   /* wake all workers */
333   xbt_os_cond_broadcast(ready_cond);
334   xbt_os_mutex_release(ready_mutex);
335 }
336
337 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
338 {
339   xbt_os_mutex_acquire(done_mutex);
340   if (this->parmap.thread_counter < this->parmap.num_workers) {
341     /* wait for all workers to be ready */
342     xbt_os_cond_wait(done_cond, done_mutex);
343   }
344   xbt_os_mutex_release(done_mutex);
345 }
346
347 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
348 {
349   xbt_os_mutex_acquire(done_mutex);
350   this->parmap.thread_counter++;
351   if (this->parmap.thread_counter == this->parmap.num_workers) {
352     /* all workers have finished, wake the controller */
353     xbt_os_cond_signal(done_cond);
354   }
355   xbt_os_mutex_release(done_mutex);
356 }
357
358 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
359 {
360   xbt_os_mutex_acquire(ready_mutex);
361   /* wait for more work */
362   if (this->parmap.work_round != round) {
363     xbt_os_cond_wait(ready_cond, ready_mutex);
364   }
365   xbt_os_mutex_release(ready_mutex);
366 }
367
368 #if HAVE_FUTEX_H
369 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
370 {
371   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
372   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
373 }
374
375 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
376 {
377   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
378   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
379 }
380
381 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
382 {
383   this->parmap.thread_counter = 1;
384   __sync_add_and_fetch(&this->parmap.work_round, 1);
385   /* wake all workers */
386   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
387 }
388
389 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
390 {
391   unsigned count = this->parmap.thread_counter;
392   while (count < this->parmap.num_workers) {
393     /* wait for all workers to be ready */
394     futex_wait(&this->parmap.thread_counter, count);
395     count = this->parmap.thread_counter;
396   }
397 }
398
399 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
400 {
401   unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1);
402   if (count == this->parmap.num_workers) {
403     /* all workers have finished, wake the controller */
404     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
405   }
406 }
407
408 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
409 {
410   unsigned work_round = this->parmap.work_round;
411   /* wait for more work */
412   while (work_round != round) {
413     futex_wait(&this->parmap.work_round, work_round);
414     work_round = this->parmap.work_round;
415   }
416 }
417 #endif
418
419 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
420 {
421   this->parmap.thread_counter = 1;
422   __sync_add_and_fetch(&this->parmap.work_round, 1);
423 }
424
425 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
426 {
427   while (this->parmap.thread_counter < this->parmap.num_workers) {
428     xbt_os_thread_yield();
429   }
430 }
431
432 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
433 {
434   __sync_add_and_fetch(&this->parmap.thread_counter, 1);
435 }
436
437 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
438 {
439   /* wait for more work */
440   while (this->parmap.work_round != round) {
441     xbt_os_thread_yield();
442   }
443 }
444
445 /** \} */
446 }
447 }
448
449 #endif