#include <simgrid/s4u/ConditionVariable.hpp>
#include <simgrid/s4u/Mailbox.hpp>
#include <simgrid/s4u/Mutex.hpp>
+#include <simgrid/simix.h>
#include <xbt/asserts.h>
#include <xbt/log.h>
#include <queue>
#include <string>
-XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category");
+XBT_LOG_EXTERNAL_CATEGORY(producer_consumer);
/** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
template <typename T> class ProducerConsumer;
template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
-static unsigned long pc_id = 0;
+XBT_PUBLIC_DATA unsigned long pc_id;
template <typename T> class ProducerConsumer {
public:
}
}
- ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
+ explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
{
xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
return this;
}
- unsigned int get_max_queue_size() { return max_queue_size_; }
+ unsigned int get_max_queue_size() const { return max_queue_size_; }
/** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
ProducerConsumer* set_transfer_mode(TransferMode new_mode)
tmode_ = new_mode;
return this;
}
- std::string get_transfer_mode() { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
+ std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
/** Container-agnostic size() method */
unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
while (size() >= max_queue_size_)
can_put_->wait(lock);
if (tmode_ == TransferMode::MAILBOX) {
- comm = mbox_->put_async(data, simulated_size_in_bytes);
+ comm = mbox_->put_init(data, simulated_size_in_bytes)
+ ->set_copy_data_callback(SIMIX_comm_copy_pointer_callback)
+ ->start();
} else
queue_.push(data);
can_get_->notify_all();
while (empty())
can_get_->wait(lock);
if (tmode_ == TransferMode::MAILBOX)
- comm = mbox_->get_async<T>(data);
+ comm = mbox_->get_init()
+ ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
+ ->set_copy_data_callback(SIMIX_comm_copy_pointer_callback)
+ ->start();
else {
*data = queue_.front();
queue_.pop();