Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'master' of https://framagit.org/mwapl/simgrid
[simgrid.git] / include / simgrid / plugins / ProducerConsumer.hpp
index 4297a0e..d4438e4 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2021. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2021-2023. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -6,6 +6,7 @@
 #ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
 #define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
 
+#include <simgrid/s4u/Comm.hpp>
 #include <simgrid/s4u/ConditionVariable.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
 #include <simgrid/s4u/Mutex.hpp>
 #include <queue>
 #include <string>
 
-XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category");
+XBT_LOG_EXTERNAL_CATEGORY(producer_consumer);
 
 /** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
 
-namespace simgrid {
-namespace plugin {
+namespace simgrid::plugin {
 
 template <typename T> class ProducerConsumer;
 template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
 
-static unsigned long pc_id = 0;
+class ProducerConsumerId {
+private:
+  static unsigned long pc_id;
+
+protected:
+  const std::string id = "ProducerConsumer" + std::to_string(pc_id);
+  ProducerConsumerId() { ++pc_id; }
+};
 
-template <typename T> class ProducerConsumer {
+template <typename T> class ProducerConsumer : public ProducerConsumerId {
 public:
   /** This ProducerConsumer plugin can use two different transfer modes:
    *   - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get().
@@ -44,8 +51,6 @@ public:
   enum class TransferMode { MAILBOX = 0, QUEUE };
 
 private:
-  std::string id;
-
   /* Implementation of a Monitor to handle the data exchanges */
   s4u::MutexPtr mutex_;
   s4u::ConditionVariablePtr can_put_;
@@ -70,13 +75,10 @@ private:
     }
   }
 
-  ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
+  explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
   {
     xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
 
-    id = std::string("ProducerConsumer") + std::to_string(pc_id);
-    pc_id++;
-
     mutex_   = s4u::Mutex::create();
     can_put_ = s4u::ConditionVariable::create();
     can_get_ = s4u::ConditionVariable::create();
@@ -101,12 +103,12 @@ public:
    */
   ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
   {
-    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    const std::scoped_lock lock(*mutex_);
     max_queue_size_ = max_queue_size;
     return this;
   }
 
-  unsigned int get_max_queue_size() { return max_queue_size_; }
+  unsigned int get_max_queue_size() const { return max_queue_size_; }
 
   /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
   ProducerConsumer* set_transfer_mode(TransferMode new_mode)
@@ -123,7 +125,7 @@ public:
     tmode_ = new_mode;
     return this;
   }
-  std::string get_transfer_mode() { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
+  std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
 
   /** Container-agnostic size() method */
   unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
@@ -138,14 +140,15 @@ public:
    */
   s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
   {
-    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    std::unique_lock lock(*mutex_);
     s4u::CommPtr comm = nullptr;
     XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
 
     while (size() >= max_queue_size_)
       can_put_->wait(lock);
     if (tmode_ == TransferMode::MAILBOX) {
-      comm = mbox_->put_async(data, simulated_size_in_bytes);
+      comm = mbox_->put_init(data, simulated_size_in_bytes)
+                 ->start();
     } else
       queue_.push(data);
     can_get_->notify_all();
@@ -174,13 +177,15 @@ public:
    */
   s4u::CommPtr get_async(T** data)
   {
-    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    std::unique_lock lock(*mutex_);
     s4u::CommPtr comm = nullptr;
     XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
     while (empty())
       can_get_->wait(lock);
     if (tmode_ == TransferMode::MAILBOX)
-      comm = mbox_->get_async<T>(data);
+      comm = mbox_->get_init()
+                 ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
+                 ->start();
     else {
       *data = queue_.front();
       queue_.pop();
@@ -199,8 +204,7 @@ public:
   T* get()
   {
     T* data;
-    s4u::CommPtr comm = get_async(&data);
-    if (comm) {
+    if (s4u::CommPtr comm = get_async(&data)) {
       XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
       comm->wait();
     }
@@ -209,7 +213,6 @@ public:
   }
 };
 
-} // namespace plugin
-} // namespace simgrid
+} // namespace simgrid::plugin
 
 #endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP