Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Enable SIMIX parallel mode for parmap_{bench,text}.
[simgrid.git] / src / include / xbt / parmap.hpp
1 /* A thread pool (C++ version).                                             */
2
3 /* Copyright (c) 2004-2017 The SimGrid Team.
4  * All rights reserved.                                                     */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #ifndef XBT_PARMAP_HPP
10 #define XBT_PARMAP_HPP
11
12 #include "src/internal_config.h" // HAVE_FUTEX_H
13 #include "src/kernel/context/Context.hpp"
14 #include <atomic>
15 #include <boost/optional.hpp>
16 #include <simgrid/simix.h>
17 #include <vector>
18 #include <xbt/log.h>
19 #include <xbt/parmap.h>
20 #include <xbt/xbt_os_thread.h>
21
22 #if HAVE_FUTEX_H
23 #include <limits>
24 #include <linux/futex.h>
25 #include <sys/syscall.h>
26 #endif
27
28 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
29
30 namespace simgrid {
31 namespace xbt {
32
33 /** \addtogroup XBT_parmap
34   * \ingroup XBT_misc
35   * \brief Parallel map class
36   * \{
37   */
38 template <typename T> class Parmap {
39 public:
40   Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
41   Parmap(const Parmap&) = delete;
42   ~Parmap();
43   void apply(void (*fun)(T), const std::vector<T>& data);
44   boost::optional<T> next();
45
46 private:
47   enum Flag { PARMAP_WORK, PARMAP_DESTROY };
48
49   /**
50    * \brief Thread data transmission structure
51    */
52   class ThreadData {
53   public:
54     ThreadData(Parmap<T>& parmap, int id) : parmap(parmap), worker_id(id) {}
55     Parmap<T>& parmap;
56     int worker_id;
57   };
58
59   /**
60    * \brief Synchronization object (different specializations).
61    */
62   class Synchro {
63   public:
64     explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
65     virtual ~Synchro() = default;
66     /**
67      * \brief Wakes all workers and waits for them to finish the tasks.
68      *
69      * This function is called by the controller thread.
70      */
71     virtual void master_signal() = 0;
72     /**
73      * \brief Starts the parmap: waits for all workers to be ready and returns.
74      *
75      * This function is called by the controller thread.
76      */
77     virtual void master_wait() = 0;
78     /**
79      * \brief Ends the parmap: wakes the controller thread when all workers terminate.
80      *
81      * This function is called by all worker threads when they end (not including the controller).
82      */
83     virtual void worker_signal() = 0;
84     /**
85      * \brief Waits for some work to process.
86      *
87      * This function is called by each worker thread (not including the controller) when it has no more work to do.
88      *
89      * \param round  the expected round number
90      */
91     virtual void worker_wait(unsigned) = 0;
92
93     Parmap<T>& parmap;
94   };
95
96   class PosixSynchro : public Synchro {
97   public:
98     explicit PosixSynchro(Parmap<T>& parmap);
99     ~PosixSynchro();
100     void master_signal();
101     void master_wait();
102     void worker_signal();
103     void worker_wait(unsigned round);
104
105   private:
106     xbt_os_cond_t ready_cond;
107     xbt_os_mutex_t ready_mutex;
108     xbt_os_cond_t done_cond;
109     xbt_os_mutex_t done_mutex;
110   };
111
112 #if HAVE_FUTEX_H
113   class FutexSynchro : public Synchro {
114   public:
115     explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
116     void master_signal();
117     void master_wait();
118     void worker_signal();
119     void worker_wait(unsigned);
120
121   private:
122     static void futex_wait(unsigned* uaddr, unsigned val);
123     static void futex_wake(unsigned* uaddr, unsigned val);
124   };
125 #endif
126
127   class BusyWaitSynchro : public Synchro {
128   public:
129     explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
130     void master_signal();
131     void master_wait();
132     void worker_signal();
133     void worker_wait(unsigned);
134   };
135
136   static void* worker_main(void* arg);
137   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
138   void work();
139
140   Flag status;              /**< is the parmap active or being destroyed? */
141   unsigned work_round;      /**< index of the current round */
142   xbt_os_thread_t* workers; /**< worker thread handlers */
143   unsigned num_workers;     /**< total number of worker threads including the controller */
144   Synchro* synchro;         /**< synchronization object */
145
146   unsigned thread_counter    = 0;       /**< number of workers that have done the work */
147   void (*fun)(const T)       = nullptr; /**< function to run in parallel on each element of data */
148   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
149   std::atomic<unsigned> index;          /**< index of the next element of data to pick */
150 };
151
152 /**
153  * \brief Creates a parallel map object
154  * \param num_workers number of worker threads to create
155  * \param mode how to synchronize the worker threads
156  */
157 template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
158 {
159   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
160
161   /* Initialize the thread pool data structure */
162   this->status      = PARMAP_WORK;
163   this->work_round  = 0;
164   this->workers     = new xbt_os_thread_t[num_workers];
165   this->num_workers = num_workers;
166   this->synchro     = new_synchro(mode);
167
168   /* Create the pool of worker threads */
169   this->workers[0] = nullptr;
170 #if HAVE_PTHREAD_SETAFFINITY
171   int core_bind = 0;
172 #endif
173   for (unsigned i = 1; i < num_workers; i++) {
174     ThreadData* data = new ThreadData(*this, i);
175     this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
176 #if HAVE_PTHREAD_SETAFFINITY
177     xbt_os_thread_bind(this->workers[i], core_bind);
178     if (core_bind != xbt_os_get_numcores() - 1)
179       core_bind++;
180     else
181       core_bind = 0;
182 #endif
183   }
184 }
185
186 /**
187  * \brief Destroys a parmap
188  */
189 template <typename T> Parmap<T>::~Parmap()
190 {
191   status = PARMAP_DESTROY;
192   synchro->master_signal();
193
194   for (unsigned i = 1; i < num_workers; i++)
195     xbt_os_thread_join(workers[i], nullptr);
196
197   delete[] workers;
198   delete synchro;
199 }
200
201 /**
202  * \brief Applies a list of tasks in parallel.
203  * \param fun the function to call in parallel
204  * \param data each element of this vector will be passed as an argument to fun
205  */
206 template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
207 {
208   /* Assign resources to worker threads (we are maestro here)*/
209   this->fun   = fun;
210   this->data  = &data;
211   this->index = 0;
212   this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads)
213   this->work();                   // maestro works with its minions
214   this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
215   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
216 }
217
218 /**
219  * \brief Returns a next task to process.
220  *
221  * Worker threads call this function to get more work.
222  *
223  * \return the next task to process, or throws a std::out_of_range exception if there is no more work
224  */
225 template <typename T> boost::optional<T> Parmap<T>::next()
226 {
227   unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
228   if (index < this->data->size())
229     return (*this->data)[index];
230   else
231     return boost::none;
232 }
233
234 /**
235  * \brief Main work loop: applies fun to elements in turn.
236  */
237 template <typename T> void Parmap<T>::work()
238 {
239   unsigned length = this->data->size();
240   unsigned index  = this->index.fetch_add(1, std::memory_order_relaxed);
241   while (index < length) {
242     this->fun((*this->data)[index]);
243     index = this->index.fetch_add(1, std::memory_order_relaxed);
244   }
245 }
246
247 /**
248  * Get a synchronization object for given mode.
249  * \param mode the synchronization mode
250  */
251 template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_parmap_mode_t mode)
252 {
253   if (mode == XBT_PARMAP_DEFAULT) {
254 #if HAVE_FUTEX_H
255     mode = XBT_PARMAP_FUTEX;
256 #else
257     mode = XBT_PARMAP_POSIX;
258 #endif
259   }
260   Synchro* res;
261   switch (mode) {
262     case XBT_PARMAP_POSIX:
263       res = new PosixSynchro(*this);
264       break;
265     case XBT_PARMAP_FUTEX:
266 #if HAVE_FUTEX_H
267       res = new FutexSynchro(*this);
268 #else
269       xbt_die("Fute is not available on this OS.");
270 #endif
271       break;
272     case XBT_PARMAP_BUSY_WAIT:
273       res = new BusyWaitSynchro(*this);
274       break;
275     default:
276       THROW_IMPOSSIBLE;
277   }
278   return res;
279 }
280
281 /**
282  * \brief Main function of a worker thread.
283  */
284 template <typename T> void* Parmap<T>::worker_main(void* arg)
285 {
286   ThreadData* data      = static_cast<ThreadData*>(arg);
287   Parmap<T>& parmap     = data->parmap;
288   unsigned round        = 0;
289   smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
290   SIMIX_context_set_current(context);
291
292   XBT_CDEBUG(xbt_parmap, "New worker thread created");
293
294   /* Worker's main loop */
295   while (1) {
296     round++;
297     parmap.synchro->worker_wait(round);
298     if (parmap.status == PARMAP_DESTROY)
299       break;
300
301     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
302     parmap.work();
303     parmap.synchro->worker_signal();
304     XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id);
305   }
306   /* We are destroying the parmap */
307   delete context;
308   delete data;
309   return nullptr;
310 }
311
312 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
313 {
314   ready_cond  = xbt_os_cond_init();
315   ready_mutex = xbt_os_mutex_init();
316   done_cond   = xbt_os_cond_init();
317   done_mutex  = xbt_os_mutex_init();
318 }
319
320 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
321 {
322   xbt_os_cond_destroy(ready_cond);
323   xbt_os_mutex_destroy(ready_mutex);
324   xbt_os_cond_destroy(done_cond);
325   xbt_os_mutex_destroy(done_mutex);
326 }
327
328 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
329 {
330   xbt_os_mutex_acquire(ready_mutex);
331   this->parmap.thread_counter = 1;
332   this->parmap.work_round++;
333   /* wake all workers */
334   xbt_os_cond_broadcast(ready_cond);
335   xbt_os_mutex_release(ready_mutex);
336 }
337
338 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
339 {
340   xbt_os_mutex_acquire(done_mutex);
341   if (this->parmap.thread_counter < this->parmap.num_workers) {
342     /* wait for all workers to be ready */
343     xbt_os_cond_wait(done_cond, done_mutex);
344   }
345   xbt_os_mutex_release(done_mutex);
346 }
347
348 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
349 {
350   xbt_os_mutex_acquire(done_mutex);
351   this->parmap.thread_counter++;
352   if (this->parmap.thread_counter == this->parmap.num_workers) {
353     /* all workers have finished, wake the controller */
354     xbt_os_cond_signal(done_cond);
355   }
356   xbt_os_mutex_release(done_mutex);
357 }
358
359 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
360 {
361   xbt_os_mutex_acquire(ready_mutex);
362   /* wait for more work */
363   if (this->parmap.work_round != round) {
364     xbt_os_cond_wait(ready_cond, ready_mutex);
365   }
366   xbt_os_mutex_release(ready_mutex);
367 }
368
369 #if HAVE_FUTEX_H
370 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
371 {
372   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
373   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
374 }
375
376 template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
377 {
378   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
379   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
380 }
381
382 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
383 {
384   this->parmap.thread_counter = 1;
385   __sync_add_and_fetch(&this->parmap.work_round, 1);
386   /* wake all workers */
387   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
388 }
389
390 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
391 {
392   unsigned count = this->parmap.thread_counter;
393   while (count < this->parmap.num_workers) {
394     /* wait for all workers to be ready */
395     futex_wait(&this->parmap.thread_counter, count);
396     count = this->parmap.thread_counter;
397   }
398 }
399
400 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
401 {
402   unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1);
403   if (count == this->parmap.num_workers) {
404     /* all workers have finished, wake the controller */
405     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
406   }
407 }
408
409 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
410 {
411   unsigned work_round = this->parmap.work_round;
412   /* wait for more work */
413   while (work_round != round) {
414     futex_wait(&this->parmap.work_round, work_round);
415     work_round = this->parmap.work_round;
416   }
417 }
418 #endif
419
420 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
421 {
422   this->parmap.thread_counter = 1;
423   __sync_add_and_fetch(&this->parmap.work_round, 1);
424 }
425
426 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
427 {
428   while (this->parmap.thread_counter < this->parmap.num_workers) {
429     xbt_os_thread_yield();
430   }
431 }
432
433 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
434 {
435   __sync_add_and_fetch(&this->parmap.thread_counter, 1);
436 }
437
438 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
439 {
440   /* wait for more work */
441   while (this->parmap.work_round != round) {
442     xbt_os_thread_yield();
443   }
444 }
445
446 /** \} */
447 }
448 }
449
450 #endif