/* A thread pool (C++ version). */
-/* Copyright (c) 2004-2017 The SimGrid Team.
- * All rights reserved. */
+/* Copyright (c) 2004-2018 The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "src/internal_config.h" // HAVE_FUTEX_H
#include "src/kernel/context/Context.hpp"
#include <atomic>
+#include "xbt/xbt_os_thread.h"
+
#include <boost/optional.hpp>
-#include <simgrid/simix.h>
#include <vector>
-#include <xbt/log.h>
-#include <xbt/parmap.h>
-#include <xbt/xbt_os_thread.h>
#if HAVE_FUTEX_H
#include <limits>
#include <linux/futex.h>
#include <sys/syscall.h>
#endif
template <typename T> class Parmap {
public:
Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
+ Parmap(const Parmap&) = delete;
+ Parmap& operator=(const Parmap&) = delete;
~Parmap();
void apply(void (*fun)(T), const std::vector<T>& data);
boost::optional<T> next();
*/
class Synchro {
public:
- Synchro(Parmap<T>& parmap) : parmap(parmap) {}
- virtual ~Synchro() {}
+ explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
+ virtual ~Synchro() = default;
/**
* \brief Wakes all workers and waits for them to finish the tasks.
*
* This function is called by the controller thread.
*/
- virtual void master_signal() = 0;
+ virtual void master_signal() = 0;
/**
* \brief Starts the parmap: waits for all workers to be ready and returns.
*
* This function is called by the controller thread.
*/
- virtual void master_wait() = 0;
+ virtual void master_wait() = 0;
/**
* \brief Ends the parmap: wakes the controller thread when all workers terminate.
*
* This function is called by all worker threads when they end (not including the controller).
*/
- virtual void worker_signal() = 0;
+ virtual void worker_signal() = 0;
/**
* \brief Waits for some work to process.
*
*/
virtual void worker_wait(unsigned) = 0;
- protected:
Parmap<T>& parmap;
};
class PosixSynchro : public Synchro {
public:
- PosixSynchro(Parmap<T>& parmap);
+ explicit PosixSynchro(Parmap<T>& parmap);
~PosixSynchro();
void master_signal();
void master_wait();
#if HAVE_FUTEX_H
class FutexSynchro : public Synchro {
public:
- FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
+ explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
void master_signal();
void master_wait();
void worker_signal();
class BusyWaitSynchro : public Synchro {
public:
- BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
+ explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
void master_signal();
void master_wait();
void worker_signal();
Synchro* new_synchro(e_xbt_parmap_mode_t mode);
void work();
- Flag status; /**< is the parmap active or being destroyed? */
- unsigned work_round; /**< index of the current round */
- unsigned thread_counter; /**< number of workers that have done the work */
- unsigned num_workers; /**< total number of worker threads including the controller */
- xbt_os_thread_t* workers; /**< worker thread handlers */
- void (*fun)(const T); /**< function to run in parallel on each element of data */
- const std::vector<T>* data; /**< parameters to pass to fun in parallel */
- std::atomic<unsigned> index; /**< index of the next element of data to pick */
- Synchro* synchro; /**< synchronization object */
+ Flag status; /**< is the parmap active or being destroyed? */
+ unsigned work_round; /**< index of the current round */
+ xbt_os_thread_t* workers; /**< worker thread handlers */
+ unsigned num_workers; /**< total number of worker threads including the controller */
+ Synchro* synchro; /**< synchronization object */
+
+ unsigned thread_counter = 0; /**< number of workers that have done the work */
+ void (*fun)(const T) = nullptr; /**< function to run in parallel on each element of data */
+ const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
+ std::atomic<unsigned> index; /**< index of the next element of data to pick */
};
/**
this->fun = fun;
this->data = &data;
this->index = 0;
- this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads)
+ this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
this->work(); // maestro works with its minions
this->synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop
XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds
*/
template <typename T> boost::optional<T> Parmap<T>::next()
{
- unsigned index = this->index++;
+ unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
if (index < this->data->size())
return (*this->data)[index];
else
*/
template <typename T> void Parmap<T>::work()
{
- unsigned index = this->index++;
unsigned length = this->data->size();
+ unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
while (index < length) {
this->fun((*this->data)[index]);
- index = this->index++;
+ index = this->index.fetch_add(1, std::memory_order_relaxed);
}
}
#if HAVE_FUTEX_H
res = new FutexSynchro(*this);
#else
- xbt_die("Fute is not available on this OS.");
+ xbt_die("Futex is not available on this OS.");
#endif
break;
case XBT_PARMAP_BUSY_WAIT:
template <typename T> void Parmap<T>::PosixSynchro::master_wait()
{
xbt_os_mutex_acquire(done_mutex);
- if (this->parmap.thread_counter < this->parmap.num_workers) {
+ while (this->parmap.thread_counter < this->parmap.num_workers) {
/* wait for all workers to be ready */
xbt_os_cond_wait(done_cond, done_mutex);
}
{
xbt_os_mutex_acquire(ready_mutex);
/* wait for more work */
- if (this->parmap.work_round != round) {
+ while (this->parmap.work_round != round) {
xbt_os_cond_wait(ready_cond, ready_mutex);
}
xbt_os_mutex_release(ready_mutex);
template <typename T> void Parmap<T>::FutexSynchro::master_signal()
{
- this->parmap.thread_counter = 1;
- __sync_add_and_fetch(&this->parmap.work_round, 1);
+ __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
/* wake all workers */
futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
}
template <typename T> void Parmap<T>::FutexSynchro::master_wait()
{
- unsigned count = this->parmap.thread_counter;
+ unsigned count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
while (count < this->parmap.num_workers) {
/* wait for all workers to be ready */
futex_wait(&this->parmap.thread_counter, count);
- count = this->parmap.thread_counter;
+ count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
}
}
template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
{
- unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1);
+ unsigned count = __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
if (count == this->parmap.num_workers) {
/* all workers have finished, wake the controller */
futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
{
- unsigned work_round = this->parmap.work_round;
+ unsigned work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
/* wait for more work */
while (work_round != round) {
futex_wait(&this->parmap.work_round, work_round);
- work_round = this->parmap.work_round;
+ work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
}
}
#endif
template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
{
- this->parmap.thread_counter = 1;
- __sync_add_and_fetch(&this->parmap.work_round, 1);
+ __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
+ __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
}
template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
{
- while (this->parmap.thread_counter < this->parmap.num_workers) {
+ while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) {
xbt_os_thread_yield();
}
}
template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
{
- __sync_add_and_fetch(&this->parmap.thread_counter, 1);
+ __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
}
template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
{
/* wait for more work */
- while (this->parmap.work_round != round) {
+ while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) {
xbt_os_thread_yield();
}
}