X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/08e7455d67920bbd7a87f440d00f2c1e071314a0..872cf95ab5a2b08aa1f2c6ebba29b9f86b0ba54e:/src/include/xbt/parmap.hpp diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp index 1d23673478..f782f59a7c 100644 --- a/src/include/xbt/parmap.hpp +++ b/src/include/xbt/parmap.hpp @@ -1,6 +1,6 @@ /* A thread pool (C++ version). */ -/* Copyright (c) 2004-2021 The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2004-2022 The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -9,8 +9,8 @@ #define XBT_PARMAP_HPP #include "src/internal_config.h" // HAVE_FUTEX_H +#include "src/kernel/EngineImpl.hpp" #include "src/kernel/context/Context.hpp" -#include "src/simix/smx_private.hpp" /* simix_global */ #include #include @@ -29,8 +29,7 @@ XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap); -namespace simgrid { -namespace xbt { +namespace simgrid::xbt { /** @addtogroup XBT_parmap * @ingroup XBT_misc @@ -87,7 +86,7 @@ private: * * This function is called by each worker thread (not including the controller) when it has no more work to do. * - * @param round the expected round number + * @param expected_round the expected round number */ virtual void worker_wait(unsigned) = 0; @@ -100,7 +99,7 @@ private: void master_signal() override; void master_wait() override; void worker_signal() override; - void worker_wait(unsigned round) override; + void worker_wait(unsigned expected_round) override; private: std::condition_variable ready_cond; @@ -144,9 +143,9 @@ private: Synchro* synchro; /**< synchronization object */ std::atomic_uint thread_counter{0}; /**< number of workers that have done the work */ - std::function fun; /**< function to run in parallel on each element of data */ - const std::vector* data = nullptr; /**< parameters to pass to fun in parallel */ - std::atomic_uint index{0}; /**< index of the next element of data to pick */ + std::function worker_fun; /**< function to run in parallel on each element of data */ + const std::vector* common_data = nullptr; /**< parameters to pass to fun in parallel */ + std::atomic_uint common_index{0}; /**< index of the next element of data to pick */ }; /** @@ -154,21 +153,18 @@ private: * @param num_workers number of worker threads to create * @param mode how to synchronize the worker threads */ -template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) +template +Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) + : workers(num_workers), num_workers(num_workers), synchro(new_synchro(mode)) { XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers); - /* Initialize the thread pool data structure */ - this->workers.resize(num_workers); - this->num_workers = num_workers; - this->synchro = new_synchro(mode); - /* Create the pool of worker threads (the caller of apply() will be worker[0]) */ - this->workers[0] = nullptr; + workers[0] = nullptr; for (unsigned i = 1; i < num_workers; i++) { - auto* data = new ThreadData(*this, i); - this->workers[i] = new std::thread(worker_main, data); + auto* data = new ThreadData(*this, i); + workers[i] = new std::thread(worker_main, data); /* Bind the worker to a core if possible */ #if HAVE_PTHREAD_SETAFFINITY @@ -179,7 +175,7 @@ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_ cpu_set_t cpuset; size_t size = sizeof(cpu_set_t); #endif - pthread_t pthread = this->workers[i]->native_handle(); + pthread_t pthread = workers[i]->native_handle(); int core_bind = (i - 1) % std::thread::hardware_concurrency(); CPU_ZERO(&cpuset); CPU_SET(core_bind, &cpuset); @@ -211,12 +207,12 @@ template Parmap::~Parmap() template void Parmap::apply(std::function&& fun, const std::vector& data) { /* Assign resources to worker threads (we are maestro here)*/ - this->fun = std::move(fun); - this->data = &data; - this->index = 0; - this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads) - this->work(); // maestro works with its minions - this->synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop + worker_fun = std::move(fun); + common_data = &data; + common_index = 0; + synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads) + work(); // maestro works with its minions + synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds } @@ -229,9 +225,9 @@ template void Parmap::apply(std::function&& fun, const */ template boost::optional Parmap::next() { - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); - if (index < this->data->size()) - return (*this->data)[index]; + unsigned index = common_index.fetch_add(1, std::memory_order_relaxed); + if (index < common_data->size()) + return (*common_data)[index]; else return boost::none; } @@ -241,11 +237,11 @@ template boost::optional Parmap::next() */ template void Parmap::work() { - unsigned length = this->data->size(); - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); + unsigned length = static_cast(common_data->size()); + unsigned index = common_index.fetch_add(1, std::memory_order_relaxed); while (index < length) { - this->fun((*this->data)[index]); - index = this->index.fetch_add(1, std::memory_order_relaxed); + worker_fun((*common_data)[index]); + index = common_index.fetch_add(1, std::memory_order_relaxed); } } @@ -286,9 +282,10 @@ template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_ /** @brief Main function of a worker thread */ template void Parmap::worker_main(ThreadData* data) { + auto engine = simgrid::kernel::EngineImpl::get_instance(); Parmap& parmap = data->parmap; unsigned round = 0; - kernel::context::Context* context = simix_global->context_factory->create_context(std::function(), nullptr); + kernel::context::Context* context = engine->get_context_factory()->create_context(std::function(), nullptr); kernel::context::Context::set_current(context); XBT_CDEBUG(xbt_parmap, "New worker thread created"); @@ -312,7 +309,7 @@ template void Parmap::worker_main(ThreadData* data) template void Parmap::PosixSynchro::master_signal() { - std::unique_lock lk(ready_mutex); + std::unique_lock lk(ready_mutex); this->parmap.thread_counter = 1; this->parmap.work_round++; /* wake all workers */ @@ -321,14 +318,14 @@ template void Parmap::PosixSynchro::master_signal() template void Parmap::PosixSynchro::master_wait() { - std::unique_lock lk(done_mutex); + std::unique_lock lk(done_mutex); /* wait for all workers to be ready */ done_cond.wait(lk, [this]() { return this->parmap.thread_counter >= this->parmap.num_workers; }); } template void Parmap::PosixSynchro::worker_signal() { - std::unique_lock lk(done_mutex); + std::unique_lock lk(done_mutex); this->parmap.thread_counter++; if (this->parmap.thread_counter == this->parmap.num_workers) { /* all workers have finished, wake the controller */ @@ -336,11 +333,11 @@ template void Parmap::PosixSynchro::worker_signal() } } -template void Parmap::PosixSynchro::worker_wait(unsigned round) +template void Parmap::PosixSynchro::worker_wait(unsigned expected_round) { - std::unique_lock lk(ready_mutex); + std::unique_lock lk(ready_mutex); /* wait for more work */ - ready_cond.wait(lk, [this, round]() { return this->parmap.work_round == round; }); + ready_cond.wait(lk, [this, expected_round]() { return this->parmap.work_round == expected_round; }); } #if HAVE_FUTEX_H @@ -383,13 +380,13 @@ template void Parmap::FutexSynchro::worker_signal() } } -template void Parmap::FutexSynchro::worker_wait(unsigned round) +template void Parmap::FutexSynchro::worker_wait(unsigned expected_round) { - unsigned work_round = this->parmap.work_round.load(); + unsigned round = this->parmap.work_round.load(); /* wait for more work */ - while (work_round != round) { - futex_wait(&this->parmap.work_round, work_round); - work_round = this->parmap.work_round.load(); + while (round != expected_round) { + futex_wait(&this->parmap.work_round, round); + round = this->parmap.work_round.load(); } } #endif @@ -421,7 +418,6 @@ template void Parmap::BusyWaitSynchro::worker_wait(unsigned roun } /** @} */ -} -} +} // namespace simgrid::xbt #endif