X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/4d648ebbbe5705878080b9cbf1ca61497323c592..e3cacf008b623e18404fd006ef7121dfc9b22a63:/src/include/xbt/parmap.hpp

diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp
index ec4fdc81ac..5b9d5831a8 100644
--- a/src/include/xbt/parmap.hpp
+++ b/src/include/xbt/parmap.hpp
@@ -1,6 +1,6 @@
 /* A thread pool (C++ version). */
 
-/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2004-2021 The SimGrid Team. All rights reserved. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -9,8 +9,8 @@
 #define XBT_PARMAP_HPP
 
 #include "src/internal_config.h" // HAVE_FUTEX_H
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/context/Context.hpp"
-#include "src/simix/smx_private.hpp" /* simix_global */
 
 #include
 #include
@@ -96,8 +96,7 @@ private:
 
   class PosixSynchro : public Synchro {
   public:
-    explicit PosixSynchro(Parmap& parmap);
-    ~PosixSynchro();
+    explicit PosixSynchro(Parmap& parmap) : Synchro(parmap) {}
     void master_signal() override;
     void master_wait() override;
     void worker_signal() override;
@@ -138,8 +137,8 @@ private:
   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
   void work();
 
-  bool destroying;             /**< is the parmap being destroyed? */
-  std::atomic_uint work_round; /**< index of the current round */
+  bool destroying = false;        /**< is the parmap being destroyed? */
+  std::atomic_uint work_round{0}; /**< index of the current round */
   std::vector<std::thread*> workers; /**< worker thread handlers */
   unsigned num_workers; /**< total number of worker threads including the controller */
   Synchro* synchro;     /**< synchronization object */
@@ -147,7 +146,7 @@
   std::atomic_uint thread_counter{0};   /**< number of workers that have done the work */
   std::function<void(T)> fun;           /**< function to run in parallel on each element of data */
   const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
-  std::atomic_uint index;    /**< index of the next element of data to pick */
+  std::atomic_uint index{0}; /**< index of the next element of data to pick */
 };
 
 /**
@@ -160,18 +159,16 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
 
   /* Initialize the thread pool data structure */
-  this->destroying = false;
-  this->work_round = 0;
   this->workers.resize(num_workers);
   this->num_workers = num_workers;
   this->synchro = new_synchro(mode);
 
   /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
   this->workers[0] = nullptr;
-  XBT_ATTRIB_UNUSED unsigned int core_bind = 0;
 
   for (unsigned i = 1; i < num_workers; i++) {
-    this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i));
+    auto* data = new ThreadData(*this, i);
+    this->workers[i] = new std::thread(worker_main, data);
 
     /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
@@ -183,13 +180,10 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
     size_t size = sizeof(cpu_set_t);
 #endif
     pthread_t pthread = this->workers[i]->native_handle();
+    int core_bind = (i - 1) % std::thread::hardware_concurrency();
     CPU_ZERO(&cpuset);
     CPU_SET(core_bind, &cpuset);
     pthread_setaffinity_np(pthread, size, &cpuset);
-    if (core_bind != std::thread::hardware_concurrency() - 1)
-      core_bind++;
-    else
-      core_bind = 0;
 #endif
   }
 }
@@ -292,15 +286,16 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
 /** @brief Main function of a worker thread */
 template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 {
+  auto engine    = simgrid::kernel::EngineImpl::get_instance();
   Parmap& parmap = data->parmap;
   unsigned round = 0;
-  kernel::context::Context* context = simix_global->context_factory->create_context(std::function<void()>(), nullptr);
+  kernel::context::Context* context = engine->get_context_factory()->create_context(std::function<void()>(), nullptr);
   kernel::context::Context::set_current(context);
 
   XBT_CDEBUG(xbt_parmap, "New worker thread created");
 
   /* Worker's main loop */
-  while (1) {
+  while (true) {
     round++; // New scheduling round
     parmap.synchro->worker_wait(round);
     if (parmap.destroying)
@@ -316,14 +311,6 @@ template <typename T> void Parmap<T>::worker_main(ThreadData* data)
   delete data;
 }
 
-template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap)
-{
-}
-
-template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
-{
-}
-
 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 {
   std::unique_lock<std::mutex> lk(ready_mutex);
@@ -336,10 +323,8 @@ template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
   std::unique_lock<std::mutex> lk(done_mutex);
-  while (this->parmap.thread_counter < this->parmap.num_workers) {
-    /* wait for all workers to be ready */
-    done_cond.wait(lk);
-  }
+  /* wait for all workers to be ready */
+  done_cond.wait(lk, [this]() { return this->parmap.thread_counter >= this->parmap.num_workers; });
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
@@ -356,9 +341,7 @@ template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
 {
   std::unique_lock<std::mutex> lk(ready_mutex);
   /* wait for more work */
-  while (this->parmap.work_round != round) {
-    ready_cond.wait(lk);
-  }
+  ready_cond.wait(lk, [this, round]() { return this->parmap.work_round == round; });
 }
 
 #if HAVE_FUTEX_H
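The constructor hunk above drops the rotating core_bind counter and instead derives each worker's core directly from its index, (i - 1) % std::thread::hardware_concurrency(). A standalone sketch of that pinning scheme (Linux/glibc only; not SimGrid code — the pool size of 4 and the empty worker body are placeholders):

// Standalone illustration (not SimGrid code, Linux-specific): pin each worker
// thread to a core computed from its index, as the patch does, instead of
// carrying a counter across loop iterations.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
#include <pthread.h>
#include <sched.h>
#include <algorithm>
#include <thread>
#include <vector>

int main()
{
  const unsigned num_workers = 4;                                              // placeholder pool size
  const unsigned ncores = std::max(1u, std::thread::hardware_concurrency());   // guard against a 0 return value
  std::vector<std::thread> workers;

  for (unsigned i = 1; i < num_workers; i++) {
    workers.emplace_back([]() { /* worker body goes here */ });

    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET((i - 1) % ncores, &cpuset); // worker i -> core (i - 1), wrapping around
    pthread_setaffinity_np(workers.back().native_handle(), sizeof(cpu_set_t), &cpuset);
  }

  for (std::thread& t : workers)
    t.join();
}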
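The master_wait() and worker_wait() hunks replace hand-written spurious-wakeup loops with the predicate overload of std::condition_variable::wait(), which re-checks the condition under the lock after every wakeup. A minimal standalone sketch of that pattern (not SimGrid code — the ready flag and single worker thread are illustrative stand-ins for thread_counter/work_round):

// Standalone illustration (not SimGrid code): cond.wait(lk, pred) subsumes the
// explicit "while (!condition) cond.wait(lk);" loop that the patch removes.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

int main()
{
  std::mutex mutex;
  std::condition_variable cond;
  bool ready = false; // illustrative condition

  std::thread worker([&]() {
    {
      std::lock_guard<std::mutex> lk(mutex);
      ready = true; // state change published under the lock
    }
    cond.notify_one();
  });

  std::unique_lock<std::mutex> lk(mutex);
  cond.wait(lk, [&ready]() { return ready; }); // equivalent to: while (!ready) cond.wait(lk);
  std::cout << "condition satisfied\n";
  lk.unlock();
  worker.join();
}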