From: Arnaud Giersch Date: Thu, 3 Aug 2017 11:56:50 +0000 (+0200) Subject: Port xbt_parmap to C++. X-Git-Tag: v3_17~270 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2813149944d9088300f26fbeac1645e15fd7dfcc Port xbt_parmap to C++. --- diff --git a/doc/Doxyfile.in b/doc/Doxyfile.in index 2ec75ddba2..500cbe435e 100644 --- a/doc/Doxyfile.in +++ b/doc/Doxyfile.in @@ -689,6 +689,7 @@ INPUT = doxygen/index.doc \ @CMAKE_HOME_DIRECTORY@/include/simgrid \ @CMAKE_HOME_DIRECTORY@/include/simgrid/s4u \ @CMAKE_HOME_DIRECTORY@/src/include/surf \ + @CMAKE_HOME_DIRECTORY@/src/include/xbt \ @CMAKE_HOME_DIRECTORY@/src/msg/ \ @CMAKE_HOME_DIRECTORY@/src/kernel/ \ @CMAKE_HOME_DIRECTORY@/src/kernel/activity/ \ diff --git a/include/xbt/parmap.h b/include/xbt/parmap.h index 905b12c16f..2b75fcd687 100644 --- a/include/xbt/parmap.h +++ b/include/xbt/parmap.h @@ -1,6 +1,6 @@ /* A thread pool. */ -/* Copyright (c) 2007, 2009-2014. The SimGrid Team. +/* Copyright (c) 2007, 2009-2014, 2016-2017. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it @@ -19,11 +19,11 @@ SG_BEGIN_DECL() * \ingroup XBT_misc * \brief Parallel map. * - * A function is applied to all elements of a dynar in parallel with n worker threads. - * The worker threads are persistent until the destruction of the parmap. + * A function is applied to all elements of a std::vector in parallel with n worker threads. The worker threads are + * persistent until the destruction of the parmap. * - * If there are more than n elements in the dynar, the worker threads are allowed to fetch themselves remaining work - * with xbt_parmap_next() and execute it. + * If there are more than n elements in the vector, the worker threads are allowed to fetch themselves remaining work + * with method next() and execute it. * * \{ */ diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp new file mode 100644 index 0000000000..be2c81a67a --- /dev/null +++ b/src/include/xbt/parmap.hpp @@ -0,0 +1,449 @@ +/* A thread pool (C++ version). */ + +/* Copyright (c) 2004-2017 The SimGrid Team. + * All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef XBT_PARMAP_HPP +#define XBT_PARMAP_HPP + +#include "src/internal_config.h" // HAVE_FUTEX_H +#include "src/kernel/context/Context.hpp" +#include +#include +#include +#include +#include +#include +#include + +#if HAVE_FUTEX_H +#include +#include +#include +#endif + +XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap); + +namespace simgrid { +namespace xbt { + +/** \addtogroup XBT_parmap + * \ingroup XBT_misc + * \brief Parallel map class + * \{ + */ +template class Parmap { +public: + Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode); + ~Parmap(); + void apply(void (*fun)(T), const std::vector& data); + boost::optional next(); + +private: + enum Flag { PARMAP_WORK, PARMAP_DESTROY }; + + /** + * \brief Thread data transmission structure + */ + class ThreadData { + public: + ThreadData(Parmap& parmap, int id) : parmap(parmap), worker_id(id) {} + Parmap& parmap; + int worker_id; + }; + + /** + * \brief Synchronization object (different specializations). + */ + class Synchro { + public: + Synchro(Parmap& parmap) : parmap(parmap) {} + virtual ~Synchro() {} + /** + * \brief Wakes all workers and waits for them to finish the tasks. + * + * This function is called by the controller thread. + */ + virtual void master_signal() = 0; + /** + * \brief Starts the parmap: waits for all workers to be ready and returns. + * + * This function is called by the controller thread. + */ + virtual void master_wait() = 0; + /** + * \brief Ends the parmap: wakes the controller thread when all workers terminate. + * + * This function is called by all worker threads when they end (not including the controller). + */ + virtual void worker_signal() = 0; + /** + * \brief Waits for some work to process. + * + * This function is called by each worker thread (not including the controller) when it has no more work to do. + * + * \param round the expected round number + */ + virtual void worker_wait(unsigned) = 0; + + protected: + Parmap& parmap; + }; + + class PosixSynchro : public Synchro { + public: + PosixSynchro(Parmap& parmap); + ~PosixSynchro(); + void master_signal(); + void master_wait(); + void worker_signal(); + void worker_wait(unsigned round); + + private: + xbt_os_cond_t ready_cond; + xbt_os_mutex_t ready_mutex; + xbt_os_cond_t done_cond; + xbt_os_mutex_t done_mutex; + }; + +#if HAVE_FUTEX_H + class FutexSynchro : public Synchro { + public: + FutexSynchro(Parmap& parmap) : Synchro(parmap) {} + void master_signal(); + void master_wait(); + void worker_signal(); + void worker_wait(unsigned); + + private: + static void futex_wait(unsigned* uaddr, unsigned val); + static void futex_wake(unsigned* uaddr, unsigned val); + }; +#endif + + class BusyWaitSynchro : public Synchro { + public: + BusyWaitSynchro(Parmap& parmap) : Synchro(parmap) {} + void master_signal(); + void master_wait(); + void worker_signal(); + void worker_wait(unsigned); + }; + + static void* worker_main(void* arg); + Synchro* new_synchro(e_xbt_parmap_mode_t mode); + void work(); + + Flag status; /**< is the parmap active or being destroyed? */ + unsigned work_round; /**< index of the current round */ + unsigned thread_counter; /**< number of workers that have done the work */ + unsigned num_workers; /**< total number of worker threads including the controller */ + xbt_os_thread_t* workers; /**< worker thread handlers */ + void (*fun)(const T); /**< function to run in parallel on each element of data */ + const std::vector* data; /**< parameters to pass to fun in parallel */ + std::atomic index; /**< index of the next element of data to pick */ + Synchro* synchro; /**< synchronization object */ +}; + +/** + * \brief Creates a parallel map object + * \param num_workers number of worker threads to create + * \param mode how to synchronize the worker threads + */ +template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) +{ + XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers); + + /* Initialize the thread pool data structure */ + this->status = PARMAP_WORK; + this->work_round = 0; + this->workers = new xbt_os_thread_t[num_workers]; + this->num_workers = num_workers; + this->synchro = new_synchro(mode); + + /* Create the pool of worker threads */ + this->workers[0] = nullptr; +#if HAVE_PTHREAD_SETAFFINITY + int core_bind = 0; +#endif + for (unsigned i = 1; i < num_workers; i++) { + ThreadData* data = new ThreadData(*this, i); + this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr); +#if HAVE_PTHREAD_SETAFFINITY + xbt_os_thread_bind(this->workers[i], core_bind); + if (core_bind != xbt_os_get_numcores() - 1) + core_bind++; + else + core_bind = 0; +#endif + } +} + +/** + * \brief Destroys a parmap + */ +template Parmap::~Parmap() +{ + status = PARMAP_DESTROY; + synchro->master_signal(); + + for (unsigned i = 1; i < num_workers; i++) + xbt_os_thread_join(workers[i], nullptr); + + delete[] workers; + delete synchro; +} + +/** + * \brief Applies a list of tasks in parallel. + * \param fun the function to call in parallel + * \param data each element of this vector will be passed as an argument to fun + */ +template void Parmap::apply(void (*fun)(T), const std::vector& data) +{ + /* Assign resources to worker threads (we are maestro here)*/ + this->fun = fun; + this->data = &data; + this->index = 0; + this->synchro->master_signal(); // maestro runs futex_wait to wake all the minions (the working threads) + this->work(); // maestro works with its minions + this->synchro->master_wait(); // When there is no more work to do, then maestro waits for the last minion to stop + XBT_CDEBUG(xbt_parmap, "Job done"); // ... and proceeds +} + +/** + * \brief Returns a next task to process. + * + * Worker threads call this function to get more work. + * + * \return the next task to process, or throws a std::out_of_range exception if there is no more work + */ +template boost::optional Parmap::next() +{ + unsigned index = this->index++; + if (index < this->data->size()) + return (*this->data)[index]; + else + return boost::none; +} + +/** + * \brief Main work loop: applies fun to elements in turn. + */ +template void Parmap::work() +{ + unsigned index = this->index++; + unsigned length = this->data->size(); + while (index < length) { + this->fun((*this->data)[index]); + index = this->index++; + } +} + +/** + * Get a synchronization object for given mode. + * \param mode the synchronization mode + */ +template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_parmap_mode_t mode) +{ + if (mode == XBT_PARMAP_DEFAULT) { +#if HAVE_FUTEX_H + mode = XBT_PARMAP_FUTEX; +#else + mode = XBT_PARMAP_POSIX; +#endif + } + Synchro* res; + switch (mode) { + case XBT_PARMAP_POSIX: + res = new PosixSynchro(*this); + break; + case XBT_PARMAP_FUTEX: +#if HAVE_FUTEX_H + res = new FutexSynchro(*this); +#else + xbt_die("Fute is not available on this OS."); +#endif + break; + case XBT_PARMAP_BUSY_WAIT: + res = new BusyWaitSynchro(*this); + break; + default: + THROW_IMPOSSIBLE; + } + return res; +} + +/** + * \brief Main function of a worker thread. + */ +template void* Parmap::worker_main(void* arg) +{ + ThreadData* data = static_cast(arg); + Parmap& parmap = data->parmap; + unsigned round = 0; + smx_context_t context = SIMIX_context_new(std::function(), nullptr, nullptr); + SIMIX_context_set_current(context); + + XBT_CDEBUG(xbt_parmap, "New worker thread created"); + + /* Worker's main loop */ + while (1) { + round++; + parmap.synchro->worker_wait(round); + if (parmap.status == PARMAP_DESTROY) + break; + + XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id); + parmap.work(); + parmap.synchro->worker_signal(); + XBT_CDEBUG(xbt_parmap, "Worker %d has finished", data->worker_id); + } + /* We are destroying the parmap */ + delete context; + delete data; + return nullptr; +} + +template Parmap::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap) +{ + ready_cond = xbt_os_cond_init(); + ready_mutex = xbt_os_mutex_init(); + done_cond = xbt_os_cond_init(); + done_mutex = xbt_os_mutex_init(); +} + +template Parmap::PosixSynchro::~PosixSynchro() +{ + xbt_os_cond_destroy(ready_cond); + xbt_os_mutex_destroy(ready_mutex); + xbt_os_cond_destroy(done_cond); + xbt_os_mutex_destroy(done_mutex); +} + +template void Parmap::PosixSynchro::master_signal() +{ + xbt_os_mutex_acquire(ready_mutex); + this->parmap.thread_counter = 1; + this->parmap.work_round++; + /* wake all workers */ + xbt_os_cond_broadcast(ready_cond); + xbt_os_mutex_release(ready_mutex); +} + +template void Parmap::PosixSynchro::master_wait() +{ + xbt_os_mutex_acquire(done_mutex); + if (this->parmap.thread_counter < this->parmap.num_workers) { + /* wait for all workers to be ready */ + xbt_os_cond_wait(done_cond, done_mutex); + } + xbt_os_mutex_release(done_mutex); +} + +template void Parmap::PosixSynchro::worker_signal() +{ + xbt_os_mutex_acquire(done_mutex); + this->parmap.thread_counter++; + if (this->parmap.thread_counter == this->parmap.num_workers) { + /* all workers have finished, wake the controller */ + xbt_os_cond_signal(done_cond); + } + xbt_os_mutex_release(done_mutex); +} + +template void Parmap::PosixSynchro::worker_wait(unsigned round) +{ + xbt_os_mutex_acquire(ready_mutex); + /* wait for more work */ + if (this->parmap.work_round != round) { + xbt_os_cond_wait(ready_cond, ready_mutex); + } + xbt_os_mutex_release(ready_mutex); +} + +#if HAVE_FUTEX_H +template inline void Parmap::FutexSynchro::futex_wait(unsigned* uaddr, unsigned val) +{ + XBT_CVERB(xbt_parmap, "Waiting on futex %p", uaddr); + syscall(SYS_futex, uaddr, FUTEX_WAIT_PRIVATE, val, nullptr, nullptr, 0); +} + +template inline void Parmap::FutexSynchro::futex_wake(unsigned* uaddr, unsigned val) +{ + XBT_CVERB(xbt_parmap, "Waking futex %p", uaddr); + syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, nullptr, nullptr, 0); +} + +template void Parmap::FutexSynchro::master_signal() +{ + this->parmap.thread_counter = 1; + __sync_add_and_fetch(&this->parmap.work_round, 1); + /* wake all workers */ + futex_wake(&this->parmap.work_round, std::numeric_limits::max()); +} + +template void Parmap::FutexSynchro::master_wait() +{ + unsigned count = this->parmap.thread_counter; + while (count < this->parmap.num_workers) { + /* wait for all workers to be ready */ + futex_wait(&this->parmap.thread_counter, count); + count = this->parmap.thread_counter; + } +} + +template void Parmap::FutexSynchro::worker_signal() +{ + unsigned count = __sync_add_and_fetch(&this->parmap.thread_counter, 1); + if (count == this->parmap.num_workers) { + /* all workers have finished, wake the controller */ + futex_wake(&this->parmap.thread_counter, std::numeric_limits::max()); + } +} + +template void Parmap::FutexSynchro::worker_wait(unsigned round) +{ + unsigned work_round = this->parmap.work_round; + /* wait for more work */ + while (work_round != round) { + futex_wait(&this->parmap.work_round, work_round); + work_round = this->parmap.work_round; + } +} +#endif + +template void Parmap::BusyWaitSynchro::master_signal() +{ + this->parmap.thread_counter = 1; + __sync_add_and_fetch(&this->parmap.work_round, 1); +} + +template void Parmap::BusyWaitSynchro::master_wait() +{ + while (this->parmap.thread_counter < this->parmap.num_workers) { + xbt_os_thread_yield(); + } +} + +template void Parmap::BusyWaitSynchro::worker_signal() +{ + __sync_add_and_fetch(&this->parmap.thread_counter, 1); +} + +template void Parmap::BusyWaitSynchro::worker_wait(unsigned round) +{ + /* wait for more work */ + while (this->parmap.work_round != round) { + xbt_os_thread_yield(); + } +} + +/** \} */ +} +} + +#endif diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index 958684d51b..d0e01374e4 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -5,14 +5,15 @@ set(EXTRA_DIST src/include/instr/instr_interface.h src/include/mc/datatypes.h src/include/mc/mc.h - src/mc/mc_mmu.h - src/mc/PageStore.hpp - src/mc/mc_record.h src/include/simgrid/sg_config.h src/include/smpi/smpi_utils.hpp src/include/surf/datatypes.h src/include/surf/maxmin.h src/include/surf/surf.h + src/include/xbt/parmap.hpp + src/mc/mc_mmu.h + src/mc/mc_record.h + src/mc/PageStore.hpp src/msg/msg_private.h src/simdag/dax.dtd src/simdag/dax_dtd.c