Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Merge branch 'doc_link' into 'master'
[simgrid.git] / include / simgrid / plugins / ProducerConsumer.hpp
index 4297a0e..02c54da 100644 (file)
@@ -9,6 +9,7 @@
 #include <simgrid/s4u/ConditionVariable.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
 #include <simgrid/s4u/Mutex.hpp>
+#include <simgrid/simix.h>
 #include <xbt/asserts.h>
 #include <xbt/log.h>
 
@@ -17,7 +18,7 @@
 #include <queue>
 #include <string>
 
-XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category");
+XBT_LOG_EXTERNAL_CATEGORY(producer_consumer);
 
 /** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
 
@@ -27,7 +28,7 @@ namespace plugin {
 template <typename T> class ProducerConsumer;
 template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
 
-static unsigned long pc_id = 0;
+XBT_PUBLIC_DATA unsigned long pc_id;
 
 template <typename T> class ProducerConsumer {
 public:
@@ -70,7 +71,7 @@ private:
     }
   }
 
-  ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
+  explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
   {
     xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
 
@@ -106,7 +107,7 @@ public:
     return this;
   }
 
-  unsigned int get_max_queue_size() { return max_queue_size_; }
+  unsigned int get_max_queue_size() const { return max_queue_size_; }
 
   /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
   ProducerConsumer* set_transfer_mode(TransferMode new_mode)
@@ -123,7 +124,7 @@ public:
     tmode_ = new_mode;
     return this;
   }
-  std::string get_transfer_mode() { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
+  std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
 
   /** Container-agnostic size() method */
   unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
@@ -145,7 +146,9 @@ public:
     while (size() >= max_queue_size_)
       can_put_->wait(lock);
     if (tmode_ == TransferMode::MAILBOX) {
-      comm = mbox_->put_async(data, simulated_size_in_bytes);
+      comm = mbox_->put_init(data, simulated_size_in_bytes)
+                 ->set_copy_data_callback(SIMIX_comm_copy_pointer_callback)
+                 ->start();
     } else
       queue_.push(data);
     can_get_->notify_all();
@@ -180,7 +183,10 @@ public:
     while (empty())
       can_get_->wait(lock);
     if (tmode_ == TransferMode::MAILBOX)
-      comm = mbox_->get_async<T>(data);
+      comm = mbox_->get_init()
+                 ->set_dst_data(reinterpret_cast<void**>(data), sizeof(void*))
+                 ->set_copy_data_callback(SIMIX_comm_copy_pointer_callback)
+                 ->start();
     else {
       *data = queue_.front();
       queue_.pop();