X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/a1fcc0c0597c993b03448f6244bbdfef3c6850e4..d18416a04b075e719e3860a11292b6ac98546949:/src/include/xbt/parmap.hpp diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp index 1bde553e5c..f37540bbcb 100644 --- a/src/include/xbt/parmap.hpp +++ b/src/include/xbt/parmap.hpp @@ -13,6 +13,9 @@ #include "xbt/xbt_os_thread.h" #include +#include +#include +#include #if HAVE_FUTEX_H #include @@ -24,11 +27,11 @@ XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap); namespace simgrid { namespace xbt { -/** \addtogroup XBT_parmap - * \ingroup XBT_misc - * \brief Parallel map class - * \{ - */ +/** @addtogroup XBT_parmap + * @ingroup XBT_misc + * @brief Parallel map class + * @{ + */ template class Parmap { public: Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode); @@ -42,7 +45,7 @@ private: enum Flag { PARMAP_WORK, PARMAP_DESTROY }; /** - * \brief Thread data transmission structure + * @brief Thread data transmission structure */ class ThreadData { public: @@ -52,36 +55,36 @@ private: }; /** - * \brief Synchronization object (different specializations). + * @brief Synchronization object (different specializations). */ class Synchro { public: explicit Synchro(Parmap& parmap) : parmap(parmap) {} virtual ~Synchro() = default; /** - * \brief Wakes all workers and waits for them to finish the tasks. + * @brief Wakes all workers and waits for them to finish the tasks. * * This function is called by the controller thread. */ virtual void master_signal() = 0; /** - * \brief Starts the parmap: waits for all workers to be ready and returns. + * @brief Starts the parmap: waits for all workers to be ready and returns. * * This function is called by the controller thread. */ virtual void master_wait() = 0; /** - * \brief Ends the parmap: wakes the controller thread when all workers terminate. + * @brief Ends the parmap: wakes the controller thread when all workers terminate. * * This function is called by all worker threads when they end (not including the controller). */ virtual void worker_signal() = 0; /** - * \brief Waits for some work to process. + * @brief Waits for some work to process. * * This function is called by each worker thread (not including the controller) when it has no more work to do. * - * \param round the expected round number + * @param round the expected round number */ virtual void worker_wait(unsigned) = 0; @@ -98,10 +101,10 @@ private: void worker_wait(unsigned round); private: - xbt_os_cond_t ready_cond; - xbt_os_mutex_t ready_mutex; - xbt_os_cond_t done_cond; - xbt_os_mutex_t done_mutex; + std::condition_variable ready_cond; + std::mutex ready_mutex; + std::condition_variable done_cond; + std::mutex done_mutex; }; #if HAVE_FUTEX_H @@ -145,9 +148,9 @@ private: }; /** - * \brief Creates a parallel map object - * \param num_workers number of worker threads to create - * \param mode how to synchronize the worker threads + * @brief Creates a parallel map object + * @param num_workers number of worker threads to create + * @param mode how to synchronize the worker threads */ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode) { @@ -162,24 +165,20 @@ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_ /* Create the pool of worker threads */ this->workers[0] = nullptr; -#if HAVE_PTHREAD_SETAFFINITY - int core_bind = 0; -#endif + unsigned int core_bind = 0; for (unsigned i = 1; i < num_workers; i++) { ThreadData* data = new ThreadData(*this, i); this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr); -#if HAVE_PTHREAD_SETAFFINITY xbt_os_thread_bind(this->workers[i], core_bind); - if (core_bind != xbt_os_get_numcores() - 1) + if (core_bind != std::thread::hardware_concurrency() - 1) core_bind++; else core_bind = 0; -#endif } } /** - * \brief Destroys a parmap + * @brief Destroys a parmap */ template Parmap::~Parmap() { @@ -194,9 +193,9 @@ template Parmap::~Parmap() } /** - * \brief Applies a list of tasks in parallel. - * \param fun the function to call in parallel - * \param data each element of this vector will be passed as an argument to fun + * @brief Applies a list of tasks in parallel. + * @param fun the function to call in parallel + * @param data each element of this vector will be passed as an argument to fun */ template void Parmap::apply(void (*fun)(T), const std::vector& data) { @@ -211,11 +210,11 @@ template void Parmap::apply(void (*fun)(T), const std::vector } /** - * \brief Returns a next task to process. + * @brief Returns a next task to process. * * Worker threads call this function to get more work. * - * \return the next task to process, or throws a std::out_of_range exception if there is no more work + * @return the next task to process, or throws a std::out_of_range exception if there is no more work */ template boost::optional Parmap::next() { @@ -227,7 +226,7 @@ template boost::optional Parmap::next() } /** - * \brief Main work loop: applies fun to elements in turn. + * @brief Main work loop: applies fun to elements in turn. */ template void Parmap::work() { @@ -241,7 +240,7 @@ template void Parmap::work() /** * Get a synchronization object for given mode. - * \param mode the synchronization mode + * @param mode the synchronization mode */ template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_parmap_mode_t mode) { @@ -274,7 +273,7 @@ template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_ } /** - * \brief Main function of a worker thread. + * @brief Main function of a worker thread. */ template void* Parmap::worker_main(void* arg) { @@ -306,59 +305,47 @@ template void* Parmap::worker_main(void* arg) template Parmap::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap) { - ready_cond = xbt_os_cond_init(); - ready_mutex = xbt_os_mutex_init(); - done_cond = xbt_os_cond_init(); - done_mutex = xbt_os_mutex_init(); } template Parmap::PosixSynchro::~PosixSynchro() { - xbt_os_cond_destroy(ready_cond); - xbt_os_mutex_destroy(ready_mutex); - xbt_os_cond_destroy(done_cond); - xbt_os_mutex_destroy(done_mutex); } template void Parmap::PosixSynchro::master_signal() { - xbt_os_mutex_acquire(ready_mutex); + std::unique_lock lk(ready_mutex); this->parmap.thread_counter = 1; this->parmap.work_round++; /* wake all workers */ - xbt_os_cond_broadcast(ready_cond); - xbt_os_mutex_release(ready_mutex); + ready_cond.notify_all(); } template void Parmap::PosixSynchro::master_wait() { - xbt_os_mutex_acquire(done_mutex); + std::unique_lock lk(done_mutex); while (this->parmap.thread_counter < this->parmap.num_workers) { /* wait for all workers to be ready */ - xbt_os_cond_wait(done_cond, done_mutex); + done_cond.wait(lk); } - xbt_os_mutex_release(done_mutex); } template void Parmap::PosixSynchro::worker_signal() { - xbt_os_mutex_acquire(done_mutex); + std::unique_lock lk(done_mutex); this->parmap.thread_counter++; if (this->parmap.thread_counter == this->parmap.num_workers) { /* all workers have finished, wake the controller */ - xbt_os_cond_signal(done_cond); + done_cond.notify_one(); } - xbt_os_mutex_release(done_mutex); } template void Parmap::PosixSynchro::worker_wait(unsigned round) { - xbt_os_mutex_acquire(ready_mutex); + std::unique_lock lk(ready_mutex); /* wait for more work */ while (this->parmap.work_round != round) { - xbt_os_cond_wait(ready_cond, ready_mutex); + ready_cond.wait(lk); } - xbt_os_mutex_release(ready_mutex); } #if HAVE_FUTEX_H @@ -421,7 +408,7 @@ template void Parmap::BusyWaitSynchro::master_signal() template void Parmap::BusyWaitSynchro::master_wait() { while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) { - xbt_os_thread_yield(); + std::this_thread::yield(); } } @@ -434,11 +421,11 @@ template void Parmap::BusyWaitSynchro::worker_wait(unsigned roun { /* wait for more work */ while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) { - xbt_os_thread_yield(); + std::this_thread::yield(); } } -/** \} */ +/** @} */ } }