Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
introduce a new plugin: stock implementation of a Producer-Consumer with a monitor
authorSUTER Frederic <frederic.suter@cc.in2p3.fr>
Wed, 19 May 2021 16:27:18 +0000 (18:27 +0200)
committerSUTER Frederic <frederic.suter@cc.in2p3.fr>
Wed, 19 May 2021 16:27:29 +0000 (18:27 +0200)
.gitignore
ChangeLog
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp [new file with mode: 0644]
examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh [new file with mode: 0644]
include/simgrid/plugins/ProducerConsumer.hpp [new file with mode: 0644]
tools/cmake/DefinePackages.cmake

index ac8438a..d57fe08 100644 (file)
@@ -219,6 +219,7 @@ examples/cpp/platform-profile/s4u-platform-profile
 examples/cpp/platform-properties/s4u-platform-properties
 examples/cpp/plugin-host-load/s4u-plugin-host-load
 examples/cpp/plugin-link-load/s4u-plugin-link-load
+examples/cpp/plugin-prodcons/s4u-plugin-prodcons
 examples/cpp/replay-comm/s4u-replay-comm
 examples/cpp/replay-io/s4u-replay-io
 examples/cpp/routing-get-clusters/s4u-routing-get-clusters
index 194b7e2..41eea74 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,11 @@
 
 SimGrid (3.27.1) NOT RELEASED YET (v3.28 expected June 21. 2021, 03:32 UTC)
 
+New features:
+ - New plugin: Producer-Consumer with monitor. Just requires to include the 
+   include/simgrid/plugins/ProducerConsumer.hpp header to be used. See the 
+   associated example (examples/cpp/plugin-prodcons).
+
 S4U:
  - Fixed a bug where Activity::wait_for() killed the activity on timeout.
    Explicitly cancel the activity to get back to previous behavior.
index 5225d57..8416dc1 100644 (file)
@@ -299,6 +299,8 @@ include examples/cpp/plugin-host-load/s4u-plugin-host-load.cpp
 include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
 include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
+include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
+include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh
 include examples/cpp/replay-comm/s4u-replay-comm-split-p0.txt
 include examples/cpp/replay-comm/s4u-replay-comm-split-p1.txt
 include examples/cpp/replay-comm/s4u-replay-comm-split_d.xml
@@ -1994,6 +1996,7 @@ include include/simgrid/mailbox.h
 include include/simgrid/modelchecker.h
 include include/simgrid/msg.h
 include include/simgrid/mutex.h
+include include/simgrid/plugins/ProducerConsumer.hpp
 include include/simgrid/plugins/dvfs.h
 include include/simgrid/plugins/energy.h
 include include/simgrid/plugins/file_system.h
index ad691e1..1c58818 100644 (file)
@@ -76,7 +76,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                                 network-wifi
                  io-async io-file-system io-file-remote io-disk-raw io-dependent
                  platform-failures platform-profile platform-properties
-                 plugin-host-load plugin-link-load
+                 plugin-host-load plugin-link-load plugin-prodcons
                  replay-comm replay-io
                  routing-get-clusters
                  synchro-barrier synchro-condition-variable synchro-condition-variable-waituntil synchro-mutex synchro-semaphore
diff --git a/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
new file mode 100644 (file)
index 0000000..0faa27d
--- /dev/null
@@ -0,0 +1,92 @@
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/plugins/ProducerConsumer.hpp>
+#include <simgrid/s4u/Actor.hpp>
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/Host.hpp>
+#include <xbt/random.hpp>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example");
+
+namespace sg4 = simgrid::s4u;
+
+static void ingester(int id, simgrid::plugin::ProducerConsumerPtr<int> pc)
+{
+  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));
+  for (int i = 0; i < 3; i++) {
+    int* data = new int(10 * id + i);
+    pc->put(data, 1.2125e6); // last for 0.01s
+    XBT_INFO("data sucessfully put: %d", *data);
+    sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+
+  for (int i = 0; i < 3; i++) {
+    int* data = new int(10 * id + i);
+    pc->put_async(data, 1.2125e6); // last for 0.01s
+    XBT_INFO("data sucessfully put: %d", *data);
+    sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+}
+
+static void retriever(int id, simgrid::plugin::ProducerConsumerPtr<int> pc)
+{
+  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));
+  for (int i = 0; i < 3; i++) {
+    int* data;
+    sg4::CommPtr comm = pc->get_async(&data);
+    comm->wait();
+    XBT_INFO("data sucessfully get: %d", *data);
+    delete data;
+    sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+
+  for (int i = 0; i < 3; i++) {
+    int* data = pc->get();
+    XBT_INFO("data sucessfully get: %d", *data);
+    delete data;
+    sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+}
+
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  // Platform creation
+  auto* cluster = sg4::create_star_zone("cluster");
+  for (int i = 0; i < 8; i++) {
+    std::string hostname = std::string("node-") + std::to_string(i) + ".simgrid.org";
+
+    const auto* host = cluster->create_host(hostname, "1Gf");
+
+    std::string linkname = std::string("cluster") + "_link_" + std::to_string(i);
+    auto* link_up        = cluster->create_link(linkname + "_UP", "1Gbps");
+    auto* link_down      = cluster->create_link(linkname + "_DOWN", "1Gbps");
+
+    cluster->add_route(host->get_netpoint(), nullptr, nullptr, nullptr, std::vector<sg4::Link*>{link_up}, false);
+    cluster->add_route(nullptr, host->get_netpoint(), nullptr, nullptr, std::vector<sg4::Link*>{link_down}, false);
+  }
+
+  auto* router = cluster->create_router("cluster_router");
+  cluster->add_route(router, nullptr, nullptr, nullptr, {});
+
+  simgrid::plugin::ProducerConsumerPtr<int> pc = simgrid::plugin::ProducerConsumer<int>::create(2);
+
+  XBT_INFO("Maximum number of queued data is %u", pc->get_max_queue_size());
+  XBT_INFO("Transfers are done in %s mode", pc->get_transfer_mode().c_str());
+
+  for (int i = 0; i < 3; i++) {
+    std::string hostname = std::string("node-") + std::to_string(i) + ".simgrid.org";
+    sg4::Actor::create("ingester-" + std::to_string(i), sg4::Host::by_name(hostname), &ingester, i, pc);
+
+    hostname = std::string("node-") + std::to_string(i + 3) + ".simgrid.org";
+    sg4::Actor::create("retriever-" + std::to_string(i), sg4::Host::by_name(hostname), &retriever, i, pc);
+  }
+
+  e.run();
+
+  return 0;
+}
diff --git a/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh b/examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh
new file mode 100644 (file)
index 0000000..aea58a2
--- /dev/null
@@ -0,0 +1,43 @@
+#!/usr/bin/env tesh
+
+p This tests the ProducerConsumer plugin
+
+$ ${bindir:=.}/s4u-plugin-prodcons "--log=root.fmt:[%5.3r]%e[%11a]%e%m%n"
+> [0.000] [    maestro] Maximum number of queued data is 2
+> [0.000] [    maestro] Transfers are done in mailbox mode
+> [0.145] [ ingester-2] data sucessfully put: 20
+> [0.145] [retriever-0] data sucessfully get: 20
+> [0.825] [ ingester-0] data sucessfully put: 0
+> [0.825] [retriever-0] data sucessfully get: 0
+> [0.916] [ ingester-1] data sucessfully put: 10
+> [0.916] [retriever-1] data sucessfully get: 10
+> [1.218] [ ingester-1] data sucessfully put: 11
+> [1.218] [retriever-2] data sucessfully get: 11
+> [1.794] [ ingester-1] data sucessfully put: 12
+> [1.794] [retriever-2] data sucessfully get: 12
+> [2.340] [ ingester-1] data sucessfully put: 10
+> [2.350] [retriever-0] data sucessfully get: 10
+> [2.732] [ ingester-0] data sucessfully put: 1
+> [2.732] [retriever-1] data sucessfully get: 1
+> [5.765] [ ingester-1] data sucessfully put: 11
+> [5.775] [ ingester-2] data sucessfully put: 21
+> [5.775] [retriever-2] data sucessfully get: 21
+> [6.603] [ ingester-1] data sucessfully put: 12
+> [6.613] [ ingester-0] data sucessfully put: 2
+> [6.613] [retriever-1] data sucessfully get: 2
+> [7.172] [retriever-1] data sucessfully get: 11
+> [7.343] [retriever-0] data sucessfully get: 12
+> [7.570] [ ingester-0] data sucessfully put: 0
+> [8.638] [ ingester-2] data sucessfully put: 22
+> [8.638] [retriever-1] data sucessfully get: 22
+> [8.932] [retriever-1] data sucessfully get: 0
+> [8.935] [ ingester-2] data sucessfully put: 20
+> [9.747] [retriever-0] data sucessfully get: 20
+> [9.971] [ ingester-0] data sucessfully put: 1
+> [9.982] [retriever-0] data sucessfully get: 1
+> [10.200] [ ingester-2] data sucessfully put: 21
+> [10.638] [retriever-2] data sucessfully get: 21
+> [13.369] [ ingester-2] data sucessfully put: 22
+> [13.379] [retriever-2] data sucessfully get: 22
+> [13.634] [ ingester-0] data sucessfully put: 2
+> [14.396] [retriever-2] data sucessfully get: 2
diff --git a/include/simgrid/plugins/ProducerConsumer.hpp b/include/simgrid/plugins/ProducerConsumer.hpp
new file mode 100644 (file)
index 0000000..4297a0e
--- /dev/null
@@ -0,0 +1,215 @@
+/* Copyright (c) 2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
+#define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
+
+#include <simgrid/s4u/ConditionVariable.hpp>
+#include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/Mutex.hpp>
+#include <xbt/asserts.h>
+#include <xbt/log.h>
+
+#include <atomic>
+#include <climits>
+#include <queue>
+#include <string>
+
+XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category");
+
+/** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
+
+namespace simgrid {
+namespace plugin {
+
+template <typename T> class ProducerConsumer;
+template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
+
+static unsigned long pc_id = 0;
+
+template <typename T> class ProducerConsumer {
+public:
+  /** This ProducerConsumer plugin can use two different transfer modes:
+   *   - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get().
+   *     If these actors are on the same host, this communication goes through the host's loopback and can thus be
+   *     seen as a memory copy. Otherwise, data goes over the network.
+   *   - TransferMode::QUEUE: data is internally stored in a std::queue. Putting and getting data to and from this
+   *     data structure has a zero-cost in terms of simulated time.
+   *  Both modes guarantee that the data is consumed in the order it has been produced. However, when data goes
+   *  through the network, s4u::Comm are started in the right order, but may complete in a different order depending
+   *  the characteristics of the different interconnections between host pairs.
+   */
+  enum class TransferMode { MAILBOX = 0, QUEUE };
+
+private:
+  std::string id;
+
+  /* Implementation of a Monitor to handle the data exchanges */
+  s4u::MutexPtr mutex_;
+  s4u::ConditionVariablePtr can_put_;
+  s4u::ConditionVariablePtr can_get_;
+
+  /* data containers for each of the transfer modes */
+  s4u::Mailbox* mbox_ = nullptr;
+  std::queue<T*> queue_;
+
+  unsigned int max_queue_size_ = 1;
+  TransferMode tmode_          = TransferMode::MAILBOX;
+
+  /* Refcounting management */
+  std::atomic_int_fast32_t refcount_{0};
+  friend void intrusive_ptr_add_ref(ProducerConsumer* pc) { pc->refcount_.fetch_add(1, std::memory_order_acq_rel); }
+
+  friend void intrusive_ptr_release(ProducerConsumer* pc)
+  {
+    if (pc->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+      std::atomic_thread_fence(std::memory_order_acquire);
+      delete pc;
+    }
+  }
+
+  ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
+  {
+    xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
+
+    id = std::string("ProducerConsumer") + std::to_string(pc_id);
+    pc_id++;
+
+    mutex_   = s4u::Mutex::create();
+    can_put_ = s4u::ConditionVariable::create();
+    can_get_ = s4u::ConditionVariable::create();
+
+    if (tmode_ == TransferMode::MAILBOX)
+      mbox_ = s4u::Mailbox::by_name(id);
+  }
+  ~ProducerConsumer() = default;
+
+public:
+  /** Creation of the monitored queue. Its size can be bounded by passing a strictly positive value to 'max_queue_size'
+   *  as parameter. Calling 'create()' means that the queue size is (virtually) infinite.
+   */
+  static ProducerConsumerPtr<T> create(unsigned int max_queue_size = UINT_MAX)
+  {
+    return ProducerConsumerPtr<T>(new ProducerConsumer<T>(max_queue_size));
+  }
+
+  /** This method is intended more to set the maximum queue size in a fluent way than changing the size during the
+   *  utilization of the ProducerConsumer. Hence, the modification occurs in a critical section to prevent
+   *  inconsistencies.
+   */
+  ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
+  {
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    max_queue_size_ = max_queue_size;
+    return this;
+  }
+
+  unsigned int get_max_queue_size() { return max_queue_size_; }
+
+  /** The underlying data container (and transfer mode) can only be modified when the queue is empty.*/
+  ProducerConsumer* set_transfer_mode(TransferMode new_mode)
+  {
+    if (tmode_ == new_mode) /* No change, do nothing */
+      return this;
+
+    xbt_assert(empty(), "cannot change transfer mode when some data is in queue");
+    if (new_mode == TransferMode::MAILBOX) {
+      mbox_ = s4u::Mailbox::by_name(id);
+    } else {
+      mbox_ = nullptr;
+    }
+    tmode_ = new_mode;
+    return this;
+  }
+  std::string get_transfer_mode() { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
+
+  /** Container-agnostic size() method */
+  unsigned int size() { return tmode_ == TransferMode::MAILBOX ? mbox_->size() : queue_.size(); }
+
+  /** Container-agnostic empty() method */
+  bool empty() { return tmode_ == TransferMode::MAILBOX ? mbox_->empty() : queue_.empty(); }
+
+  /** Asynchronous put() of a data item of a given size
+   *  - TransferMode::MAILBOX: if put_async is called directly from user code, it can be considered to be done in a
+   *    fire-and-forget mode. No need to save the s4u::CommPtr.
+   *  - TransferMode::QUEUE: the data is simply pushed into the queue.
+   */
+  s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
+  {
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    s4u::CommPtr comm = nullptr;
+    XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
+
+    while (size() >= max_queue_size_)
+      can_put_->wait(lock);
+    if (tmode_ == TransferMode::MAILBOX) {
+      comm = mbox_->put_async(data, simulated_size_in_bytes);
+    } else
+      queue_.push(data);
+    can_get_->notify_all();
+    return comm;
+  }
+
+  /** Synchronous put() of a data item of a given size
+   *  - TransferMode::MAILBOX: the caller must wait for the induced communication with the getter of the data to be
+   *    complete to continue with its execution. This wait is done outside of the monitor to prevent serialization.
+   *  - TransferMode::QUEUE: the behavior is exactly the same as put_async: data is simply pushed into the queue.
+   */
+  void put(T* data, size_t simulated_size_in_bytes)
+  {
+    s4u::CommPtr comm = put_async(data, simulated_size_in_bytes);
+    if (comm) {
+      XBT_CDEBUG(producer_consumer, "Waiting for the data to be consumed");
+      comm->wait();
+    }
+  }
+
+  /** Asynchronous get() of a 'data'
+   *  - TransferMode::MAILBOX: the caller is returned a s4u::CommPtr onto which it can wait when the data is really
+   *    needed.
+   *  - TransferMode::QUEUE: the data is simply popped from the queue and directly available. Better to call get() in
+   *    this transfer mode.
+   */
+  s4u::CommPtr get_async(T** data)
+  {
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    s4u::CommPtr comm = nullptr;
+    XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
+    while (empty())
+      can_get_->wait(lock);
+    if (tmode_ == TransferMode::MAILBOX)
+      comm = mbox_->get_async<T>(data);
+    else {
+      *data = queue_.front();
+      queue_.pop();
+    }
+    can_put_->notify_all();
+
+    return comm;
+  }
+
+  /** Synchronous get() of a 'data'
+   *  - TransferMode::MAILBOX: the caller waits (outside the monitor to prevent serialization) for the induced
+   *    communication to be complete to continue with its execution.
+   *  - TransferMode::QUEUE: the behavior is exactly the same as get_async: data is simply popped from the queue and
+   *    directly available to the caller.
+   */
+  T* get()
+  {
+    T* data;
+    s4u::CommPtr comm = get_async(&data);
+    if (comm) {
+      XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
+      comm->wait();
+    }
+    XBT_CDEBUG(producer_consumer, "data is available");
+    return data;
+  }
+};
+
+} // namespace plugin
+} // namespace simgrid
+
+#endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
index f36808b..d756d58 100644 (file)
@@ -680,6 +680,7 @@ set(headers_to_install
   include/simgrid/plugins/file_system.h
   include/simgrid/plugins/live_migration.h
   include/simgrid/plugins/load.h
+  include/simgrid/plugins/ProducerConsumer.hpp
   include/simgrid/smpi/smpi_replay.hpp
   include/simgrid/instr.h
   include/simgrid/mailbox.h