X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/983b978b89158ac187fab66595bd2fc236e5b1fc..HEAD:/src/include/xbt/parmap.hpp diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp deleted file mode 100644 index de0177c01b..0000000000 --- a/src/include/xbt/parmap.hpp +++ /dev/null @@ -1,447 +0,0 @@ -/* A thread pool (C++ version). */ - -/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved. */ - -/* This program is free software; you can redistribute it and/or modify it - * under the terms of the license (GNU LGPL) which comes with this package. */ - -#ifndef XBT_PARMAP_HPP -#define XBT_PARMAP_HPP - -#include "src/internal_config.h" // HAVE_FUTEX_H -#include "src/kernel/context/Context.hpp" -#include "src/simix/smx_private.hpp" /* simix_global */ - -#include -#include -#include -#include -#include - -#if HAVE_FUTEX_H -#include -#include -#endif - -#if HAVE_PTHREAD_NP_H -#include -#endif - -XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap); - -namespace simgrid { -namespace xbt { - -/** @addtogroup XBT_parmap - * @ingroup XBT_misc - * @brief Parallel map class - * @{ - */ -template class Parmap { -public: - Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode); - Parmap(const Parmap&) = delete; - Parmap& operator=(const Parmap&) = delete; - ~Parmap(); - void apply(std::function&& fun, const std::vector& data); - boost::optional next(); - -private: - enum Flag { PARMAP_WORK, PARMAP_DESTROY }; - - /** - * @brief Thread data transmission structure - */ - class ThreadData { - public: - ThreadData(Parmap& parmap, int id) : parmap(parmap), worker_id(id) {} - Parmap& parmap; - int worker_id; - }; - - /** - * @brief Synchronization object (different specializations). - */ - class Synchro { - public: - explicit Synchro(Parmap& parmap) : parmap(parmap) {} - virtual ~Synchro() = default; - /** - * @brief Wakes all workers and waits for them to finish the tasks. - * - * This function is called by the controller thread. - */ - virtual void master_signal() = 0; - /** - * @brief Starts the parmap: waits for all workers to be ready and returns. - * - * This function is called by the controller thread. - */ - virtual void master_wait() = 0; - /** - * @brief Ends the parmap: wakes the controller thread when all workers terminate. - * - * This function is called by all worker threads when they end (not including the controller). - */ - virtual void worker_signal() = 0; - /** - * @brief Waits for some work to process. - * - * This function is called by each worker thread (not including the controller) when it has no more work to do. - * - * @param round the expected round number - */ - virtual void worker_wait(unsigned) = 0; - - Parmap& parmap; - }; - - class PosixSynchro : public Synchro { - public: - explicit PosixSynchro(Parmap& parmap); - ~PosixSynchro(); - void master_signal() override; - void master_wait() override; - void worker_signal() override; - void worker_wait(unsigned round) override; - - private: - std::condition_variable ready_cond; - std::mutex ready_mutex; - std::condition_variable done_cond; - std::mutex done_mutex; - }; - -#if HAVE_FUTEX_H - class FutexSynchro : public Synchro { - public: - explicit FutexSynchro(Parmap& parmap) : Synchro(parmap) {} - void master_signal() override; - void master_wait() override; - void worker_signal() override; - void worker_wait(unsigned) override; - - private: - static void futex_wait(std::atomic_uint* uaddr, unsigned val); - static void futex_wake(std::atomic_uint* uaddr, unsigned val); - }; -#endif - - class BusyWaitSynchro : public Synchro { - public: - explicit BusyWaitSynchro(Parmap& parmap) : Synchro(parmap) {} - void master_signal() override; - void master_wait() override; - void worker_signal() override; - void worker_wait(unsigned) override; - }; - - static void worker_main(ThreadData* data); - Synchro* new_synchro(e_xbt_parmap_mode_t mode); - void work(); - - Flag status; /**< is the parmap active or being destroyed? */ - std::atomic_uint work_round; /**< index of the current round */ - std::vector workers; /**< worker thread handlers */ - unsigned num_workers; /**< total number of worker threads including the controller */ - Synchro* synchro; /**< synchronization object */ - - std::atomic_uint thread_counter{0}; /**< number of workers that have done the work */ - std::function fun; /**< function to run in parallel on each element of data */ - const std::vector* data = nullptr; /**< parameters to pass to fun in parallel */ - std::atomic_uint index; /**< index of the next element of data to pick */ -}; - -/** - * @brief Creates a parallel map object - * @param num_workers number of worker threads to create - * @param mode how to synchronize the worker threads - */ -template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) -{ - XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers); - - /* Initialize the thread pool data structure */ - this->status = PARMAP_WORK; - this->work_round = 0; - this->workers.resize(num_workers); - this->num_workers = num_workers; - this->synchro = new_synchro(mode); - - /* Create the pool of worker threads (the caller of apply() will be worker[0]) */ - this->workers[0] = nullptr; - XBT_ATTRIB_UNUSED unsigned int core_bind = 0; - - for (unsigned i = 1; i < num_workers; i++) { - this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i)); - - /* Bind the worker to a core if possible */ -#if HAVE_PTHREAD_SETAFFINITY -#if HAVE_PTHREAD_NP_H /* FreeBSD ? */ - cpuset_t cpuset; - size_t size = sizeof(cpuset_t); -#else /* Linux ? */ - cpu_set_t cpuset; - size_t size = sizeof(cpu_set_t); -#endif - pthread_t pthread = this->workers[i]->native_handle(); - CPU_ZERO(&cpuset); - CPU_SET(core_bind, &cpuset); - pthread_setaffinity_np(pthread, size, &cpuset); - if (core_bind != std::thread::hardware_concurrency() - 1) - core_bind++; - else - core_bind = 0; -#endif - } -} - -/** - * @brief Destroys a parmap - */ -template Parmap::~Parmap() -{ - status = PARMAP_DESTROY; - synchro->master_signal(); - - for (unsigned i = 1; i < num_workers; i++) { - workers[i]->join(); - delete workers[i]; - } - delete synchro; -} - -/** - * @brief Applies a list of tasks in parallel. - * @param fun the function to call in parallel - * @param data each element of this vector will be passed as an argument to fun - */ -template void Parmap::apply(std::function&& fun, const std::vector& data) -{ - /* Assign resources to worker threads (we are maestro here)*/ - this->fun = std::move(fun); - this->data = &data; - this->index = 0; - this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads) - this->work(); // maestro works with its minions - this->synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop - XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds -} - -/** - * @brief Returns a next task to process. - * - * Worker threads call this function to get more work. - * - * @return the next task to process, or throws a std::out_of_range exception if there is no more work - */ -template boost::optional Parmap::next() -{ - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); - if (index < this->data->size()) - return (*this->data)[index]; - else - return boost::none; -} - -/** - * @brief Main work loop: applies fun to elements in turn. - */ -template void Parmap::work() -{ - unsigned length = this->data->size(); - unsigned index = this->index.fetch_add(1, std::memory_order_relaxed); - while (index < length) { - this->fun((*this->data)[index]); - index = this->index.fetch_add(1, std::memory_order_relaxed); - } -} - -/** - * Get a synchronization object for given mode. - * @param mode the synchronization mode - */ -template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_parmap_mode_t mode) -{ - if (mode == XBT_PARMAP_DEFAULT) { -#if HAVE_FUTEX_H - mode = XBT_PARMAP_FUTEX; -#else - mode = XBT_PARMAP_POSIX; -#endif - } - Synchro* res; - switch (mode) { - case XBT_PARMAP_POSIX: - res = new PosixSynchro(*this); - break; - case XBT_PARMAP_FUTEX: -#if HAVE_FUTEX_H - res = new FutexSynchro(*this); -#else - xbt_die("Futex is not available on this OS."); -#endif - break; - case XBT_PARMAP_BUSY_WAIT: - res = new BusyWaitSynchro(*this); - break; - default: - THROW_IMPOSSIBLE; - } - return res; -} - -/** @brief Main function of a worker thread */ -template void Parmap::worker_main(ThreadData* data) -{ - Parmap& parmap = data->parmap; - unsigned round = 0; - smx_context_t context = simix_global->context_factory->create_context(std::function(), nullptr); - kernel::context::Context::set_current(context); - - XBT_CDEBUG(xbt_parmap, "New worker thread created"); - - /* Worker's main loop */ - while (1) { - round++; // New scheduling round - parmap.synchro->worker_wait(round); - if (parmap.status == PARMAP_DESTROY) - break; - - XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id); - parmap.work(); - parmap.synchro->worker_signal(); - XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id); - } - /* We are destroying the parmap */ - delete context; - delete data; -} - -template Parmap::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap) -{ -} - -template Parmap::PosixSynchro::~PosixSynchro() -{ -} - -template void Parmap::PosixSynchro::master_signal() -{ - std::unique_lock lk(ready_mutex); - this->parmap.thread_counter = 1; - this->parmap.work_round++; - /* wake all workers */ - ready_cond.notify_all(); -} - -template void Parmap::PosixSynchro::master_wait() -{ - std::unique_lock lk(done_mutex); - while (this->parmap.thread_counter < this->parmap.num_workers) { - /* wait for all workers to be ready */ - done_cond.wait(lk); - } -} - -template void Parmap::PosixSynchro::worker_signal() -{ - std::unique_lock lk(done_mutex); - this->parmap.thread_counter++; - if (this->parmap.thread_counter == this->parmap.num_workers) { - /* all workers have finished, wake the controller */ - done_cond.notify_one(); - } -} - -template void Parmap::PosixSynchro::worker_wait(unsigned round) -{ - std::unique_lock lk(ready_mutex); - /* wait for more work */ - while (this->parmap.work_round != round) { - ready_cond.wait(lk); - } -} - -#if HAVE_FUTEX_H -template inline void Parmap::FutexSynchro::futex_wait(std::atomic_uint* uaddr, unsigned val) -{ - XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr); - syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0); -} - -template inline void Parmap::FutexSynchro::futex_wake(std::atomic_uint* uaddr, unsigned val) -{ - XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr); - syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0); -} - -template void Parmap::FutexSynchro::master_signal() -{ - this->parmap.thread_counter.store(1); - this->parmap.work_round.fetch_add(1); - /* wake all workers */ - futex_wake(&this->parmap.work_round, std::numeric_limits::max()); -} - -template void Parmap::FutexSynchro::master_wait() -{ - unsigned count = this->parmap.thread_counter.load(); - while (count < this->parmap.num_workers) { - /* wait for all workers to be ready */ - futex_wait(&this->parmap.thread_counter, count); - count = this->parmap.thread_counter.load(); - } -} - -template void Parmap::FutexSynchro::worker_signal() -{ - unsigned count = this->parmap.thread_counter.fetch_add(1) + 1; - if (count == this->parmap.num_workers) { - /* all workers have finished, wake the controller */ - futex_wake(&this->parmap.thread_counter, std::numeric_limits::max()); - } -} - -template void Parmap::FutexSynchro::worker_wait(unsigned round) -{ - unsigned work_round = this->parmap.work_round.load(); - /* wait for more work */ - while (work_round != round) { - futex_wait(&this->parmap.work_round, work_round); - work_round = this->parmap.work_round.load(); - } -} -#endif - -template void Parmap::BusyWaitSynchro::master_signal() -{ - this->parmap.thread_counter.store(1); - this->parmap.work_round.fetch_add(1); -} - -template void Parmap::BusyWaitSynchro::master_wait() -{ - while (this->parmap.thread_counter.load() < this->parmap.num_workers) { - std::this_thread::yield(); - } -} - -template void Parmap::BusyWaitSynchro::worker_signal() -{ - this->parmap.thread_counter.fetch_add(1); -} - -template void Parmap::BusyWaitSynchro::worker_wait(unsigned round) -{ - /* wait for more work */ - while (this->parmap.work_round.load() != round) { - std::this_thread::yield(); - } -} - -/** @} */ -} -} - -#endif