Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make pc_id a static member of ProducerConsumer.
[simgrid.git] / include / simgrid / plugins / ProducerConsumer.hpp
1 /* Copyright (c) 2021-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
7 #define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
8
9 #include <simgrid/s4u/Comm.hpp>
10 #include <simgrid/s4u/ConditionVariable.hpp>
11 #include <simgrid/s4u/Mailbox.hpp>
12 #include <simgrid/s4u/Mutex.hpp>
13 #include <xbt/asserts.h>
14 #include <xbt/log.h>
15
16 #include <atomic>
17 #include <climits>
18 #include <queue>
19 #include <string>
20
21 XBT_LOG_EXTERNAL_CATEGORY(producer_consumer);
22
23 /** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
24
25 namespace simgrid {
26 namespace plugin {
27
28 template <typename T> class ProducerConsumer;
29 template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
30
31 class ProducerConsumerId {
32 private:
33   static unsigned long pc_id;
34
35 protected:
36   const std::string id = "ProducerConsumer" + std::to_string(pc_id);
37   ProducerConsumerId() { ++pc_id; }
38 };
39
40 template <typename T> class ProducerConsumer : public ProducerConsumerId {
41 public:
42   /** This ProducerConsumer plugin can use two different transfer modes:
43    *   - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get().
44    *     If these actors are on the same host, this communication goes through the host's loopback and can thus be
45    *     seen as a memory copy. Otherwise, data goes over the network.
46    *   - TransferMode::QUEUE: data is internally stored in a std::queue. Putting and getting data to and from this
47    *     data structure has a zero-cost in terms of simulated time.
48    *  Both modes guarantee that the data is consumed in the order it has been produced. However, when data goes
49    *  through the network, s4u::Comm are started in the right order, but may complete in a different order depending
50    *  the characteristics of the different interconnections between host pairs.
51    */
52   enum class TransferMode { MAILBOX = 0, QUEUE };
53
54 private:
55   /* Implementation of a Monitor to handle the data exchanges */
56   s4u::MutexPtr mutex_;
57   s4u::ConditionVariablePtr can_put_;
58   s4u::ConditionVariablePtr can_get_;
59
60   /* data containers for each of the transfer modes */
61   s4u::Mailbox* mbox_ = nullptr;
62   std::queue<T*> queue_;
63
64   unsigned int max_queue_size_ = 1;
65   TransferMode tmode_          = TransferMode::MAILBOX;
66
67   /* Refcounting management */
68   std::atomic_int_fast32_t refcount_{0};
69   friend void intrusive_ptr_add_ref(ProducerConsumer* pc) { pc->refcount_.fetch_add(1, std::memory_order_acq_rel); }
70
71   friend void intrusive_ptr_release(ProducerConsumer* pc)
72   {
73     if (pc->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
74       std::atomic_thread_fence(std::memory_order_acquire);
75       delete pc;
76     }
77   }
78
79   explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
80   {
81     xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
82
83     mutex_   = s4u::Mutex::create();
84     can_put_ = s4u::ConditionVariable::create();
85     can_get_ = s4u::ConditionVariable::create();
86
87     if (tmode_ == TransferMode::MAILBOX)
88       mbox_ = s4u::Mailbox::by_name(id);
89   }
90   ~ProducerConsumer() = default;
91
92 public:
93   /** Creation of the monitored queue. Its size can be bounded by passing a strictly positive value to 'max_queue_size'
94    *  as parameter. Calling 'create()' means that the queue size is (virtually) infinite.
95    */
96   static ProducerConsumerPtr<T> create(unsigned int max_queue_size = UINT_MAX)
97   {
98     return ProducerConsumerPtr<T>(new ProducerConsumer<T>(max_queue_size));
99   }
100
101   /** This method is intended more to set the maximum queue size in a fluent way than changing the size during the
102    *  utilization of the ProducerConsumer. Hence, the modification occurs in a critical section to prevent
103    *  inconsistencies.
104    */
105   ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
106   {
107     std::unique_lock<s4u::Mutex> lock(*mutex_);
108     max_queue_size_ = max_queue_size;
109     return this;
110   }
111
112   unsigned int get_max_queue_size() const { return max_queue_size_; }
113
114   /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
115   ProducerConsumer* set_transfer_mode(TransferMode new_mode)
116   {
117     if (tmode_ == new_mode) /* No change, do nothing */
118       return this;
119
120     xbt_assert(empty(), "cannot change transfer mode when some data is in queue");
121     if (new_mode == TransferMode::MAILBOX) {
122       mbox_ = s4u::Mailbox::by_name(id);
123     } else {
124       mbox_ = nullptr;
125     }
126     tmode_ = new_mode;
127     return this;
128   }
129   std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
130
131   /** Container-agnostic size() method */
132   unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
133
134   /** Container-agnostic empty() method */
135   bool empty() { return tmode_ == TransferMode::MAILBOX ? mbox_->empty() : queue_.empty(); }
136
137   /** Asynchronous put() of a data item of a given size
138    *  - TransferMode::MAILBOX: if put_async is called directly from user code, it can be considered to be done in a
139    *    fire-and-forget mode. No need to save the s4u::CommPtr.
140    *  - TransferMode::QUEUE: the data is simply pushed into the queue.
141    */
142   s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
143   {
144     std::unique_lock<s4u::Mutex> lock(*mutex_);
145     s4u::CommPtr comm = nullptr;
146     XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
147
148     while (size() >= max_queue_size_)
149       can_put_->wait(lock);
150     if (tmode_ == TransferMode::MAILBOX) {
151       comm = mbox_->put_init(data, simulated_size_in_bytes)
152                  ->set_copy_data_callback(s4u::Comm::copy_pointer_callback)
153                  ->start();
154     } else
155       queue_.push(data);
156     can_get_->notify_all();
157     return comm;
158   }
159
160   /** Synchronous put() of a data item of a given size
161    *  - TransferMode::MAILBOX: the caller must wait for the induced communication with the getter of the data to be
162    *    complete to continue with its execution. This wait is done outside of the monitor to prevent serialization.
163    *  - TransferMode::QUEUE: the behavior is exactly the same as put_async: data is simply pushed into the queue.
164    */
165   void put(T* data, size_t simulated_size_in_bytes)
166   {
167     s4u::CommPtr comm = put_async(data, simulated_size_in_bytes);
168     if (comm) {
169       XBT_CDEBUG(producer_consumer, "Waiting for the data to be consumed");
170       comm->wait();
171     }
172   }
173
174   /** Asynchronous get() of a 'data'
175    *  - TransferMode::MAILBOX: the caller is returned a s4u::CommPtr onto which it can wait when the data is really
176    *    needed.
177    *  - TransferMode::QUEUE: the data is simply popped from the queue and directly available. Better to call get() in
178    *    this transfer mode.
179    */
180   s4u::CommPtr get_async(T** data)
181   {
182     std::unique_lock<s4u::Mutex> lock(*mutex_);
183     s4u::CommPtr comm = nullptr;
184     XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
185     while (empty())
186       can_get_->wait(lock);
187     if (tmode_ == TransferMode::MAILBOX)
188       comm = mbox_->get_init()
189                  ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
190                  ->set_copy_data_callback(s4u::Comm::copy_pointer_callback)
191                  ->start();
192     else {
193       *data = queue_.front();
194       queue_.pop();
195     }
196     can_put_->notify_all();
197
198     return comm;
199   }
200
201   /** Synchronous get() of a 'data'
202    *  - TransferMode::MAILBOX: the caller waits (outside the monitor to prevent serialization) for the induced
203    *    communication to be complete to continue with its execution.
204    *  - TransferMode::QUEUE: the behavior is exactly the same as get_async: data is simply popped from the queue and
205    *    directly available to the caller.
206    */
207   T* get()
208   {
209     T* data;
210     if (s4u::CommPtr comm = get_async(&data)) {
211       XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
212       comm->wait();
213     }
214     XBT_CDEBUG(producer_consumer, "data is available");
215     return data;
216   }
217 };
218
219 } // namespace plugin
220 } // namespace simgrid
221
222 #endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP