Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
A few more sonar smells.
[simgrid.git] / src / include / xbt / parmap.hpp
index 839d029..48ce17e 100644 (file)
@@ -1,6 +1,6 @@
 /* A thread pool (C++ version).                                             */
 
-/* Copyright (c) 2004-2019 The SimGrid Team. All rights reserved.           */
+/* Copyright (c) 2004-2022 The SimGrid Team. All rights reserved.           */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -9,8 +9,8 @@
 #define XBT_PARMAP_HPP
 
 #include "src/internal_config.h" // HAVE_FUTEX_H
+#include "src/kernel/EngineImpl.hpp"
 #include "src/kernel/context/Context.hpp"
-#include "src/simix/smx_private.hpp" /* simix_global */
 
 #include <boost/optional.hpp>
 #include <condition_variable>
@@ -87,7 +87,7 @@ private:
      *
      * This function is called by each worker thread (not including the controller) when it has no more work to do.
      *
-     * @param round  the expected round number
+     * @param expected_round  the expected round number
      */
     virtual void worker_wait(unsigned) = 0;
 
@@ -96,12 +96,11 @@ private:
 
   class PosixSynchro : public Synchro {
   public:
-    explicit PosixSynchro(Parmap<T>& parmap);
-    ~PosixSynchro();
+    explicit PosixSynchro(Parmap<T>& parmap) : Synchro(parmap) {}
     void master_signal() override;
     void master_wait() override;
     void worker_signal() override;
-    void worker_wait(unsigned round) override;
+    void worker_wait(unsigned expected_round) override;
 
   private:
     std::condition_variable ready_cond;
@@ -138,16 +137,16 @@ private:
   Synchro* new_synchro(e_xbt_parmap_mode_t mode);
   void work();
 
-  bool destroying;                   /**< is the parmap being destroyed? */
-  std::atomic_uint work_round;       /**< index of the current round */
+  bool destroying = false;           /**< is the parmap being destroyed? */
+  std::atomic_uint work_round{0};    /**< index of the current round */
   std::vector<std::thread*> workers; /**< worker thread handlers */
   unsigned num_workers;     /**< total number of worker threads including the controller */
   Synchro* synchro;         /**< synchronization object */
 
   std::atomic_uint thread_counter{0};   /**< number of workers that have done the work */
-  std::function<void(T)> fun;           /**< function to run in parallel on each element of data */
-  const std::vector<T>* data = nullptr; /**< parameters to pass to fun in parallel */
-  std::atomic_uint index;               /**< index of the next element of data to pick */
+  std::function<void(T)> worker_fun;    /**< function to run in parallel on each element of data */
+  const std::vector<T>* common_data = nullptr; /**< parameters to pass to fun in parallel */
+  std::atomic_uint common_index{0};            /**< index of the next element of data to pick */
 };
 
 /**
@@ -155,23 +154,18 @@ private:
  * @param num_workers number of worker threads to create
  * @param mode how to synchronize the worker threads
  */
-template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
+template <typename T>
+Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_t mode)
+    : workers(num_workers), num_workers(num_workers), synchro(new_synchro(mode))
 {
   XBT_CDEBUG(xbt_parmap, "Create new parmap (%u workers)", num_workers);
 
-  /* Initialize the thread pool data structure */
-  this->destroying  = false;
-  this->work_round  = 0;
-  this->workers.resize(num_workers);
-  this->num_workers = num_workers;
-  this->synchro     = new_synchro(mode);
-
   /* Create the pool of worker threads (the caller of apply() will be worker[0]) */
-  this->workers[0] = nullptr;
-  XBT_ATTRIB_UNUSED unsigned int core_bind = 0;
+  workers[0] = nullptr;
 
   for (unsigned i = 1; i < num_workers; i++) {
-    this->workers[i] = new std::thread(worker_main, new ThreadData(*this, i));
+    auto* data = new ThreadData(*this, i);
+    workers[i] = new std::thread(worker_main, data);
 
     /* Bind the worker to a core if possible */
 #if HAVE_PTHREAD_SETAFFINITY
@@ -182,14 +176,11 @@ template <typename T> Parmap<T>::Parmap(unsigned num_workers, e_xbt_parmap_mode_
     cpu_set_t cpuset;
     size_t size = sizeof(cpu_set_t);
 #endif
-    pthread_t pthread = this->workers[i]->native_handle();
+    pthread_t pthread = workers[i]->native_handle();
+    int core_bind     = (i - 1) % std::thread::hardware_concurrency();
     CPU_ZERO(&cpuset);
     CPU_SET(core_bind, &cpuset);
     pthread_setaffinity_np(pthread, size, &cpuset);
-    if (core_bind != std::thread::hardware_concurrency() - 1)
-      core_bind++;
-    else
-      core_bind = 0;
 #endif
   }
 }
@@ -217,12 +208,12 @@ template <typename T> Parmap<T>::~Parmap()
 template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const std::vector<T>& data)
 {
   /* Assign resources to worker threads (we are maestro here)*/
-  this->fun   = std::move(fun);
-  this->data  = &data;
-  this->index = 0;
-  this->synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
-  this->work();                   // maestro works with its minions
-  this->synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
+  worker_fun   = std::move(fun);
+  common_data  = &data;
+  common_index = 0;
+  synchro->master_signal(); // maestro runs futex_wake to wake all the minions (the working threads)
+  work();                   // maestro works with its minions
+  synchro->master_wait();   // When there is no more work to do, then maestro waits for the last minion to stop
   XBT_CDEBUG(xbt_parmap, "Job done"); //   ... and proceeds
 }
 
@@ -235,9 +226,9 @@ template <typename T> void Parmap<T>::apply(std::function<void(T)>&& fun, const
  */
 template <typename T> boost::optional<T> Parmap<T>::next()
 {
-  unsigned index = this->index.fetch_add(1, std::memory_order_relaxed);
-  if (index < this->data->size())
-    return (*this->data)[index];
+  unsigned index = common_index.fetch_add(1, std::memory_order_relaxed);
+  if (index < common_data->size())
+    return (*common_data)[index];
   else
     return boost::none;
 }
@@ -247,11 +238,11 @@ template <typename T> boost::optional<T> Parmap<T>::next()
  */
 template <typename T> void Parmap<T>::work()
 {
-  unsigned length = this->data->size();
-  unsigned index  = this->index.fetch_add(1, std::memory_order_relaxed);
+  unsigned length = static_cast<unsigned>(common_data->size());
+  unsigned index  = common_index.fetch_add(1, std::memory_order_relaxed);
   while (index < length) {
-    this->fun((*this->data)[index]);
-    index = this->index.fetch_add(1, std::memory_order_relaxed);
+    worker_fun((*common_data)[index]);
+    index = common_index.fetch_add(1, std::memory_order_relaxed);
   }
 }
 
@@ -292,15 +283,16 @@ template <typename T> typename Parmap<T>::Synchro* Parmap<T>::new_synchro(e_xbt_
 /** @brief Main function of a worker thread */
 template <typename T> void Parmap<T>::worker_main(ThreadData* data)
 {
+  auto engine                       = simgrid::kernel::EngineImpl::get_instance();
   Parmap<T>& parmap     = data->parmap;
   unsigned round        = 0;
-  smx_context_t context = simix_global->context_factory->create_context(std::function<void()>(), nullptr);
+  kernel::context::Context* context = engine->get_context_factory()->create_context(std::function<void()>(), nullptr);
   kernel::context::Context::set_current(context);
 
   XBT_CDEBUG(xbt_parmap, "New worker thread created");
 
   /* Worker's main loop */
-  while (1) {
+  while (true) {
     round++; // New scheduling round
     parmap.synchro->worker_wait(round);
     if (parmap.destroying)
@@ -316,14 +308,6 @@ template <typename T> void Parmap<T>::worker_main(ThreadData* data)
   delete data;
 }
 
-template <typename T> Parmap<T>::PosixSynchro::PosixSynchro(Parmap<T>& parmap) : Synchro(parmap)
-{
-}
-
-template <typename T> Parmap<T>::PosixSynchro::~PosixSynchro()
-{
-}
-
 template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 {
   std::unique_lock<std::mutex> lk(ready_mutex);
@@ -336,10 +320,8 @@ template <typename T> void Parmap<T>::PosixSynchro::master_signal()
 template <typename T> void Parmap<T>::PosixSynchro::master_wait()
 {
   std::unique_lock<std::mutex> lk(done_mutex);
-  while (this->parmap.thread_counter < this->parmap.num_workers) {
-    /* wait for all workers to be ready */
-    done_cond.wait(lk);
-  }
+  /* wait for all workers to be ready */
+  done_cond.wait(lk, [this]() { return this->parmap.thread_counter >= this->parmap.num_workers; });
 }
 
 template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
@@ -352,13 +334,11 @@ template <typename T> void Parmap<T>::PosixSynchro::worker_signal()
   }
 }
 
-template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned round)
+template <typename T> void Parmap<T>::PosixSynchro::worker_wait(unsigned expected_round)
 {
   std::unique_lock<std::mutex> lk(ready_mutex);
   /* wait for more work */
-  while (this->parmap.work_round != round) {
-    ready_cond.wait(lk);
-  }
+  ready_cond.wait(lk, [this, expected_round]() { return this->parmap.work_round == expected_round; });
 }
 
 #if HAVE_FUTEX_H
@@ -401,13 +381,13 @@ template <typename T> void Parmap<T>::FutexSynchro::worker_signal()
   }
 }
 
-template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned round)
+template <typename T> void Parmap<T>::FutexSynchro::worker_wait(unsigned expected_round)
 {
-  unsigned work_round = this->parmap.work_round.load();
+  unsigned round = this->parmap.work_round.load();
   /* wait for more work */
-  while (work_round != round) {
-    futex_wait(&this->parmap.work_round, work_round);
-    work_round = this->parmap.work_round.load();
+  while (round != expected_round) {
+    futex_wait(&this->parmap.work_round, round);
+    round = this->parmap.work_round.load();
   }
 }
 #endif