-/* Copyright (c) 2021. The SimGrid Team. All rights reserved. */
+/* Copyright (c) 2021-2023. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
#define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
+#include <simgrid/s4u/Comm.hpp>
#include <simgrid/s4u/ConditionVariable.hpp>
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Mutex.hpp>
/** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
-namespace simgrid {
-namespace plugin {
+namespace simgrid::plugin {
template <typename T> class ProducerConsumer;
template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
-XBT_PUBLIC_DATA unsigned long pc_id;
+class ProducerConsumerId { // Non-template base that hands each ProducerConsumer<T> its textual identifier
+private:
+ static unsigned long pc_id; // One counter for the whole class, hence shared by every ProducerConsumer<T> instantiation
+ // NOTE(review): std::string / std::to_string are used below but <string> is not among the visible includes -- confirm it arrives transitively
+protected:
+ const std::string id = "ProducerConsumer" + std::to_string(pc_id); // Built from the counter value read before the constructor body runs
+ ProducerConsumerId() { ++pc_id; } // Member initializers run first, so `id` keeps the pre-increment value; the next object gets the next number
+};
-template <typename T> class ProducerConsumer {
+template <typename T> class ProducerConsumer : public ProducerConsumerId {
public:
/** This ProducerConsumer plugin can use two different transfer modes:
* - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get().
enum class TransferMode { MAILBOX = 0, QUEUE };
private:
- std::string id;
-
/* Implementation of a Monitor to handle the data exchanges */
s4u::MutexPtr mutex_;
s4u::ConditionVariablePtr can_put_;
{
xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
- id = std::string("ProducerConsumer") + std::to_string(pc_id);
- pc_id++;
-
mutex_ = s4u::Mutex::create();
can_put_ = s4u::ConditionVariable::create();
can_get_ = s4u::ConditionVariable::create();
*/
ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
{
- std::unique_lock<s4u::Mutex> lock(*mutex_);
+ const std::scoped_lock lock(*mutex_); // Held until return; nothing here waits on a condition variable, so a plain scoped guard is enough
max_queue_size_ = max_queue_size;
return this;
}
*/
s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
{
- std::unique_lock<s4u::Mutex> lock(*mutex_);
+ std::unique_lock lock(*mutex_);
s4u::CommPtr comm = nullptr;
XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
while (size() >= max_queue_size_)
can_put_->wait(lock);
if (tmode_ == TransferMode::MAILBOX) {
- comm = mbox_->put_async(data, simulated_size_in_bytes);
+ comm = mbox_->put_init(data, simulated_size_in_bytes)
+ ->start();
} else
queue_.push(data);
can_get_->notify_all();
*/
s4u::CommPtr get_async(T** data)
{
- std::unique_lock<s4u::Mutex> lock(*mutex_);
+ std::unique_lock lock(*mutex_);
s4u::CommPtr comm = nullptr;
XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
while (empty())
can_get_->wait(lock);
if (tmode_ == TransferMode::MAILBOX)
- comm = mbox_->get_async<T>(data);
+ comm = mbox_->get_init()
+ ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
+ ->start();
else {
*data = queue_.front();
queue_.pop();
T* get()
{
T* data;
- s4u::CommPtr comm = get_async(&data);
- if (comm) {
+ if (s4u::CommPtr comm = get_async(&data)) {
XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
comm->wait();
}
}
};
-} // namespace plugin
-} // namespace simgrid
+} // namespace simgrid::plugin
#endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP