X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/d8eb62b207b566949a0d9ce649a7b21e226b9168..54fded314ada71fc0b12a5493025f9effe475215:/src/include/xbt/parmap.hpp

diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp
index 18c0392b31..a94e6894c6 100644
--- a/src/include/xbt/parmap.hpp
+++ b/src/include/xbt/parmap.hpp
@@ -1,6 +1,6 @@
 /* A thread pool (C++ version). */
 
-/* Copyright (c) 2004-2018 The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -13,12 +13,19 @@
 #include "xbt/xbt_os_thread.h"
 
 #include <boost/optional.hpp>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 
 #if HAVE_FUTEX_H
 #include <linux/futex.h>
 #include <sys/syscall.h>
 #endif
 
+#if HAVE_PTHREAD_NP_H
+#include <pthread_np.h>
+#endif
+
 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
 
 namespace simgrid {
@@ -98,10 +105,10 @@ private:
   void worker_wait(unsigned round);
 
 private:
-  xbt_os_cond_t ready_cond;
-  xbt_os_mutex_t ready_mutex;
-  xbt_os_cond_t done_cond;
-  xbt_os_mutex_t done_mutex;
+  std::condition_variable ready_cond;
+  std::mutex ready_mutex;
+  std::condition_variable done_cond;
+  std::mutex done_mutex;
 };
 
 #if HAVE_FUTEX_H
@@ -134,7 +141,7 @@ private:
 
   Flag status;              /**< is the parmap active or being destroyed? */
   unsigned work_round;      /**< index of the current round */
-  xbt_os_thread_t* workers; /**< worker thread handlers */
+  std::vector<std::thread*> workers; /**< worker thread handlers */
   unsigned num_workers;     /**< total number of worker threads including the controller */
   Synchro* synchro;         /**< synchronization object */
 
@@ -156,21 +163,31 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
   /* Initialize the thread pool data structure */
   this->status      = PARMAP_WORK;
   this->work_round  = 0;
-  this->workers     = new xbt_os_thread_t[num_workers];
+  this->workers.resize(num_workers);
   this->num_workers = num_workers;
   this->synchro     = new_synchro(mode);
 
-  /* Create the pool of worker threads */
+  /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
   this->workers[0] = nullptr;
-#if HAVE_PTHREAD_SETAFFINITY
-  int core_bind = 0;
-#endif
+  XBT_ATTRIB_UNUSED unsigned int core_bind = 0;
+
   for (unsigned i = 1; i < num_workers; i++) {
-    ThreadData* data = new ThreadData(*this, i);
-    this->workers[i] = xbt_os_thread_create(nullptr, worker_main, data, nullptr);
+    this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i));
+
+    /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
-    xbt_os_thread_bind(this->workers[i], core_bind);
-    if (core_bind != xbt_os_get_numcores() - 1)
+#if HAVE_PTHREAD_NP_H /* FreeBSD ? */
+    cpuset_t cpuset;
+    size_t size = sizeof(cpuset_t);
+#else /* Linux ? */
+    cpu_set_t cpuset;
+    size_t size = sizeof(cpu_set_t);
+#endif
+    pthread_t pthread = this->workers[i]->native_handle();
+    CPU_ZERO(&cpuset);
+    CPU_SET(core_bind, &cpuset);
+    pthread_setaffinity_np(pthread, size, &cpuset);
+    if (core_bind != std::thread::hardware_concurrency() - 1)
       core_bind++;
     else
       core_bind = 0;
@@ -187,9 +204,8 @@ template <typename T> Parmap<T>::~Parmap()
   synchro->master_signal();
 
   for (unsigned i = 1; i < num_workers; i++)
-    xbt_os_thread_join(workers[i], nullptr);
+    workers[i]->join();
 
-  delete[] workers;
   delete synchro;
 }
 
@@ -273,22 +289,20 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
   return res;
 }
 
-/**
- * @brief Main function of a worker thread.
- */
+/** @brief Main function of a worker thread */
 template <typename T> void* Parmap<T>::worker_main(void* arg)
 {
   ThreadData* data      = static_cast<ThreadData*>(arg);
   Parmap& parmap        = data->parmap;
   unsigned round        = 0;
   smx_context_t context = SIMIX_context_new(std::function<void()>(), nullptr, nullptr);
-  SIMIX_context_set_current(context);
+  kernel::context::Context::set_current(context);
 
   XBT_CDEBUG(xbt_parmap, "New worker thread created");
 
   /* Worker's main loop */
   while (1) {
-    round++;
+    round++; // New scheduling round
     parmap.synchro->worker_wait(round);
     if (parmap.status == PARMAP_DESTROY)
       break;
@@ -306,59 +320,47 @@ template <typename T> void* Parmap<T>::worker_main(void* arg)
 
 template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap)
 {
-  ready_cond  = xbt_os_cond_init();
-  ready_mutex = xbt_os_mutex_init();
-  done_cond   = xbt_os_cond_init();
-  done_mutex  = xbt_os_mutex_init();
 }
 
 template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
 {
-  xbt_os_cond_destroy(ready_cond);
-  xbt_os_mutex_destroy(ready_mutex);
-  xbt_os_cond_destroy(done_cond);
-  xbt_os_mutex_destroy(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 {
-  xbt_os_mutex_acquire(ready_mutex);
+  std::unique_lock<std::mutex> lk(ready_mutex);
   this->parmap.thread_counter = 1;
   this->parmap.work_round++;
   /* wake all workers */
-  xbt_os_cond_broadcast(ready_cond);
-  xbt_os_mutex_release(ready_mutex);
+  ready_cond.notify_all();
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
-  xbt_os_mutex_acquire(done_mutex);
+  std::unique_lock<std::mutex> lk(done_mutex);
   while (this->parmap.thread_counter < this->parmap.num_workers) {
     /* wait for all workers to be ready */
-    xbt_os_cond_wait(done_cond, done_mutex);
+    done_cond.wait(lk);
   }
-  xbt_os_mutex_release(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
 {
-  xbt_os_mutex_acquire(done_mutex);
+  std::unique_lock<std::mutex> lk(done_mutex);
   this->parmap.thread_counter++;
   if (this->parmap.thread_counter == this->parmap.num_workers) {
     /* all workers have finished, wake the controller */
-    xbt_os_cond_signal(done_cond);
+    done_cond.notify_one();
   }
-  xbt_os_mutex_release(done_mutex);
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
 {
-  xbt_os_mutex_acquire(ready_mutex);
+  std::unique_lock<std::mutex> lk(ready_mutex);
   /* wait for more work */
   while (this->parmap.work_round != round) {
-    xbt_os_cond_wait(ready_cond, ready_mutex);
+    ready_cond.wait(lk);
   }
-  xbt_os_mutex_release(ready_mutex);
 }
 
 #if HAVE_FUTEX_H
@@ -421,7 +423,7 @@ template <typename T> void Parmap<T>::BusyWaitSynchro::master_signal()
 template <typename T> void Parmap<T>::BusyWaitSynchro::master_wait()
 {
   while (__atomic_load_n(&this->parmap.thread_counter, __ATOMIC_SEQ_CST) < this->parmap.num_workers) {
-    xbt_os_thread_yield();
+    std::this_thread::yield();
  }
 }
 
@@ -434,7 +436,7 @@ template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned roun
 {
   /* wait for more work */
   while (__atomic_load_n(&this->parmap.work_round, __ATOMIC_SEQ_CST) != round) {
-    xbt_os_thread_yield();
+    std::this_thread::yield();
   }
 }
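The rewritten PosixSynchro keeps the pre-existing round-based handshake and only swaps the xbt_os primitives for std::mutex / std::condition_variable: the master opens a round under ready_mutex and wakes all workers, each worker reports completion under done_mutex, and the last one wakes the master. Below is a minimal, self-contained sketch of that protocol using the same standard primitives; RoundSync and the main() driver are hypothetical illustration code, not SimGrid API.

#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

/* Stand-alone illustration of the two mutex/condvar pairs used by
 * Parmap::PosixSynchro above. All names here are hypothetical. */
class RoundSync {
  std::mutex ready_mutex;
  std::condition_variable ready_cond;
  std::mutex done_mutex;
  std::condition_variable done_cond;
  unsigned work_round     = 0;
  unsigned thread_counter = 0;
  const unsigned num_workers; // total count, master included as worker 0

public:
  explicit RoundSync(unsigned count) : num_workers(count) {}

  void master_signal() // open a new round and wake every waiting worker
  {
    std::unique_lock<std::mutex> lk(ready_mutex);
    thread_counter = 1; // the master works too, so it is pre-counted
    work_round++;
    ready_cond.notify_all();
  }
  void master_wait() // block until every worker has reported completion
  {
    std::unique_lock<std::mutex> lk(done_mutex);
    while (thread_counter < num_workers)
      done_cond.wait(lk);
  }
  void worker_signal() // report completion; the last worker wakes the master
  {
    std::unique_lock<std::mutex> lk(done_mutex);
    thread_counter++;
    if (thread_counter == num_workers)
      done_cond.notify_one();
  }
  void worker_wait(unsigned round) // block until the master opens `round`
  {
    std::unique_lock<std::mutex> lk(ready_mutex);
    while (work_round != round) // re-check: wakeups may be spurious
      ready_cond.wait(lk);
  }
};

int main()
{
  constexpr unsigned total = 4; // 1 master + 3 workers
  RoundSync sync(total);
  std::vector<std::thread*> workers;
  for (unsigned i = 1; i < total; i++)
    workers.push_back(new std::thread([&sync, i]() {
      for (unsigned round = 1; round <= 3; round++) {
        sync.worker_wait(round);
        std::printf("worker %u: round %u\n", i, round);
        sync.worker_signal();
      }
    }));
  for (unsigned round = 1; round <= 3; round++) {
    sync.master_signal(); // open the round, then do worker 0's share
    std::printf("master: round %u\n", round);
    sync.master_wait();   // rejoin the workers before the next round
  }
  for (std::thread* t : workers) {
    t->join();
    delete t;
  }
  return 0;
}

Comparing the work-round counter (rather than testing a boolean flag) is what makes the while loops robust: a worker woken spuriously, or arriving late from a previous round, simply goes back to sleep until the counter matches the round it expects.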