Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
snake_case s4u::Comm
[simgrid.git] / src / include / xbt / parmap.hpp
1 /* A thread pool (C++ version).                                             */
2
3 /* Copyright (c) 2004-2018 The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #ifndef XBT_PARMAP_HPP
10 #define XBT_PARMAP_HPP
11
12 #include "src/internal_config.h" // HAVE_FUTEX_H
13 #include "src/kernel/context/Context.hpp"
14 #include <atomic>
15 #include <boost/optional.hpp>
16 #include <simgrid/simix.h>
17 #include <vector>
18 #include <xbt/log.h>
19 #include <xbt/parmap.h>
20 #include <xbt/xbt_os_thread.h>
21
22 #if HAVE_FUTEX_H
23 #include <limits>
24 #include <linux/futex.h>
25 #include <sys/syscall.h>
26 #endif
27
28 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
29
30 namespace simgrid {
31 namespace xbt {
32
33 /** \addtogroup XBT_parmap
34   * \ingroup XBT_misc
35   * \brief Parallel map class
36   * \{
37   */
38 template <typename T> class Parmap {
39 public:
40   Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
41   Parmap(const Parmap&) = delete;
42   Parmap& operator=(const Parmap&) = delete;
43   ~Parmap();
44   void apply(void (*fun)(T), const std::vector<T>& data);
45   boost::optional<T> next();
46
47 private:
48   enum Flag { PARMAP_WORK, PARMAP_DESTROY };
49
50   /**
51    * \brief Thread data transmission structure
52    */
53   class ThreadData {
54   public:
55     ThreadData(Parmap<T>& parmap, int id) : parmap(parmap), worker_id(id) {}
56     Parmap<T>& parmap;
57     int worker_id;
58   };
59
60   /**
61    * \brief Synchronization object (different specializations).
62    */
63   class Synchro {
64   public:
65     explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
66     virtual ~Synchro() = default;
67     /**
68      * \brief Wakes all workers and waits for them to finish the tasks.
69      *
70      * This function is called by the controller thread.
71      */
72     virtual void master_signal() = 0;
73     /**
74      * \brief Starts the parmap: waits for all workers to be ready and returns.
75      *
76      * This function is called by the controller thread.
77      */
78     virtual void master_wait() = 0;
79     /**
80      * \brief Ends the parmap: wakes the controller thread when all workers terminate.
81      *
82      * This function is called by all worker threads when they end (not including the controller).
83      */
84     virtual void worker_signal() = 0;
85     /**
86      * \brief Waits for some work to process.
87      *
88      * This function is called by each worker thread (not including the controller) when it has no more work to do.
89      *
90      * \param round  the expected round number
91      */
92     virtual void worker_wait(unsigned) = 0;
93
94     Parmap<T>& parmap;
95   };
96
97   class PosixSynchro : public Synchro {
98   public:
99     explicit PosixSynchro(Parmap<T>& parmap);
100     ~PosixSynchro();
101     void master_signal();
102     void master_wait();
103     void worker_signal();
104     void worker_wait(unsigned round);
105
106   private:
107     xbt_os_cond_t ready_cond;
108     xbt_os_mutex_t ready_mutex;
109     xbt_os_cond_t done_cond;
110     xbt_os_mutex_t done_mutex;
111   };
112
113 #if HAVE_FUTEX_H
114   class FutexSynchro : public Synchro {
115   public:
116     explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
117     void master_signal();
118     void master_wait();
119     void worker_signal();
120     void worker_wait(unsigned);
121
122   private:
123     static void futex_wait(unsigned* uaddr, unsigned val);
124     static void futex_wake(unsigned* uaddr, unsigned val);
125   };
126 #endif
127
128   class BusyWaitSynchro : public Synchro {
129   public:
130     explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
131     void master_signal();
132     void master_wait();
133     void worker_signal();
134     void worker_wait(unsigned);
135   };
136
137   static void* worker_main(void* arg);
138   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
139   void work();
140
141   Flag status;              /**< is the parmap active or being destroyed? */
142   unsigned work_round;      /**< index of the current round */
143   xbt_os_thread_t* workers; /**< worker thread handlers */
144   unsigned num_workers;     /**< total number of worker threads including the controller */
145   Synchro* synchro;         /**< synchronization object */
146
147   unsigned thread_counter    = 0;       /**< number of workers that have done the work */
148   void (*fun)(const T)       = nullptr; /**< function to run in parallel on each element of data */
149   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
150   std::atomic<unsigned> index;          /**< index of the next element of data to pick */
151 };
152
153 /**
154  * \brief Creates a parallel map object
155  * \param num_workers number of worker threads to create
156  * \param mode how to synchronize the worker threads
157  */
158 template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
159 {
160   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
161
162   /* Initialize the thread pool data structure */
163   this->status      = PARMAP_WORK;
164   this->work_round  = 0;
165   this->workers     = new xbt_os_thread_t[num_workers];
166   this->num_workers = num_workers;
167   this->synchro     = new_synchro(mode);
168
169   /* Create the pool of worker threads */
170   this->workers[0] = nullptr;
171 #if HAVE_PTHREAD_SETAFFINITY
172   int core_bind = 0;
173 #endif
174   for (unsigned i = 1; i < num_workers; i++) {
175     ThreadData* data = new ThreadData(*this, i);
176     this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
177 #if HAVE_PTHREAD_SETAFFINITY
178     xbt_os_thread_bind(this->workers[i], core_bind);
179     if (core_bind != xbt_os_get_numcores() - 1)
180       core_bind++;
181     else
182       core_bind = 0;
183 #endif
184   }
185 }
186
187 /**
188  * \brief Destroys a parmap
189  */
190 template <typename T> Parmap<T>::~Parmap()
191 {
192   status = PARMAP_DESTROY;
193   synchro->master_signal();
194
195   for (unsigned i = 1; i < num_workers; i++)
196     xbt_os_thread_join(workers[i], nullptr);
197
198   delete[] workers;
199   delete synchro;
200 }
201
202 /**
203  * \brief Applies a list of tasks in parallel.
204  * \param fun the function to call in parallel
205  * \param data each element of this vector will be passed as an argument to fun
206  */
207 template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
208 {
209   /* Assign resources to worker threads (we are maestro here)*/
210   this->fun   = fun;
211   this->data  = &data;
212   this->index = 0;
213   this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
214   this->work();                   // maestro works with its minions
215   this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
216   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
217 }
218
219 /**
220  * \brief Returns a next task to process.
221  *
222  * Worker threads call this function to get more work.
223  *
224  * \return the next task to process, or throws a std::out_of_range exception if there is no more work
225  */
226 template <typename T> boost::optional<T> Parmap<T>::next()
227 {
228   unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
229   if (index < this->data->size())
230     return (*this->data)[index];
231   else
232     return boost::none;
233 }
234
235 /**
236  * \brief Main work loop: applies fun to elements in turn.
237  */
238 template <typename T> void Parmap<T>::work()
239 {
240   unsigned length = this->data->size();
241   unsigned index  = this->index.fetch_add(1, std::memory_order_relaxed);
242   while (index < length) {
243     this->fun((*this->data)[index]);
244     index = this->index.fetch_add(1, std::memory_order_relaxed);
245   }
246 }
247
248 /**
249  * Get a synchronization object for given mode.
250  * \param mode the synchronization mode
251  */
252 template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_parmap_mode_t mode)
253 {
254   if (mode == XBT_PARMAP_DEFAULT) {
255 #if HAVE_FUTEX_H
256     mode = XBT_PARMAP_FUTEX;
257 #else
258     mode = XBT_PARMAP_POSIX;
259 #endif
260   }
261   Synchro* res;
262   switch (mode) {
263     case XBT_PARMAP_POSIX:
264       res = new PosixSynchro(*this);
265       break;
266     case XBT_PARMAP_FUTEX:
267 #if HAVE_FUTEX_H
268       res = new FutexSynchro(*this);
269 #else
270       xbt_die("Futex is not available on this OS.");
271 #endif
272       break;
273     case XBT_PARMAP_BUSY_WAIT:
274       res = new BusyWaitSynchro(*this);
275       break;
276     default:
277       THROW_IMPOSSIBLE;
278   }
279   return res;
280 }
281
282 /**
283  * \brief Main function of a worker thread.
284  */
285 template <typename T> void* Parmap<T>::worker_main(void* arg)
286 {
287   ThreadData* data      = static_cast<ThreadData*>(arg);
288   Parmap<T>& parmap     = data->parmap;
289   unsigned round        = 0;
290   smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
291   SIMIX_context_set_current(context);
292
293   XBT_CDEBUG(xbt_parmap, "New worker thread created");
294
295   /* Worker's main loop */
296   while (1) {
297     round++;
298     parmap.synchro->worker_wait(round);
299     if (parmap.status == PARMAP_DESTROY)
300       break;
301
302     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
303     parmap.work();
304     parmap.synchro->worker_signal();
305     XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id);
306   }
307   /* We are destroying the parmap */
308   delete context;
309   delete data;
310   return nullptr;
311 }
312
313 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
314 {
315   ready_cond  = xbt_os_cond_init();
316   ready_mutex = xbt_os_mutex_init();
317   done_cond   = xbt_os_cond_init();
318   done_mutex  = xbt_os_mutex_init();
319 }
320
321 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
322 {
323   xbt_os_cond_destroy(ready_cond);
324   xbt_os_mutex_destroy(ready_mutex);
325   xbt_os_cond_destroy(done_cond);
326   xbt_os_mutex_destroy(done_mutex);
327 }
328
329 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
330 {
331   xbt_os_mutex_acquire(ready_mutex);
332   this->parmap.thread_counter = 1;
333   this->parmap.work_round++;
334   /* wake all workers */
335   xbt_os_cond_broadcast(ready_cond);
336   xbt_os_mutex_release(ready_mutex);
337 }
338
339 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
340 {
341   xbt_os_mutex_acquire(done_mutex);
342   while (this->parmap.thread_counter < this->parmap.num_workers) {
343     /* wait for all workers to be ready */
344     xbt_os_cond_wait(done_cond, done_mutex);
345   }
346   xbt_os_mutex_release(done_mutex);
347 }
348
349 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
350 {
351   xbt_os_mutex_acquire(done_mutex);
352   this->parmap.thread_counter++;
353   if (this->parmap.thread_counter == this->parmap.num_workers) {
354     /* all workers have finished, wake the controller */
355     xbt_os_cond_signal(done_cond);
356   }
357   xbt_os_mutex_release(done_mutex);
358 }
359
360 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
361 {
362   xbt_os_mutex_acquire(ready_mutex);
363   /* wait for more work */
364   while (this->parmap.work_round != round) {
365     xbt_os_cond_wait(ready_cond, ready_mutex);
366   }
367   xbt_os_mutex_release(ready_mutex);
368 }
369
370 #if HAVE_FUTEX_H
371 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
372 {
373   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
374   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
375 }
376
377 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
378 {
379   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
380   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
381 }
382
383 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
384 {
385   __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
386   __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
387   /* wake all workers */
388   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
389 }
390
391 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
392 {
393   unsigned count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
394   while (count < this->parmap.num_workers) {
395     /* wait for all workers to be ready */
396     futex_wait(&this->parmap.thread_counter, count);
397     count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
398   }
399 }
400
401 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
402 {
403   unsigned count = __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
404   if (count == this->parmap.num_workers) {
405     /* all workers have finished, wake the controller */
406     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
407   }
408 }
409
410 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
411 {
412   unsigned work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
413   /* wait for more work */
414   while (work_round != round) {
415     futex_wait(&this->parmap.work_round, work_round);
416     work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
417   }
418 }
419 #endif
420
421 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
422 {
423   __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
424   __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
425 }
426
427 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
428 {
429   while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) {
430     xbt_os_thread_yield();
431   }
432 }
433
434 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
435 {
436   __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
437 }
438
439 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
440 {
441   /* wait for more work */
442   while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) {
443     xbt_os_thread_yield();
444   }
445 }
446
447 /** \} */
448 }
449 }
450
451 #endif