X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/f5792a3bf76ce15a573ae5e9c63097595ae5f2bd..420e66292bea56f21c29d230aa63c6bf6095db31:/src/include/xbt/parmap.hpp

diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp
index 24affcc830..51f481805b 100644
--- a/src/include/xbt/parmap.hpp
+++ b/src/include/xbt/parmap.hpp
@@ -1,7 +1,6 @@
 /* A thread pool (C++ version). */
 
-/* Copyright (c) 2004-2017 The SimGrid Team.
- * All rights reserved.                     */
+/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -11,43 +10,45 @@
 #include "src/internal_config.h" // HAVE_FUTEX_H
 #include "src/kernel/context/Context.hpp"
-#include <atomic>
+#include "src/simix/smx_private.hpp" /* simix_global */
+
 #include <boost/optional.hpp>
 
-#include <simgrid/simix.h>
-#include <vector>
-#include <xbt/log.h>
-#include <xbt/parmap.h>
-#include <xbt/xbt_os_thread.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
 #if HAVE_FUTEX_H
-#include <limits>
 #include <linux/futex.h>
 #include <sys/syscall.h>
 #endif
 
+#if HAVE_PTHREAD_NP_H
+#include <pthread_np.h>
+#endif
+
 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
 
 namespace simgrid {
 namespace xbt {
 
-/** \addtogroup XBT_parmap
- * \ingroup XBT_misc
- * \brief Parallel map class
- * \{
- */
+/** @addtogroup XBT_parmap
+ * @ingroup XBT_misc
+ * @brief Parallel map class
+ * @{
+ */
 template <typename T> class Parmap {
 public:
   Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode);
   Parmap(const Parmap&) = delete;
+  Parmap& operator=(const Parmap&) = delete;
   ~Parmap();
-  void apply(void (*fun)(T), const std::vector<T>& data);
+  void apply(std::function<void(T)>&& fun, const std::vector<T>& data);
   boost::optional<T> next();
 
 private:
-  enum Flag { PARMAP_WORK, PARMAP_DESTROY };
-
   /**
-   * \brief Thread data transmission structure
+   * @brief Thread data transmission structure
    */
   class ThreadData {
   public:
@@ -57,40 +58,39 @@ private:
   };
 
   /**
-   * \brief Synchronization object (different specializations).
+   * @brief Synchronization object (different specializations).
    */
   class Synchro {
  public:
     explicit Synchro(Parmap& parmap) : parmap(parmap) {}
-    virtual ~Synchro() {}
+    virtual ~Synchro() = default;
 
     /**
-     * \brief Wakes all workers and waits for them to finish the tasks.
+     * @brief Wakes all workers and waits for them to finish the tasks.
      *
      * This function is called by the controller thread.
      */
-    virtual void master_signal()       = 0;
+    virtual void master_signal() = 0;
     /**
-     * \brief Starts the parmap: waits for all workers to be ready and returns.
+     * @brief Starts the parmap: waits for all workers to be ready and returns.
      *
     * This function is called by the controller thread.
      */
-    virtual void master_wait()         = 0;
+    virtual void master_wait() = 0;
     /**
-     * \brief Ends the parmap: wakes the controller thread when all workers terminate.
+     * @brief Ends the parmap: wakes the controller thread when all workers terminate.
      *
     * This function is called by all worker threads when they end (not including the controller).
      */
-    virtual void worker_signal()       = 0;
+    virtual void worker_signal() = 0;
     /**
-     * \brief Waits for some work to process.
+     * @brief Waits for some work to process.
      *
     * This function is called by each worker thread (not including the controller) when it has no more work to do.
      *
-     * \param round the expected round number
+     * @param round the expected round number
      */
     virtual void worker_wait(unsigned) = 0;
 
-  protected:
     Parmap& parmap;
   };
 
@@ -98,134 +98,139 @@ private:
   public:
     explicit PosixSynchro(Parmap& parmap);
     ~PosixSynchro();
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned round);
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned round) override;
 
   private:
-    xbt_os_cond_t ready_cond;
-    xbt_os_mutex_t ready_mutex;
-    xbt_os_cond_t done_cond;
-    xbt_os_mutex_t done_mutex;
+    std::condition_variable ready_cond;
+    std::mutex ready_mutex;
+    std::condition_variable done_cond;
+    std::mutex done_mutex;
   };
 
 #if HAVE_FUTEX_H
   class FutexSynchro : public Synchro {
   public:
     explicit FutexSynchro(Parmap& parmap) : Synchro(parmap) {}
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned);
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned) override;
 
   private:
-    static void futex_wait(unsigned* uaddr, unsigned val);
-    static void futex_wake(unsigned* uaddr, unsigned val);
+    static void futex_wait(std::atomic_uint* uaddr, unsigned val);
+    static void futex_wake(std::atomic_uint* uaddr, unsigned val);
   };
 #endif
 
   class BusyWaitSynchro : public Synchro {
  public:
     explicit BusyWaitSynchro(Parmap& parmap) : Synchro(parmap) {}
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned);
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned) override;
   };
 
-  static void* worker_main(void* arg);
+  static void worker_main(ThreadData* data);
   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
   void work();
 
-  Flag status;              /**< is the parmap active or being destroyed? */
-  unsigned work_round;      /**< index of the current round */
-  xbt_os_thread_t* workers; /**< worker thread handlers */
+  bool destroying = false; /**< is the parmap being destroyed? */
+  std::atomic_uint work_round{0}; /**< index of the current round */
+  std::vector<std::thread*> workers; /**< worker thread handlers */
   unsigned num_workers; /**< total number of worker threads including the controller */
   Synchro* synchro; /**< synchronization object */
-  unsigned thread_counter = 0; /**< number of workers that have done the work */
-  void (*fun)(const T) = nullptr; /**< function to run in parallel on each element of data */
+  std::atomic_uint thread_counter{0}; /**< number of workers that have done the work */
+  std::function<void(T)> fun; /**< function to run in parallel on each element of data */
   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
-  std::atomic<unsigned> index; /**< index of the next element of data to pick */
+  std::atomic_uint index{0}; /**< index of the next element of data to pick */
 };
 
 /**
- * \brief Creates a parallel map object
- * \param num_workers number of worker threads to create
- * \param mode how to synchronize the worker threads
+ * @brief Creates a parallel map object
+ * @param num_workers number of worker threads to create
+ * @param mode how to synchronize the worker threads
  */
 template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
 {
   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
 
   /* Initialize the thread pool data structure */
-  this->status = PARMAP_WORK;
-  this->work_round = 0;
-  this->workers = new xbt_os_thread_t[num_workers];
+  this->workers.resize(num_workers);
   this->num_workers = num_workers;
   this->synchro = new_synchro(mode);
 
-  /* Create the pool of worker threads */
+  /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
   this->workers[0] = nullptr;
-#if HAVE_PTHREAD_SETAFFINITY
-  int core_bind = 0;
-#endif
+
   for (unsigned i = 1; i < num_workers; i++) {
     ThreadData* data = new ThreadData(*this, i);
-    this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
+    this->workers[i] = new std::thread(worker_main, data);
+
+    /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
-    xbt_os_thread_bind(this->workers[i], core_bind);
-    if (core_bind != xbt_os_get_numcores() - 1)
-      core_bind++;
-    else
-      core_bind = 0;
+#if HAVE_PTHREAD_NP_H /* FreeBSD ? */
+    cpuset_t cpuset;
+    size_t size = sizeof(cpuset_t);
+#else /* Linux ? */
+    cpu_set_t cpuset;
+    size_t size = sizeof(cpu_set_t);
+#endif
+    pthread_t pthread = this->workers[i]->native_handle();
+    int core_bind = (i - 1) % std::thread::hardware_concurrency();
+    CPU_ZERO(&cpuset);
+    CPU_SET(core_bind, &cpuset);
+    pthread_setaffinity_np(pthread, size, &cpuset);
 #endif
   }
 }
 
 /**
- * \brief Destroys a parmap
+ * @brief Destroys a parmap
  */
 template <typename T> Parmap<T>::~Parmap()
 {
-  status = PARMAP_DESTROY;
+  destroying = true;
   synchro->master_signal();
 
-  for (unsigned i = 1; i < num_workers; i++)
-    xbt_os_thread_join(workers[i], nullptr);
-
-  delete[] workers;
+  for (unsigned i = 1; i < num_workers; i++) {
+    workers[i]->join();
+    delete workers[i];
+  }
   delete synchro;
 }
 
 /**
- * \brief Applies a list of tasks in parallel.
- * \param fun the function to call in parallel
- * \param data each element of this vector will be passed as an argument to fun
+ * @brief Applies a list of tasks in parallel.
+ * @param fun the function to call in parallel
+ * @param data each element of this vector will be passed as an argument to fun
  */
-template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
+template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const std::vector<T>& data)
 {
   /* Assign resources to worker threads (we are maestro here)*/
-  this->fun   = fun;
+  this->fun   = std::move(fun);
   this->data  = &data;
   this->index = 0;
-  this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads)
+  this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
   this->work();                   // maestro works with its minions
   this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
   XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds
 }
 
 /**
- * \brief Returns a next task to process.
+ * @brief Returns a next task to process.
  *
  * Worker threads call this function to get more work.
 *
- * \return the next task to process, or throws a std::out_of_range exception if there is no more work
+ * @return the next task to process, or throws a std::out_of_range exception if there is no more work
  */
 template <typename T> boost::optional<T> Parmap<T>::next()
 {
-  unsigned index = this->index++;
+  unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
   if (index < this->data->size())
     return (*this->data)[index];
   else
@@ -233,21 +238,21 @@ template <typename T> boost::optional<T> Parmap<T>::next()
 }
 
 /**
- * \brief Main work loop: applies fun to elements in turn.
+ * @brief Main work loop: applies fun to elements in turn.
  */
 template <typename T> void Parmap<T>::work()
 {
-  unsigned index  = this->index++;
   unsigned length = this->data->size();
+  unsigned index  = this->index.fetch_add(1, std::memory_order_relaxed);
   while (index < length) {
     this->fun((*this->data)[index]);
-    index = this->index++;
+    index = this->index.fetch_add(1, std::memory_order_relaxed);
   }
 }
 
 /**
  * Get a synchronization object for given mode.
- * \param mode the synchronization mode
+ * @param mode the synchronization mode
  */
 template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_parmap_mode_t mode)
 {
@@ -267,7 +272,7 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
 #if HAVE_FUTEX_H
       res = new FutexSynchro(*this);
 #else
-      xbt_die("Fute is not available on this OS.");
+      xbt_die("Futex is not available on this OS.");
 #endif
       break;
     case XBT_PARMAP_BUSY_WAIT:
@@ -279,24 +284,21 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
   return res;
 }
 
-/**
- * \brief Main function of a worker thread.
- */
-template <typename T> void* Parmap<T>::worker_main(void* arg)
+/** @brief Main function of a worker thread */
+template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 {
-  ThreadData* data = static_cast<ThreadData*>(arg);
   Parmap& parmap = data->parmap;
   unsigned round = 0;
-  smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
-  SIMIX_context_set_current(context);
+  kernel::context::Context* context = simix_global->context_factory->create_context(std::function<void()>(), nullptr);
+  kernel::context::Context::set_current(context);
 
   XBT_CDEBUG(xbt_parmap, "New worker thread created");
 
   /* Worker's main loop */
   while (1) {
-    round++;
+    round++; // New scheduling round
     parmap.synchro->worker_wait(round);
-    if (parmap.status == PARMAP_DESTROY)
+    if (parmap.destroying)
       break;
 
     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
@@ -307,74 +309,61 @@ template <typename T> void* Parmap<T>::worker_main(void* arg)
   /* We are destroying the parmap */
   delete context;
   delete data;
-  return nullptr;
 }
 
 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap)
 {
-  ready_cond  = xbt_os_cond_init();
-  ready_mutex = xbt_os_mutex_init();
-  done_cond   = xbt_os_cond_init();
-  done_mutex  = xbt_os_mutex_init();
 }
 
 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
 {
-  xbt_os_cond_destroy(ready_cond);
-  xbt_os_mutex_destroy(ready_mutex);
-  xbt_os_cond_destroy(done_cond);
-  xbt_os_mutex_destroy(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 {
-  xbt_os_mutex_acquire(ready_mutex);
+  std::unique_lock<std::mutex> lk(ready_mutex);
   this->parmap.thread_counter = 1;
   this->parmap.work_round++;
   /* wake all workers */
-  xbt_os_cond_broadcast(ready_cond);
-  xbt_os_mutex_release(ready_mutex);
+  ready_cond.notify_all();
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
-  xbt_os_mutex_acquire(done_mutex);
-  if (this->parmap.thread_counter < this->parmap.num_workers) {
+  std::unique_lock<std::mutex> lk(done_mutex);
+  while (this->parmap.thread_counter < this->parmap.num_workers) {
     /* wait for all workers to be ready */
-    xbt_os_cond_wait(done_cond, done_mutex);
+    done_cond.wait(lk);
   }
-  xbt_os_mutex_release(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
 {
-  xbt_os_mutex_acquire(done_mutex);
+  std::unique_lock<std::mutex> lk(done_mutex);
   this->parmap.thread_counter++;
   if (this->parmap.thread_counter == this->parmap.num_workers) {
     /* all workers have finished, wake the controller */
-    xbt_os_cond_signal(done_cond);
+    done_cond.notify_one();
   }
-  xbt_os_mutex_release(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
 {
-  xbt_os_mutex_acquire(ready_mutex);
+  std::unique_lock<std::mutex> lk(ready_mutex);
   /* wait for more work */
-  if (this->parmap.work_round != round) {
-    xbt_os_cond_wait(ready_cond, ready_mutex);
+  while (this->parmap.work_round != round) {
+    ready_cond.wait(lk);
   }
-  xbt_os_mutex_release(ready_mutex);
 }
 
 #if HAVE_FUTEX_H
-template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
+template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(std::atomic_uint* uaddr, unsigned val)
 {
   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
 }
 
-template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
+template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(std::atomic_uint* uaddr, unsigned val)
 {
   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
@@ -382,25 +371,25 @@ template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned*
 
 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
 {
-  this->parmap.thread_counter = 1;
-  __sync_add_and_fetch(&this->parmap.work_round, 1);
+  this->parmap.thread_counter.store(1);
+  this->parmap.work_round.fetch_add(1);
   /* wake all workers */
   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
 }
 
 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
 {
-  unsigned count = this->parmap.thread_counter;
+  unsigned count = this->parmap.thread_counter.load();
   while (count < this->parmap.num_workers) {
     /* wait for all workers to be ready */
     futex_wait(&this->parmap.thread_counter, count);
-    count = this->parmap.thread_counter;
+    count = this->parmap.thread_counter.load();
   }
 }
 
 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
 {
-  unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1);
+  unsigned count = this->parmap.thread_counter.fetch_add(1) + 1;
   if (count == this->parmap.num_workers) {
     /* all workers have finished, wake the controller */
     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
@@ -409,42 +398,42 @@ template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
 
 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
 {
-  unsigned work_round = this->parmap.work_round;
+  unsigned work_round = this->parmap.work_round.load();
   /* wait for more work */
   while (work_round != round) {
     futex_wait(&this->parmap.work_round, work_round);
-    work_round = this->parmap.work_round;
+    work_round = this->parmap.work_round.load();
   }
 }
 #endif
 
 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
 {
-  this->parmap.thread_counter = 1;
-  __sync_add_and_fetch(&this->parmap.work_round, 1);
+  this->parmap.thread_counter.store(1);
+  this->parmap.work_round.fetch_add(1);
 }
 
 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
 {
-  while (this->parmap.thread_counter < this->parmap.num_workers) {
-    xbt_os_thread_yield();
+  while (this->parmap.thread_counter.load() < this->parmap.num_workers) {
+    std::this_thread::yield();
   }
 }
 
 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
 {
-  __sync_add_and_fetch(&this->parmap.thread_counter, 1);
+  this->parmap.thread_counter.fetch_add(1);
 }
 
 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
 {
   /* wait for more work */
-  while (this->parmap.work_round != round) {
-    xbt_os_thread_yield();
+  while (this->parmap.work_round.load() != round) {
+    std::this_thread::yield();
   }
 }
 
-/** \} */
+/** @} */
 }
 }
 