// NOTE(review): this listing appears to be a patch excerpt rather than plain source: lines starting with
// `-` carry the old text, lines starting with `+` their replacement, and unmarked lines are unchanged
// context. Names used below but not declared in this view (e.g. Flag) presumably live in elided lines.
/**
 * \brief Parallel map: applies one function to every element of a vector with the help of worker threads.
 *
 * \tparam T type of the elements handed to the applied function
 */
template <typename T> class Parmap {
public:
/** \brief Builds a parmap from a worker count and a synchronization mode (definition not visible here). */
Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
/** Copy construction is disabled. */
+ Parmap(const Parmap&) = delete;
~Parmap();
/** \brief Entry point taking the function \a fun and the elements \a data it is to be run on. */
void apply(void (*fun)(T), const std::vector<T>& data);
/** \brief Claims the next index of the shared counter and returns the matching element while one remains. */
boost::optional<T> next();
/**
 * \brief Abstract synchronization interface between the controller thread and the worker threads.
 *
 * Every operation is pure virtual; each instance keeps a reference to the Parmap it was built for.
 */
class Synchro {
public:
explicit Synchro(Parmap<T>& parmap) : parmap(parmap) {}
// Virtual destructor: the class is an abstract base and Parmap holds it through a Synchro*.
- virtual ~Synchro() {}
+ virtual ~Synchro() = default;
/**
* \brief Wakes all workers and waits for them to finish the tasks.
*
* This function is called by the controller thread.
*/
- virtual void master_signal() = 0;
+ virtual void master_signal() = 0;
/**
* \brief Starts the parmap: waits for all workers to be ready and returns.
*
* This function is called by the controller thread.
*/
- virtual void master_wait() = 0;
+ virtual void master_wait() = 0;
/**
* \brief Ends the parmap: wakes the controller thread when all workers terminate.
*
* This function is called by all worker threads when they end (not including the controller).
*/
- virtual void worker_signal() = 0;
+ virtual void worker_signal() = 0;
/**
* \brief Waits for some work to process.
*
*/
virtual void worker_wait(unsigned) = 0;
// NOTE(review): dropping `protected:` moves `parmap` into whatever access section precedes it
// (the `public:` above, as far as this excerpt shows) - confirm this is intended.
- protected:
Parmap<T>& parmap;
};
/** \brief Returns a Synchro for \a mode; presumably picks the concrete implementation (definition not visible). */
Synchro* new_synchro(e_xbt_parmap_mode_t mode);
/** \brief Runs fun on elements of data, one claimed index at a time, until none is left. */
void work();
// The replacement lines below reorder the fields and give thread_counter, fun and data default member
// initializers. NOTE(review): non-static members are initialized in declaration order, so swapping
// workers and num_workers matters if a constructor initializer relies on that order - check the
// constructor, which is not visible here.
- Flag status; /**< is the parmap active or being destroyed? */
- unsigned work_round; /**< index of the current round */
- unsigned thread_counter; /**< number of workers that have done the work */
- unsigned num_workers; /**< total number of worker threads including the controller */
- xbt_os_thread_t* workers; /**< worker thread handlers */
- void (*fun)(const T); /**< function to run in parallel on each element of data */
- const std::vector<T>* data; /**< parameters to pass to fun in parallel */
- std::atomic<unsigned> index; /**< index of the next element of data to pick */
- Synchro* synchro; /**< synchronization object */
+ Flag status; /**< is the parmap active or being destroyed? */
+ unsigned work_round; /**< index of the current round */
+ xbt_os_thread_t* workers; /**< worker thread handlers */
+ unsigned num_workers; /**< total number of worker threads including the controller */
+ Synchro* synchro; /**< synchronization object */
+
+ unsigned thread_counter = 0; /**< number of workers that have done the work */
+ void (*fun)(const T) = nullptr; /**< function to run in parallel on each element of data */
+ const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
+ std::atomic<unsigned> index; /**< index of the next element of data to pick */
// NOTE(review): unlike the three fields above, index has no initializer; before C++20 a
// default-constructed std::atomic holds an indeterminate value, so it has to be stored to before the
// first fetch_add - confirm that the code setting up a run does so.
};
/**
*/
template <typename T> boost::optional<T> Parmap<T>::next()
{
- unsigned index = this->index++;
+ unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
if (index < this->data->size())
return (*this->data)[index];
else
*/
// Worker loop: keeps claiming indices from the shared atomic counter and calls fun on the matching
// element of data until the claimed index reaches the length of the vector.
// (Lines marked `-` are the old text of this patch excerpt, lines marked `+` their replacement.)
template <typename T> void Parmap<T>::work()
{
- unsigned index = this->index++;
// The length is read once, before the first claim.
// NOTE(review): size() is narrowed from std::vector's size_type to unsigned here.
unsigned length = this->data->size();
// fetch_add returns the value held before the increment, so concurrent callers get distinct indices.
// The relaxed order only makes the counter update itself atomic; NOTE(review): visibility of fun and
// data to the workers has to come from other synchronization (presumably the Synchro object) - confirm.
+ unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
while (index < length) {
this->fun((*this->data)[index]);
// Claim the next index; a value >= length ends the loop.
- index = this->index++;
+ index = this->index.fetch_add(1, std::memory_order_relaxed);
}
}