X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/ecd5f7562caf1d443bf22788fa5f4fac408776ec..HEAD:/src/include/xbt/parmap.hpp diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp deleted file mode 100644 index 3eeb6a2080..0000000000 --- a/src/include/xbt/parmap.hpp +++ /dev/null @@ -1,450 +0,0 @@ -/* A thread pool (C++ version). */ - -/* Copyright (c) 2004-2017 The SimGrid Team. - * All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef XBT_PARMAP_HPP -#define XBT_PARMAP_HPP - -#include "src/internal_config.h" // HAVE_FUTEX_H -#include "src/kernel/context/Context.hpp" -#include -#include -#include -#include -#include -#include -#include - -#if HAVE_FUTEX_H -#include -#include -#include -#endif - -XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap); - -namespace simgrid { -namespace xbt { - -/** \addtogroup XBT_parmap - * \ingroup XBT_misc - * \brief Parallel map class - * \{ - */ -template class Parmap { -public: - Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode); - Parmap(const Parmap&) = delete; - ~Parmap(); - void apply(void (*fun)(T), const std::vector& data); - boost::optional next(); - -private: - enum Flag { PARMAP_WORK, PARMAP_DESTROY }; - - /** - * \brief Thread data transmission structure - */ - class ThreadData { - public: - ThreadData(Parmap& parmap, int id) : parmap(parmap), worker_id(id) {} - Parmap& parmap; - int worker_id; - }; - - /** - * \brief Synchronization object (different specializations). - */ - class Synchro { - public: - explicit Synchro(Parmap& parmap) : parmap(parmap) {} - virtual ~Synchro() = default; - /** - * \brief Wakes all workers and waits for them to finish the tasks. - * - * This function is called by the controller thread. - */ - virtual void master_signal() = 0; - /** - * \brief Starts the parmap: waits for all workers to be ready and returns. - * - * This function is called by the controller thread. - */ - virtual void master_wait() = 0; - /** - * \brief Ends the parmap: wakes the controller thread when all workers terminate. - * - * This function is called by all worker threads when they end (not including the controller). - */ - virtual void worker_signal() = 0; - /** - * \brief Waits for some work to process. - * - * This function is called by each worker thread (not including the controller) when it has no more work to do. - * - * \param round the expected round number - */ - virtual void worker_wait(unsigned) = 0; - - Parmap& parmap; - }; - - class PosixSynchro : public Synchro { - public: - explicit PosixSynchro(Parmap& parmap); - ~PosixSynchro(); - void master_signal(); - void master_wait(); - void worker_signal(); - void worker_wait(unsigned round); - - private: - xbt_os_cond_t ready_cond; - xbt_os_mutex_t ready_mutex; - xbt_os_cond_t done_cond; - xbt_os_mutex_t done_mutex; - }; - -#if HAVE_FUTEX_H - class FutexSynchro : public Synchro { - public: - explicit FutexSynchro(Parmap& parmap) : Synchro(parmap) {} - void master_signal(); - void master_wait(); - void worker_signal(); - void worker_wait(unsigned); - - private: - static void futex_wait(unsigned* uaddr, unsigned val); - static void futex_wake(unsigned* uaddr, unsigned val); - }; -#endif - - class BusyWaitSynchro : public Synchro { - public: - explicit BusyWaitSynchro(Parmap& parmap) : Synchro(parmap) {} - void master_signal(); - void master_wait(); - void worker_signal(); - void worker_wait(unsigned); - }; - - static void* worker_main(void* arg); - Synchro* new_synchro(e_xbt_parmap_mode_t mode); - void work(); - - Flag status; /**< is the parmap active or being destroyed? */ - unsigned work_round; /**< index of the current round */ - xbt_os_thread_t* workers; /**< worker thread handlers */ - unsigned num_workers; /**< total number of worker threads including the controller */ - Synchro* synchro; /**< synchronization object */ - - unsigned thread_counter = 0; /**< number of workers that have done the work */ - void (*fun)(const T) = nullptr; /**< function to run in parallel on each element of data */ - const std::vector* data = nullptr; /**< parameters to pass to fun in parallel */ - std::atomic index; /**< index of the next element of data to pick */ -}; - -/** - * \brief Creates a parallel map object - * \param num_workers number of worker threads to create - * \param mode how to synchronize the worker threads - */ -template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) -{ - XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers); - - /* Initialize the thread pool data structure */ - this->status = PARMAP_WORK; - this->work_round = 0; - this->workers = new xbt_os_thread_t[num_workers]; - this->num_workers = num_workers; - this->synchro = new_synchro(mode); - - /* Create the pool of worker threads */ - this->workers[0] = nullptr; -#if HAVE_PTHREAD_SETAFFINITY - int core_bind = 0; -#endif - for (unsigned i = 1; i < num_workers; i++) { - ThreadData* data = new ThreadData(*this, i); - this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr); -#if HAVE_PTHREAD_SETAFFINITY - xbt_os_thread_bind(this->workers[i], core_bind); - if (core_bind != xbt_os_get_numcores() - 1) - core_bind++; - else - core_bind = 0; -#endif - } -} - -/** - * \brief Destroys a parmap - */ -template Parmap::~Parmap() -{ - status = PARMAP_DESTROY; - synchro->master_signal(); - - for (unsigned i = 1; i < num_workers; i++) - xbt_os_thread_join(workers[i], nullptr); - - delete[] workers; - delete synchro; -} - -/** - * \brief Applies a list of tasks in parallel. - * \param fun the function to call in parallel - * \param data each element of this vector will be passed as an argument to fun - */ -template void Parmap::apply(void (*fun)(T), const std::vector& data) -{ - /* Assign resources to worker threads (we are maestro here)*/ - this->fun = fun; - this->data = &data; - this->index = 0; - this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads) - this->work(); // maestro works with its minions - this->synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop - XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds -} - -/** - * \brief Returns a next task to process. - * - * Worker threads call this function to get more work. - * - * \return the next task to process, or throws a std::out_of_range exception if there is no more work - */ -template boost::optional Parmap::next() -{ - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); - if (index < this->data->size()) - return (*this->data)[index]; - else - return boost::none; -} - -/** - * \brief Main work loop: applies fun to elements in turn. - */ -template void Parmap::work() -{ - unsigned length = this->data->size(); - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); - while (index < length) { - this->fun((*this->data)[index]); - index = this->index.fetch_add(1, std::memory_order_relaxed); - } -} - -/** - * Get a synchronization object for given mode. - * \param mode the synchronization mode - */ -template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_parmap_mode_t mode) -{ - if (mode == XBT_PARMAP_DEFAULT) { -#if HAVE_FUTEX_H - mode = XBT_PARMAP_FUTEX; -#else - mode = XBT_PARMAP_POSIX; -#endif - } - Synchro* res; - switch (mode) { - case XBT_PARMAP_POSIX: - res = new PosixSynchro(*this); - break; - case XBT_PARMAP_FUTEX: -#if HAVE_FUTEX_H - res = new FutexSynchro(*this); -#else - xbt_die("Futex is not available on this OS."); -#endif - break; - case XBT_PARMAP_BUSY_WAIT: - res = new BusyWaitSynchro(*this); - break; - default: - THROW_IMPOSSIBLE; - } - return res; -} - -/** - * \brief Main function of a worker thread. - */ -template void* Parmap::worker_main(void* arg) -{ - ThreadData* data = static_cast(arg); - Parmap& parmap = data->parmap; - unsigned round = 0; - smx_context_t context = SIMIX_context_new(std::function(), nullptr, nullptr); - SIMIX_context_set_current(context); - - XBT_CDEBUG(xbt_parmap, "New worker thread created"); - - /* Worker's main loop */ - while (1) { - round++; - parmap.synchro->worker_wait(round); - if (parmap.status == PARMAP_DESTROY) - break; - - XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id); - parmap.work(); - parmap.synchro->worker_signal(); - XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id); - } - /* We are destroying the parmap */ - delete context; - delete data; - return nullptr; -} - -template Parmap::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap) -{ - ready_cond = xbt_os_cond_init(); - ready_mutex = xbt_os_mutex_init(); - done_cond = xbt_os_cond_init(); - done_mutex = xbt_os_mutex_init(); -} - -template Parmap::PosixSynchro::~PosixSynchro() -{ - xbt_os_cond_destroy(ready_cond); - xbt_os_mutex_destroy(ready_mutex); - xbt_os_cond_destroy(done_cond); - xbt_os_mutex_destroy(done_mutex); -} - -template void Parmap::PosixSynchro::master_signal() -{ - xbt_os_mutex_acquire(ready_mutex); - this->parmap.thread_counter = 1; - this->parmap.work_round++; - /* wake all workers */ - xbt_os_cond_broadcast(ready_cond); - xbt_os_mutex_release(ready_mutex); -} - -template void Parmap::PosixSynchro::master_wait() -{ - xbt_os_mutex_acquire(done_mutex); - while (this->parmap.thread_counter < this->parmap.num_workers) { - /* wait for all workers to be ready */ - xbt_os_cond_wait(done_cond, done_mutex); - } - xbt_os_mutex_release(done_mutex); -} - -template void Parmap::PosixSynchro::worker_signal() -{ - xbt_os_mutex_acquire(done_mutex); - this->parmap.thread_counter++; - if (this->parmap.thread_counter == this->parmap.num_workers) { - /* all workers have finished, wake the controller */ - xbt_os_cond_signal(done_cond); - } - xbt_os_mutex_release(done_mutex); -} - -template void Parmap::PosixSynchro::worker_wait(unsigned round) -{ - xbt_os_mutex_acquire(ready_mutex); - /* wait for more work */ - while (this->parmap.work_round != round) { - xbt_os_cond_wait(ready_cond, ready_mutex); - } - xbt_os_mutex_release(ready_mutex); -} - -#if HAVE_FUTEX_H -template inline void Parmap::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val) -{ - XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr); - syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0); -} - -template inline void Parmap::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val) -{ - XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr); - syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0); -} - -template void Parmap::FutexSynchro::master_signal() -{ - __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST); - __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST); - /* wake all workers */ - futex_wake(&this->parmap.work_round, std::numeric_limits::max()); -} - -template void Parmap::FutexSynchro::master_wait() -{ - unsigned count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST); - while (count < this->parmap.num_workers) { - /* wait for all workers to be ready */ - futex_wait(&this->parmap.thread_counter, count); - count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST); - } -} - -template void Parmap::FutexSynchro::worker_signal() -{ - unsigned count = __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST); - if (count == this->parmap.num_workers) { - /* all workers have finished, wake the controller */ - futex_wake(&this->parmap.thread_counter, std::numeric_limits::max()); - } -} - -template void Parmap::FutexSynchro::worker_wait(unsigned round) -{ - unsigned work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST); - /* wait for more work */ - while (work_round != round) { - futex_wait(&this->parmap.work_round, work_round); - work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST); - } -} -#endif - -template void Parmap::BusyWaitSynchro::master_signal() -{ - __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST); - __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST); -} - -template void Parmap::BusyWaitSynchro::master_wait() -{ - while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) { - xbt_os_thread_yield(); - } -} - -template void Parmap::BusyWaitSynchro::worker_signal() -{ - __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST); -} - -template void Parmap::BusyWaitSynchro::worker_wait(unsigned round) -{ - /* wait for more work */ - while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) { - xbt_os_thread_yield(); - } -} - -/** \} */ -} -} - -#endif