X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/3f751bc729c137138174208b9b296879c9b43807..e3cacf008b623e18404fd006ef7121dfc9b22a63:/src/include/xbt/parmap.hpp

diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp
index b82c126914..5b9d5831a8 100644
--- a/src/include/xbt/parmap.hpp
+++ b/src/include/xbt/parmap.hpp
@@ -1,6 +1,6 @@
 /* A thread pool (C++ version). */

-/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2004-2021 The SimGrid Team. All rights reserved. */

 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -9,11 +9,12 @@
 #define XBT_PARMAP_HPP

 #include "src/internal_config.h" // HAVE_FUTEX_H
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/context/Context.hpp"
-#include "xbt/xbt_os_thread.h"

 #include <boost/optional.hpp>
 #include <condition_variable>
+#include <functional>
 #include <mutex>
 #include <thread>

@@ -42,12 +43,10 @@ public:
   Parmap(const Parmap&) = delete;
   Parmap& operator=(const Parmap&) = delete;
   ~Parmap();
-  void apply(void (*fun)(T), const std::vector<T>& data);
+  void apply(std::function<void(T)>&& fun, const std::vector<T>& data);
   boost::optional<T> next();

 private:
-  enum Flag { PARMAP_WORK, PARMAP_DESTROY };
-
   /**
    * @brief Thread data transmission structure
    */
@@ -97,12 +96,11 @@

   class PosixSynchro : public Synchro {
   public:
-    explicit PosixSynchro(Parmap<T>& parmap);
-    ~PosixSynchro();
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned round);
+    explicit PosixSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned round) override;

   private:
     std::condition_variable ready_cond;
@@ -115,40 +113,40 @@
   class FutexSynchro : public Synchro {
   public:
     explicit FutexSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned);
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned) override;

   private:
-    static void futex_wait(unsigned* uaddr, unsigned val);
-    static void futex_wake(unsigned* uaddr, unsigned val);
+    static void futex_wait(std::atomic_uint* uaddr, unsigned val);
+    static void futex_wake(std::atomic_uint* uaddr, unsigned val);
   };
 #endif

   class BusyWaitSynchro : public Synchro {
   public:
     explicit BusyWaitSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
-    void master_signal();
-    void master_wait();
-    void worker_signal();
-    void worker_wait(unsigned);
+    void master_signal() override;
+    void master_wait() override;
+    void worker_signal() override;
+    void worker_wait(unsigned) override;
   };

-  static void* worker_main(void* arg);
+  static void worker_main(ThreadData* data);
   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
   void work();

-  Flag status;                        /**< is the parmap active or being destroyed? */
-  unsigned work_round;                /**< index of the current round */
+  bool destroying = false;            /**< is the parmap being destroyed? */
+  std::atomic_uint work_round{0};     /**< index of the current round */
   std::vector<std::thread*> workers;  /**< worker thread handlers */
   unsigned num_workers;               /**< total number of worker threads including the controller */
   Synchro* synchro;                   /**< synchronization object */
-  unsigned thread_counter = 0;        /**< number of workers that have done the work */
-  void (*fun)(const T) = nullptr;     /**< function to run in parallel on each element of data */
+  std::atomic_uint thread_counter{0}; /**< number of workers that have done the work */
+  std::function<void(T)> fun;         /**< function to run in parallel on each element of data */
   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
-  std::atomic<unsigned> index;        /**< index of the next element of data to pick */
+  std::atomic_uint index{0};          /**< index of the next element of data to pick */
 };

 /**
@@ -161,18 +159,16 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);

   /* Initialize the thread pool data structure */
-  this->status     = PARMAP_WORK;
-  this->work_round = 0;
-  this->workers.reserve(num_workers);
+  this->workers.resize(num_workers);
   this->num_workers = num_workers;
   this->synchro     = new_synchro(mode);

   /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
   this->workers[0] = nullptr;
-  XBT_ATTRIB_UNUSED unsigned int core_bind = 0;

   for (unsigned i = 1; i < num_workers; i++) {
-    this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i));
+    auto* data       = new ThreadData(*this, i);
+    this->workers[i] = new std::thread(worker_main, data);

     /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
@@ -184,13 +180,10 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
     size_t size = sizeof(cpu_set_t);
 #endif
     pthread_t pthread = this->workers[i]->native_handle();
+    int core_bind     = (i - 1) % std::thread::hardware_concurrency();
     CPU_ZERO(&cpuset);
     CPU_SET(core_bind, &cpuset);
     pthread_setaffinity_np(pthread, size, &cpuset);
-    if (core_bind != std::thread::hardware_concurrency() - 1)
-      core_bind++;
-    else
-      core_bind = 0;
 #endif
   }
 }
@@ -200,13 +193,13 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
  */
 template <typename T> Parmap<T>::~Parmap()
 {
-  status = PARMAP_DESTROY;
+  destroying = true;
   synchro->master_signal();

-  for (unsigned i = 1; i < num_workers; i++)
+  for (unsigned i = 1; i < num_workers; i++) {
     workers[i]->join();
-
-  workers.clear();
+    delete workers[i];
+  }
   delete synchro;
 }
@@ -215,10 +208,10 @@ template <typename T> Parmap<T>::~Parmap()
  * @param fun the function to call in parallel
  * @param data each element of this vector will be passed as an argument to fun
  */
-template <typename T> void Parmap<T>::apply(void (*fun)(T), const std::vector<T>& data)
+template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const std::vector<T>& data)
 {
   /* Assign resources to worker threads (we are maestro here)*/
-  this->fun   = fun;
+  this->fun   = std::move(fun);
   this->data  = &data;
   this->index = 0;
   this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
@@ -291,21 +284,21 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
 }

 /** @brief Main function of a worker thread */
-template <typename T> void* Parmap<T>::worker_main(void* arg)
+template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 {
-  ThreadData* data  = static_cast<ThreadData*>(arg);
+  auto engine       = simgrid::kernel::EngineImpl::get_instance();
   Parmap<T>& parmap = data->parmap;
   unsigned round    = 0;
-  smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
+  kernel::context::Context* context = engine->get_context_factory()->create_context(std::function<void()>(), nullptr);
   kernel::context::Context::set_current(context);

   XBT_CDEBUG(xbt_parmap, "New worker thread created");

   /* Worker's main loop */
-  while (1) {
+  while (true) {
     round++; // New scheduling round
     parmap.synchro->worker_wait(round);
-    if (parmap.status == PARMAP_DESTROY)
+    if (parmap.destroying)
       break;

     XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id);
@@ -316,15 +309,6 @@ template <typename T> void* Parmap<T>::worker_main(void* arg)
   /* We are destroying the parmap */
   delete context;
   delete data;
-  return nullptr;
-}
-
-template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
-{
-}
-
-template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
-{
-}
 }

 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
@@ -339,10 +323,8 @@ template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
   std::unique_lock<std::mutex> lk(done_mutex);
-  while (this->parmap.thread_counter < this->parmap.num_workers) {
-    /* wait for all workers to be ready */
-    done_cond.wait(lk);
-  }
+  /* wait for all workers to be ready */
+  done_cond.wait(lk, [this]() { return this->parmap.thread_counter >= this->parmap.num_workers; });
 }

 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
@@ -359,19 +341,17 @@ template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
 {
   std::unique_lock<std::mutex> lk(ready_mutex);
   /* wait for more work */
-  while (this->parmap.work_round != round) {
-    ready_cond.wait(lk);
-  }
+  ready_cond.wait(lk, [this, round]() { return this->parmap.work_round == round; });
 }

 #if HAVE_FUTEX_H
-template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val)
+template <typename T> inline void Parmap<T>::FutexSynchro::futex_wait(std::atomic_uint* uaddr, unsigned val)
 {
   XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr);
   syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0);
 }

-template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val)
+template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(std::atomic_uint* uaddr, unsigned val)
 {
   XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr);
   syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0);
@@ -379,25 +359,25 @@ template <typename T> inline void Parmap<T>::FutexSynchro::futex_wake(unsigned*

 template <typename T> void Parmap<T>::FutexSynchro::master_signal()
 {
-  __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
-  __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
+  this->parmap.thread_counter.store(1);
+  this->parmap.work_round.fetch_add(1);
   /* wake all workers */
   futex_wake(&this->parmap.work_round, std::numeric_limits<int>::max());
 }

 template <typename T> void Parmap<T>::FutexSynchro::master_wait()
 {
-  unsigned count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
+  unsigned count = this->parmap.thread_counter.load();
   while (count < this->parmap.num_workers) {
     /* wait for all workers to be ready */
     futex_wait(&this->parmap.thread_counter, count);
-    count = __atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST);
+    count = this->parmap.thread_counter.load();
   }
 }

 template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
 {
-  unsigned count = __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
+  unsigned count = this->parmap.thread_counter.fetch_add(1) + 1;
   if (count == this->parmap.num_workers) {
     /* all workers have finished, wake the controller */
     futex_wake(&this->parmap.thread_counter, std::numeric_limits<int>::max());
@@ -406,37 +386,37 @@ template <typename T> void Parmap<T>::FutexSynchro::worker_signal()

 template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
 {
-  unsigned work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
+  unsigned work_round = this->parmap.work_round.load();
   /* wait for more work */
   while (work_round != round) {
     futex_wait(&this->parmap.work_round, work_round);
-    work_round = __atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST);
+    work_round = this->parmap.work_round.load();
   }
 }
 #endif

 template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
 {
-  __atomic_store_n(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
-  __atomic_add_fetch(&this->parmap.work_round, 1, __ATOMIC_SEQ_CST);
+  this->parmap.thread_counter.store(1);
+  this->parmap.work_round.fetch_add(1);
 }

 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
 {
-  while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) {
+  while (this->parmap.thread_counter.load() < this->parmap.num_workers) {
     std::this_thread::yield();
   }
 }

 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_signal()
 {
-  __atomic_add_fetch(&this->parmap.thread_counter, 1, __ATOMIC_SEQ_CST);
+  this->parmap.thread_counter.fetch_add(1);
 }

 template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned round)
 {
   /* wait for more work */
-  while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) {
+  while (this->parmap.work_round.load() != round) {
     std::this_thread::yield();
   }
 }
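The diff above switches apply() from a raw function pointer to std::function<void(T)>, and the shared counters from GCC __atomic_* builtins to std::atomic_uint. The work itself is still handed out by letting every worker fetch_add(1) a shared index to claim the next element of data, with the caller of apply() acting as worker 0. The sketch below is a minimal, stand-alone illustration of that dispatch pattern only; apply_parallel is a hypothetical name, it spawns plain std::thread workers per call instead of reusing the parmap's pooled threads and posix/futex/busy-wait synchros, and it is not SimGrid code.

// Minimal sketch (assumptions: hypothetical apply_parallel, fresh threads per call).
#include <atomic>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

template <typename T>
void apply_parallel(std::function<void(T)>&& fun, const std::vector<T>& data, unsigned num_workers)
{
  std::atomic_uint index{0}; // index of the next element of data to pick
  auto worker = [&]() {
    // Each worker claims the next element with fetch_add(1) and applies fun to it.
    for (unsigned i = index.fetch_add(1); i < data.size(); i = index.fetch_add(1))
      fun(data[i]);
  };

  std::vector<std::thread> workers;
  for (unsigned w = 1; w < num_workers; w++) // as in the parmap, the caller acts as worker 0
    workers.emplace_back(worker);
  worker();
  for (std::thread& t : workers)
    t.join();
}

int main()
{
  std::vector<int> data{1, 2, 3, 4, 5, 6, 7, 8};
  apply_parallel<int>([](int x) { std::printf("%d\n", x * x); }, data, 4);
}

Unlike this sketch, the real Parmap keeps its worker threads alive across scheduling rounds and wakes them through the synchro objects shown in the diff, which is why work_round and thread_counter become std::atomic_uint there.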