1 /* Copyright (c) 2021-2022. The SimGrid Team. All rights reserved. */
3 /* This program is free software; you can redistribute it and/or modify it
4 * under the terms of the license (GNU LGPL) which comes with this package. */
6 #ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
7 #define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
9 #include <simgrid/s4u/Comm.hpp>
10 #include <simgrid/s4u/ConditionVariable.hpp>
11 #include <simgrid/s4u/Mailbox.hpp>
12 #include <simgrid/s4u/Mutex.hpp>
13 #include <xbt/asserts.h>
21 XBT_LOG_EXTERNAL_CATEGORY(producer_consumer);
23 /** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
28 template <typename T> class ProducerConsumer;
29 template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
31 XBT_PUBLIC_DATA unsigned long pc_id;
33 template <typename T> class ProducerConsumer {
/** Transfer modes supported by the ProducerConsumer plugin:
 *  - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors calling put() and get(). When these
 *    actors are on the same host, the communication goes through the host's loopback and can thus be seen as a
 *    memory copy. Otherwise, data goes over the network.
 *  - TransferMode::QUEUE: data is stored internally in a std::queue. Putting data into and getting data from this
 *    data structure has zero cost in terms of simulated time.
 *  Both modes guarantee that data is consumed in the order in which it has been produced. However, when data goes
 *  through the network, the s4u::Comm are started in the right order but may complete in a different order,
 *  depending on the characteristics of the interconnections between host pairs.
 */
enum class TransferMode {
  MAILBOX = 0,
  QUEUE
};
50 /* Implementation of a Monitor to handle the data exchanges */
52 s4u::ConditionVariablePtr can_put_;
53 s4u::ConditionVariablePtr can_get_;
55 /* data containers for each of the transfer modes */
56 s4u::Mailbox* mbox_ = nullptr;
57 std::queue<T*> queue_;
59 unsigned int max_queue_size_ = 1;
60 TransferMode tmode_ = TransferMode::MAILBOX;
62 /* Refcounting management */
63 std::atomic_int_fast32_t refcount_{0};
64 friend void intrusive_ptr_add_ref(ProducerConsumer* pc) { pc->refcount_.fetch_add(1, std::memory_order_acq_rel); }
66 friend void intrusive_ptr_release(ProducerConsumer* pc)
68 if (pc->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
69 std::atomic_thread_fence(std::memory_order_acquire);
74 explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
76 xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
78 id = std::string("ProducerConsumer") + std::to_string(pc_id);
81 mutex_ = s4u::Mutex::create();
82 can_put_ = s4u::ConditionVariable::create();
83 can_get_ = s4u::ConditionVariable::create();
85 if (tmode_ == TransferMode::MAILBOX)
86 mbox_ = s4u::Mailbox::by_name(id);
88 ~ProducerConsumer() = default;
91 /** Creation of the monitored queue. Its size can be bounded by passing a strictly positive value to 'max_queue_size'
92 * as parameter. Calling 'create()' means that the queue size is (virtually) infinite.
94 static ProducerConsumerPtr<T> create(unsigned int max_queue_size = UINT_MAX)
96 return ProducerConsumerPtr<T>(new ProducerConsumer<T>(max_queue_size));
99 /** This method is intended more to set the maximum queue size in a fluent way than changing the size during the
100 * utilization of the ProducerConsumer. Hence, the modification occurs in a critical section to prevent
103 ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
105 std::unique_lock<s4u::Mutex> lock(*mutex_);
106 max_queue_size_ = max_queue_size;
110 unsigned int get_max_queue_size() const { return max_queue_size_; }
112 /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
113 ProducerConsumer* set_transfer_mode(TransferMode new_mode)
115 if (tmode_ == new_mode) /* No change, do nothing */
118 xbt_assert(empty(), "cannot change transfer mode when some data is in queue");
119 if (new_mode == TransferMode::MAILBOX) {
120 mbox_ = s4u::Mailbox::by_name(id);
127 std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
129 /** Container-agnostic size() method */
130 unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
132 /** Container-agnostic empty() method */
133 bool empty() { return tmode_ == TransferMode::MAILBOX ? mbox_->empty() : queue_.empty(); }
135 /** Asynchronous put() of a data item of a given size
136 * - TransferMode::MAILBOX: if put_async is called directly from user code, it can be considered to be done in a
137 * fire-and-forget mode. No need to save the s4u::CommPtr.
138 * - TransferMode::QUEUE: the data is simply pushed into the queue.
140 s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
142 std::unique_lock<s4u::Mutex> lock(*mutex_);
143 s4u::CommPtr comm = nullptr;
144 XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
146 while (size() >= max_queue_size_)
147 can_put_->wait(lock);
148 if (tmode_ == TransferMode::MAILBOX) {
149 comm = mbox_->put_init(data, simulated_size_in_bytes)
150 ->set_copy_data_callback(s4u::Comm::copy_pointer_callback)
154 can_get_->notify_all();
158 /** Synchronous put() of a data item of a given size
159 * - TransferMode::MAILBOX: the caller must wait for the induced communication with the getter of the data to be
160 * complete to continue with its execution. This wait is done outside of the monitor to prevent serialization.
161 * - TransferMode::QUEUE: the behavior is exactly the same as put_async: data is simply pushed into the queue.
163 void put(T* data, size_t simulated_size_in_bytes)
165 s4u::CommPtr comm = put_async(data, simulated_size_in_bytes);
167 XBT_CDEBUG(producer_consumer, "Waiting for the data to be consumed");
172 /** Asynchronous get() of a 'data'
173 * - TransferMode::MAILBOX: the caller is returned a s4u::CommPtr onto which it can wait when the data is really
175 * - TransferMode::QUEUE: the data is simply popped from the queue and directly available. Better to call get() in
176 * this transfer mode.
178 s4u::CommPtr get_async(T** data)
180 std::unique_lock<s4u::Mutex> lock(*mutex_);
181 s4u::CommPtr comm = nullptr;
182 XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
184 can_get_->wait(lock);
185 if (tmode_ == TransferMode::MAILBOX)
186 comm = mbox_->get_init()
187 ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
188 ->set_copy_data_callback(s4u::Comm::copy_pointer_callback)
191 *data = queue_.front();
194 can_put_->notify_all();
199 /** Synchronous get() of a 'data'
200 * - TransferMode::MAILBOX: the caller waits (outside the monitor to prevent serialization) for the induced
201 * communication to be complete to continue with its execution.
202 * - TransferMode::QUEUE: the behavior is exactly the same as get_async: data is simply popped from the queue and
203 * directly available to the caller.
208 if (s4u::CommPtr comm = get_async(&data)) {
209 XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
212 XBT_CDEBUG(producer_consumer, "data is available");
217 } // namespace plugin
218 } // namespace simgrid
220 #endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP