X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/8adc6066afcd5faa0a14551e96b1eff092eb20af..4a4884ead9a3d55f9247a3facc7b310f1a0fe942:/src/include/xbt/parmap.hpp diff --git a/src/include/xbt/parmap.hpp b/src/include/xbt/parmap.hpp index ceea1e062c..18bf58d496 100644 --- a/src/include/xbt/parmap.hpp +++ b/src/include/xbt/parmap.hpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -42,12 +43,10 @@ public: Parmap(const Parmap&) = delete; Parmap& operator=(const Parmap&) = delete; ~Parmap(); - void apply(void (*fun)(T), const std::vector& data); + void apply(std::function&& fun, const std::vector& data); boost::optional next(); private: - enum Flag { PARMAP_WORK, PARMAP_DESTROY }; - /** * @brief Thread data transmission structure */ @@ -135,18 +134,18 @@ private: void worker_wait(unsigned) override; }; - static void* worker_main(void* arg); + static void worker_main(ThreadData* data); Synchro* new_synchro(e_xbt_parmap_mode_t mode); void work(); - Flag status; /**< is the parmap active or being destroyed? */ + bool destroying; /**< is the parmap being destroyed? */ std::atomic_uint work_round; /**< index of the current round */ std::vector workers; /**< worker thread handlers */ unsigned num_workers; /**< total number of worker threads including the controller */ Synchro* synchro; /**< synchronization object */ std::atomic_uint thread_counter{0}; /**< number of workers that have done the work */ - void (*fun)(const T) = nullptr; /**< function to run in parallel on each element of data */ + std::function fun; /**< function to run in parallel on each element of data */ const std::vector* data = nullptr; /**< parameters to pass to fun in parallel */ std::atomic_uint index; /**< index of the next element of data to pick */ }; @@ -161,7 +160,7 @@ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_ XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers); /* Initialize the thread pool data structure */ - this->status = PARMAP_WORK; + this->destroying = false; this->work_round = 0; this->workers.resize(num_workers); this->num_workers = num_workers; @@ -172,7 +171,8 @@ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_ XBT_ATTRIB_UNUSED unsigned int core_bind = 0; for (unsigned i = 1; i < num_workers; i++) { - this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i)); + ThreadData* data = new ThreadData(*this, i); + this->workers[i] = new std::thread(worker_main, data); /* Bind the worker to a core if possible */ #if HAVE_PTHREAD_SETAFFINITY @@ -200,7 +200,7 @@ template Parmap::Parmap(unsigned num_workers, e_xbt_parmap_mode_ */ template Parmap::~Parmap() { - status = PARMAP_DESTROY; + destroying = true; synchro->master_signal(); for (unsigned i = 1; i < num_workers; i++) { @@ -215,10 +215,10 @@ template Parmap::~Parmap() * @param fun the function to call in parallel * @param data each element of this vector will be passed as an argument to fun */ -template void Parmap::apply(void (*fun)(T), const std::vector& data) +template void Parmap::apply(std::function&& fun, const std::vector& data) { /* Assign resources to worker threads (we are maestro here)*/ - this->fun = fun; + this->fun = std::move(fun); this->data = &data; this->index = 0; this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads) @@ -291,12 +291,11 @@ template typename Parmap::Synchro* Parmap::new_synchro(e_xbt_ } /** @brief Main function of a worker thread */ -template void* Parmap::worker_main(void* arg) +template void Parmap::worker_main(ThreadData* data) { - ThreadData* data = static_cast(arg); Parmap& parmap = data->parmap; unsigned round = 0; - smx_context_t context = simix_global->context_factory->create_context(std::function(), nullptr); + kernel::context::Context* context = simix_global->context_factory->create_context(std::function(), nullptr); kernel::context::Context::set_current(context); XBT_CDEBUG(xbt_parmap, "New worker thread created"); @@ -305,7 +304,7 @@ template void* Parmap::worker_main(void* arg) while (1) { round++; // New scheduling round parmap.synchro->worker_wait(round); - if (parmap.status == PARMAP_DESTROY) + if (parmap.destroying) break; XBT_CDEBUG(xbt_parmap, "Worker %d got a job", data->worker_id); @@ -316,7 +315,6 @@ template void* Parmap::worker_main(void* arg) /* We are destroying the parmap */ delete context; delete data; - return nullptr; } template Parmap::PosixSynchro::PosixSynchro(Parmap& parmap) : Synchro(parmap)