Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Remove the need of pthread_mutex in mmalloc, to allow its use with sthread
[simgrid.git] / src / include / xbt / parmap.hpp
index bc3b9f6..f782f59 100644 (file)
@@ -1,6 +1,6 @@
 /* A thread pool (C++ version).                                             */
 
-/* Copyright (c) 2004-2020 The SimGrid Team. All rights reserved.           */
+/* Copyright (c) 2004-2022 The SimGrid Team. All rights reserved.           */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -9,8 +9,8 @@
 #define XBT_PARMAP_HPP
 
 #include "src/internal_config.h" // HAVE_FUTEX_H
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/context/Context.hpp"
-#include "src/simix/smx_private.hpp" /* simix_global */
 
 #include <boost/optional.hpp>
 #include <condition_variable>
@@ -29,8 +29,7 @@
 
 XBT_LOG_EXTERNAL_CATEGORY(xbt_parmap);
 
-namespace simgrid {
-namespace xbt {
+namespace simgrid::xbt {
 
 /** @addtogroup XBT_parmap
  * @ingroup XBT_misc
@@ -87,7 +86,7 @@ private:
      *
      * This function is called by each worker thread (not including the controller) when it has no more work to do.
      *
-     * @param round  the expected round number
+     * @param expected_round  the expected round number
      */
     virtual void worker_wait(unsigned) = 0;
 
@@ -100,7 +99,7 @@ private:
     void master_signal() override;
     void master_wait() override;
     void worker_signal() override;
-    void worker_wait(unsigned round) override;
+    void worker_wait(unsigned expected_round) override;
 
   private:
     std::condition_variable ready_cond;
@@ -144,9 +143,9 @@ private:
   Synchro* synchro;         /**< synchronization object */
 
   std::atomic_uint thread_counter{0};   /**< number of workers that have done the work */
-  std::function<void(T)> fun;           /**< function to run in parallel on each element of data */
-  const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
-  std::atomic_uint index{0};            /**< index of the next element of data to pick */
+  std::function<void(T)> worker_fun;    /**< function to run in parallel on each element of data */
+  const std::vector<T>* common_data = nullptr; /**< parameters to pass to fun in parallel */
+  std::atomic_uint common_index{0};            /**< index of the next element of data to pick */
 };
 
 /**
@@ -154,21 +153,18 @@ private:
  * @param num_workers number of worker threads to create
  * @param mode how to synchronize the worker threads
  */
-template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
+template <typename T>
+Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
+    : workers(num_workers), num_workers(num_workers), synchro(new_synchro(mode))
 {
   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
 
-  /* Initialize the thread pool data structure */
-  this->workers.resize(num_workers);
-  this->num_workers = num_workers;
-  this->synchro     = new_synchro(mode);
-
   /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
-  this->workers[0] = nullptr;
+  workers[0] = nullptr;
 
   for (unsigned i = 1; i < num_workers; i++) {
-    auto* data       = new ThreadData(*this, i);
-    this->workers[i] = new std::thread(worker_main, data);
+    auto* data = new ThreadData(*this, i);
+    workers[i] = new std::thread(worker_main, data);
 
     /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
@@ -179,7 +175,7 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
     cpu_set_t cpuset;
     size_t size = sizeof(cpu_set_t);
 #endif
-    pthread_t pthread = this->workers[i]->native_handle();
+    pthread_t pthread = workers[i]->native_handle();
     int core_bind     = (i - 1) % std::thread::hardware_concurrency();
     CPU_ZERO(&cpuset);
     CPU_SET(core_bind, &cpuset);
@@ -211,12 +207,12 @@ template <typename T> Parmap<T>::~Parmap()
 template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const std::vector<T>& data)
 {
   /* Assign resources to worker threads (we are maestro here)*/
-  this->fun   = std::move(fun);
-  this->data  = &data;
-  this->index = 0;
-  this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
-  this->work();                   // maestro works with its minions
-  this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
+  worker_fun   = std::move(fun);
+  common_data  = &data;
+  common_index = 0;
+  synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
+  work();                   // maestro works with its minions
+  synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
 }
 
@@ -229,9 +225,9 @@ template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const
  */
 template <typename T> boost::optional<T> Parmap<T>::next()
 {
-  unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
-  if (index < this->data->size())
-    return (*this->data)[index];
+  unsigned index = common_index.fetch_add(1, std::memory_order_relaxed);
+  if (index < common_data->size())
+    return (*common_data)[index];
   else
     return boost::none;
 }
@@ -241,11 +237,11 @@ template <typename T> boost::optional<T> Parmap<T>::next()
  */
 template <typename T> void Parmap<T>::work()
 {
-  unsigned length = this->data->size();
-  unsigned index  = this->index.fetch_add(1, std::memory_order_relaxed);
+  unsigned length = static_cast<unsigned>(common_data->size());
+  unsigned index  = common_index.fetch_add(1, std::memory_order_relaxed);
   while (index < length) {
-    this->fun((*this->data)[index]);
-    index = this->index.fetch_add(1, std::memory_order_relaxed);
+    worker_fun((*common_data)[index]);
+    index = common_index.fetch_add(1, std::memory_order_relaxed);
   }
 }
 
@@ -286,9 +282,10 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
 /** @brief Main function of a worker thread */
 template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 {
+  auto engine                       = simgrid::kernel::EngineImpl::get_instance();
   Parmap<T>& parmap     = data->parmap;
   unsigned round        = 0;
-  kernel::context::Context* context = simix_global->context_factory->create_context(std::function<void()>(), nullptr);
+  kernel::context::Context* context = engine->get_context_factory()->create_context(std::function<void()>(), nullptr);
   kernel::context::Context::set_current(context);
 
   XBT_CDEBUG(xbt_parmap, "New worker thread created");
@@ -312,7 +309,7 @@ template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 
 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 {
-  std::unique_lock<std::mutex> lk(ready_mutex);
+  std::unique_lock lk(ready_mutex);
   this->parmap.thread_counter = 1;
   this->parmap.work_round++;
   /* wake all workers */
@@ -321,14 +318,14 @@ template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
-  std::unique_lock<std::mutex> lk(done_mutex);
+  std::unique_lock lk(done_mutex);
   /* wait for all workers to be ready */
   done_cond.wait(lk, [this]() { return this->parmap.thread_counter >= this->parmap.num_workers; });
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
 {
-  std::unique_lock<std::mutex> lk(done_mutex);
+  std::unique_lock lk(done_mutex);
   this->parmap.thread_counter++;
   if (this->parmap.thread_counter == this->parmap.num_workers) {
     /* all workers have finished, wake the controller */
@@ -336,11 +333,11 @@ template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
   }
 }
 
-template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
+template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned expected_round)
 {
-  std::unique_lock<std::mutex> lk(ready_mutex);
+  std::unique_lock lk(ready_mutex);
   /* wait for more work */
-  ready_cond.wait(lk, [this, round]() { return this->parmap.work_round == round; });
+  ready_cond.wait(lk, [this, expected_round]() { return this->parmap.work_round == expected_round; });
 }
 
 #if HAVE_FUTEX_H
@@ -383,13 +380,13 @@ template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
   }
 }
 
-template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
+template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned expected_round)
 {
-  unsigned work_round = this->parmap.work_round.load();
+  unsigned round = this->parmap.work_round.load();
   /* wait for more work */
-  while (work_round != round) {
-    futex_wait(&this->parmap.work_round, work_round);
-    work_round = this->parmap.work_round.load();
+  while (round != expected_round) {
+    futex_wait(&this->parmap.work_round, round);
+    round = this->parmap.work_round.load();
   }
 }
 #endif
@@ -421,7 +418,6 @@ template <typename T> void Parmap<T>::BusyWaitSynchro::worker_wait(unsigned roun
 }
 
 /** @} */
-}
-}
+} // namespace simgrid::xbt
 
 #endif