examples/cpp/platform-properties/s4u-platform-properties
examples/cpp/plugin-host-load/s4u-plugin-host-load
examples/cpp/plugin-link-load/s4u-plugin-link-load
+examples/cpp/plugin-prodcons/s4u-plugin-prodcons
examples/cpp/replay-comm/s4u-replay-comm
examples/cpp/replay-io/s4u-replay-io
examples/cpp/routing-get-clusters/s4u-routing-get-clusters
SimGrid (3.27.1) NOT RELEASED YET (v3.28 expected June 21. 2021, 03:32 UTC)
+New features:
+ - New plugin: Producer-Consumer with monitor. To use it, simply include the
+ include/simgrid/plugins/ProducerConsumer.hpp header. See the associated
+ example (examples/cpp/plugin-prodcons).
+
S4U:
- Fixed a bug where Activity::wait_for() killed the activity on timeout.
Explicitly cancel the activity to get back to previous behavior.
include examples/cpp/plugin-host-load/s4u-plugin-host-load.tesh
include examples/cpp/plugin-link-load/s4u-plugin-link-load.cpp
include examples/cpp/plugin-link-load/s4u-plugin-link-load.tesh
+include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.cpp
+include examples/cpp/plugin-prodcons/s4u-plugin-prodcons.tesh
include examples/cpp/replay-comm/s4u-replay-comm-split-p0.txt
include examples/cpp/replay-comm/s4u-replay-comm-split-p1.txt
include examples/cpp/replay-comm/s4u-replay-comm-split_d.xml
include include/simgrid/modelchecker.h
include include/simgrid/msg.h
include include/simgrid/mutex.h
+include include/simgrid/plugins/ProducerConsumer.hpp
include include/simgrid/plugins/dvfs.h
include include/simgrid/plugins/energy.h
include include/simgrid/plugins/file_system.h
network-wifi
io-async io-file-system io-file-remote io-disk-raw io-dependent
platform-failures platform-profile platform-properties
- plugin-host-load plugin-link-load
+ plugin-host-load plugin-link-load plugin-prodcons
replay-comm replay-io
routing-get-clusters
synchro-barrier synchro-condition-variable synchro-condition-variable-waituntil synchro-mutex synchro-semaphore
--- /dev/null
+/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/plugins/ProducerConsumer.hpp>
+#include <simgrid/s4u/Actor.hpp>
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/Host.hpp>
+#include <xbt/random.hpp>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example");
+
+namespace sg4 = simgrid::s4u;
+
+/* Producer actor.
+ *
+ * After a random start-up delay, hands six heap-allocated integers (10 * id + i, for i in 0..2, twice) to the
+ * shared monitor 'pc': the first three with the blocking put(), the last three with put_async(). Each item is
+ * announced with a simulated size of 1.2125e6 bytes. The items are released by the retriever that gets them.
+ */
+static void ingester(int id, simgrid::plugin::ProducerConsumerPtr<int> pc)
+{
+  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));
+
+  // Phase 1: synchronous puts
+  for (int i = 0; i < 3; i++) {
+    // Keep a copy of the value: once handed over, the item belongs to whichever retriever gets it (and deletes
+    // it), so it must not be dereferenced on this side anymore.
+    const int value = 10 * id + i;
+    pc->put(new int(value), 1.2125e6); // last for 0.01s
+    XBT_INFO("data sucessfully put: %d", value);
+    sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+
+  // Phase 2: asynchronous puts, the returned communication handle is deliberately dropped
+  for (int i = 0; i < 3; i++) {
+    const int value = 10 * id + i;
+    pc->put_async(new int(value), 1.2125e6); // last for 0.01s
+    XBT_INFO("data sucessfully put: %d", value);
+    sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+}
+
+/* Consumer actor.
+ *
+ * After a random start-up delay, takes six items out of the shared monitor 'pc': the first three with
+ * get_async() followed by a wait on the returned communication, the last three with the blocking get().
+ * Every item received is logged and then deleted here. The 'id' parameter is not used by the body.
+ */
+static void retriever(int id, simgrid::plugin::ProducerConsumerPtr<int> pc)
+{
+  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));
+
+  // Phase 1: asynchronous gets
+  for (int i = 0; i < 3; i++) {
+    int* data         = nullptr; // never leave the out-pointer indeterminate
+    sg4::CommPtr comm = pc->get_async(&data);
+    // get_async() returns a null handle when the monitor is in QUEUE mode: the data is then already available
+    if (comm)
+      comm->wait();
+    XBT_INFO("data sucessfully get: %d", *data);
+    delete data;
+    sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+
+  // Phase 2: synchronous gets
+  for (int i = 0; i < 3; i++) {
+    int* data = pc->get();
+    XBT_INFO("data sucessfully get: %d", *data);
+    delete data;
+    sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1));
+  }
+}
+
+/* Entry point of the example.
+ *
+ * Builds a star zone of 8 hosts programmatically, creates a ProducerConsumer<int> monitor bounded to 2 queued
+ * items, starts three ingesters (on node-0..2) and three retrievers (on node-3..5), then runs the simulation.
+ */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+
+  // Hosts are all named node-<rank>.simgrid.org
+  const auto node_name = [](int rank) { return "node-" + std::to_string(rank) + ".simgrid.org"; };
+
+  // Platform creation: one host per rank, each with its own pair of links
+  auto* cluster = sg4::create_star_zone("cluster");
+  for (int rank = 0; rank < 8; rank++) {
+    const auto* host = cluster->create_host(node_name(rank), "1Gf");
+
+    const std::string link_prefix = "cluster_link_" + std::to_string(rank);
+    auto* uplink                  = cluster->create_link(link_prefix + "_UP", "1Gbps");
+    auto* downlink                = cluster->create_link(link_prefix + "_DOWN", "1Gbps");
+
+    // One route leaving the host through its _UP link, one reaching it through its _DOWN link
+    cluster->add_route(host->get_netpoint(), nullptr, nullptr, nullptr, std::vector<sg4::Link*>{uplink}, false);
+    cluster->add_route(nullptr, host->get_netpoint(), nullptr, nullptr, std::vector<sg4::Link*>{downlink}, false);
+  }
+
+  // The router gets a route without any link
+  auto* router = cluster->create_router("cluster_router");
+  cluster->add_route(router, nullptr, nullptr, nullptr, {});
+
+  // Monitor shared by all the actors, holding at most 2 items
+  auto pc = simgrid::plugin::ProducerConsumer<int>::create(2);
+
+  XBT_INFO("Maximum number of queued data is %u", pc->get_max_queue_size());
+  XBT_INFO("Transfers are done in %s mode", pc->get_transfer_mode().c_str());
+
+  // Actors are created pairwise so that their creation order is ingester-0, retriever-0, ingester-1, ...
+  for (int rank = 0; rank < 3; rank++) {
+    const std::string suffix = std::to_string(rank);
+    sg4::Actor::create("ingester-" + suffix, sg4::Host::by_name(node_name(rank)), &ingester, rank, pc);
+    sg4::Actor::create("retriever-" + suffix, sg4::Host::by_name(node_name(rank + 3)), &retriever, rank, pc);
+  }
+
+  e.run();
+
+  return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+p This tests the ProducerConsumer plugin
+
+$ ${bindir:=.}/s4u-plugin-prodcons "--log=root.fmt:[%5.3r]%e[%11a]%e%m%n"
+> [0.000] [ maestro] Maximum number of queued data is 2
+> [0.000] [ maestro] Transfers are done in mailbox mode
+> [0.145] [ ingester-2] data sucessfully put: 20
+> [0.145] [retriever-0] data sucessfully get: 20
+> [0.825] [ ingester-0] data sucessfully put: 0
+> [0.825] [retriever-0] data sucessfully get: 0
+> [0.916] [ ingester-1] data sucessfully put: 10
+> [0.916] [retriever-1] data sucessfully get: 10
+> [1.218] [ ingester-1] data sucessfully put: 11
+> [1.218] [retriever-2] data sucessfully get: 11
+> [1.794] [ ingester-1] data sucessfully put: 12
+> [1.794] [retriever-2] data sucessfully get: 12
+> [2.340] [ ingester-1] data sucessfully put: 10
+> [2.350] [retriever-0] data sucessfully get: 10
+> [2.732] [ ingester-0] data sucessfully put: 1
+> [2.732] [retriever-1] data sucessfully get: 1
+> [5.765] [ ingester-1] data sucessfully put: 11
+> [5.775] [ ingester-2] data sucessfully put: 21
+> [5.775] [retriever-2] data sucessfully get: 21
+> [6.603] [ ingester-1] data sucessfully put: 12
+> [6.613] [ ingester-0] data sucessfully put: 2
+> [6.613] [retriever-1] data sucessfully get: 2
+> [7.172] [retriever-1] data sucessfully get: 11
+> [7.343] [retriever-0] data sucessfully get: 12
+> [7.570] [ ingester-0] data sucessfully put: 0
+> [8.638] [ ingester-2] data sucessfully put: 22
+> [8.638] [retriever-1] data sucessfully get: 22
+> [8.932] [retriever-1] data sucessfully get: 0
+> [8.935] [ ingester-2] data sucessfully put: 20
+> [9.747] [retriever-0] data sucessfully get: 20
+> [9.971] [ ingester-0] data sucessfully put: 1
+> [9.982] [retriever-0] data sucessfully get: 1
+> [10.200] [ ingester-2] data sucessfully put: 21
+> [10.638] [retriever-2] data sucessfully get: 21
+> [13.369] [ ingester-2] data sucessfully put: 22
+> [13.379] [retriever-2] data sucessfully get: 22
+> [13.634] [ ingester-0] data sucessfully put: 2
+> [14.396] [retriever-2] data sucessfully get: 2
--- /dev/null
+/* Copyright (c) 2021. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
+#define SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
+
+#include <simgrid/s4u/ConditionVariable.hpp>
+#include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/Mutex.hpp>
+#include <xbt/asserts.h>
+#include <xbt/log.h>
+
+#include <atomic>
+#include <climits>
+#include <queue>
+#include <string>
+
+/* Logging category used by the XBT_CVERB / XBT_CDEBUG calls of the ProducerConsumer class.
+ * NOTE(review): this macro presumably *defines* the category; if so, including this header from more than one
+ * translation unit of the same binary may produce multiple definitions at link time -- TODO confirm, and if
+ * needed define the category in a single .cpp and only declare it here. */
+XBT_LOG_NEW_CATEGORY(producer_consumer, "Producer-Consumer plugin logging category");
+
+/** Stock implementation of a generic monitored queue to solve the producer-consumer problem */
+
+namespace simgrid {
+namespace plugin {
+
+template <typename T> class ProducerConsumer;
+template <typename T> using ProducerConsumerPtr = boost::intrusive_ptr<ProducerConsumer<T>>;
+
+/* Counter used by the ProducerConsumer constructor to build the name ("ProducerConsumer<N>") of each instance;
+ * that name is also the name of the instance's mailbox.
+ * NOTE(review): being 'static' at namespace scope in a header, each translation unit that includes this file
+ * gets its own copy of the counter. Instances created from different source files could thus be given the same
+ * name, and hence the same mailbox -- TODO confirm whether multi-file use is expected, and if so move the
+ * definition to a single .cpp. */
+static unsigned long pc_id = 0;
+
+/** Monitor-protected container of T* items shared by producer and consumer actors.
+ *
+ * Instances are reference-counted and can only be obtained through create(), which returns a
+ * ProducerConsumerPtr<T>. Producers wait in put()/put_async() while the container holds max_queue_size items;
+ * consumers wait in get()/get_async() while it is empty. The container only stores and forwards the T* it is
+ * given: it never allocates nor deletes the pointed data.
+ */
+template <typename T> class ProducerConsumer {
+public:
+  /** This ProducerConsumer plugin can use two different transfer modes:
+   *   - TransferMode::MAILBOX: this mode induces a s4u::Comm between the actors doing the calls to put() and get().
+   *     If these actors are on the same host, this communication goes through the host's loopback and can thus be
+   *     seen as a memory copy. Otherwise, data goes over the network.
+   *   - TransferMode::QUEUE: data is internally stored in a std::queue. Putting and getting data to and from this
+   *     data structure has a zero-cost in terms of simulated time.
+   * Both modes guarantee that the data is consumed in the order it has been produced. However, when data goes
+   * through the network, s4u::Comm are started in the right order, but may complete in a different order depending
+   * on the characteristics of the different interconnections between host pairs.
+   */
+  enum class TransferMode { MAILBOX = 0, QUEUE };
+
+private:
+  /* Name of this instance ("ProducerConsumer<N>"), also used as mailbox name in MAILBOX mode */
+  std::string id;
+
+  /* Implementation of a Monitor to handle the data exchanges */
+  s4u::MutexPtr mutex_;
+  s4u::ConditionVariablePtr can_put_; // signaled by get_async() once it has taken (or started to take) an item
+  s4u::ConditionVariablePtr can_get_; // signaled by put_async() once it has added (or started to send) an item
+
+  /* data containers for each of the transfer modes */
+  s4u::Mailbox* mbox_ = nullptr; // only set in MAILBOX mode
+  std::queue<T*> queue_;         // only filled in QUEUE mode
+
+  unsigned int max_queue_size_ = 1;
+  TransferMode tmode_          = TransferMode::MAILBOX;
+
+  /* Refcounting management */
+  std::atomic_int_fast32_t refcount_{0};
+  friend void intrusive_ptr_add_ref(ProducerConsumer* pc) { pc->refcount_.fetch_add(1, std::memory_order_acq_rel); }
+
+  friend void intrusive_ptr_release(ProducerConsumer* pc)
+  {
+    // The object is destroyed by whoever drops the last reference
+    if (pc->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
+      std::atomic_thread_fence(std::memory_order_acquire);
+      delete pc;
+    }
+  }
+
+  /* Private: use create() instead. Names the instance after the current value of pc_id (then increments it),
+   * creates the monitor's mutex and condition variables, and looks up the mailbox as the default mode is MAILBOX. */
+  explicit ProducerConsumer(unsigned int max_queue_size) : max_queue_size_(max_queue_size)
+  {
+    xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
+
+    id = std::string("ProducerConsumer") + std::to_string(pc_id);
+    pc_id++;
+
+    mutex_   = s4u::Mutex::create();
+    can_put_ = s4u::ConditionVariable::create();
+    can_get_ = s4u::ConditionVariable::create();
+
+    if (tmode_ == TransferMode::MAILBOX)
+      mbox_ = s4u::Mailbox::by_name(id);
+  }
+  /* Private: destruction only happens through intrusive_ptr_release() */
+  ~ProducerConsumer() = default;
+
+public:
+  /** Creation of the monitored queue. Its size can be bounded by passing a strictly positive value to 'max_queue_size'
+   *  as parameter. Calling 'create()' means that the queue size is (virtually) infinite.
+   */
+  static ProducerConsumerPtr<T> create(unsigned int max_queue_size = UINT_MAX)
+  {
+    return ProducerConsumerPtr<T>(new ProducerConsumer<T>(max_queue_size));
+  }
+
+  /** This method is intended more to set the maximum queue size in a fluent way than changing the size during the
+   *  utilization of the ProducerConsumer. Hence, the modification occurs in a critical section to prevent
+   *  inconsistencies.
+   *
+   *  As in the constructor, a size of 0 is rejected: put_async() waits while size() >= max_queue_size, which
+   *  would then hold forever.
+   *
+   *  @return this, to allow chaining.
+   */
+  ProducerConsumer* set_max_queue_size(unsigned int max_queue_size)
+  {
+    xbt_assert(max_queue_size > 0, "Max queue size of 0 is not allowed");
+
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    max_queue_size_ = max_queue_size;
+    return this;
+  }
+
+  /** Maximum number of items that can be held before producers have to wait */
+  unsigned int get_max_queue_size() const { return max_queue_size_; }
+
+  /** The underlying data container (and transfer mode) can only be modified when the queue is empty.
+   *
+   *  Asking for the mode already in use is a no-op. Otherwise the container must be empty (asserted), and the
+   *  mailbox is looked up again (MAILBOX) or forgotten (QUEUE).
+   *
+   *  @return this, to allow chaining.
+   */
+  ProducerConsumer* set_transfer_mode(TransferMode new_mode)
+  {
+    if (tmode_ == new_mode) /* No change, do nothing */
+      return this;
+
+    xbt_assert(empty(), "cannot change transfer mode when some data is in queue");
+    if (new_mode == TransferMode::MAILBOX) {
+      mbox_ = s4u::Mailbox::by_name(id);
+    } else {
+      mbox_ = nullptr;
+    }
+    tmode_ = new_mode;
+    return this;
+  }
+
+  /** Name of the current transfer mode: "mailbox" or "queue" */
+  std::string get_transfer_mode() const { return tmode_ == TransferMode::MAILBOX ? "mailbox" : "queue"; }
+
+  /** Container-agnostic size() method */
+  unsigned int size() const
+  {
+    return tmode_ == TransferMode::MAILBOX ? mbox_->size() : static_cast<unsigned int>(queue_.size());
+  }
+
+  /** Container-agnostic empty() method */
+  bool empty() const { return tmode_ == TransferMode::MAILBOX ? mbox_->empty() : queue_.empty(); }
+
+  /** Asynchronous put() of a data item of a given size
+   *  - TransferMode::MAILBOX: if put_async is called directly from user code, it can be considered to be done in a
+   *    fire-and-forget mode. No need to save the s4u::CommPtr.
+   *  - TransferMode::QUEUE: the data is simply pushed into the queue.
+   *
+   *  The caller waits inside the monitor while the container holds max_queue_size items or more.
+   *
+   *  @return the started communication in MAILBOX mode, a null s4u::CommPtr in QUEUE mode.
+   */
+  s4u::CommPtr put_async(T* data, size_t simulated_size_in_bytes)
+  {
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    s4u::CommPtr comm = nullptr;
+    XBT_CVERB(producer_consumer, (size() < max_queue_size_) ? "can put" : "must wait");
+
+    // Loop (rather than a single test) as the condition has to be re-checked after each wake-up
+    while (size() >= max_queue_size_)
+      can_put_->wait(lock);
+    if (tmode_ == TransferMode::MAILBOX) {
+      comm = mbox_->put_async(data, simulated_size_in_bytes);
+    } else
+      queue_.push(data);
+    can_get_->notify_all();
+    return comm;
+  }
+
+  /** Synchronous put() of a data item of a given size
+   *  - TransferMode::MAILBOX: the caller must wait for the induced communication with the getter of the data to be
+   *    complete to continue with its execution. This wait is done outside of the monitor to prevent serialization.
+   *  - TransferMode::QUEUE: the behavior is exactly the same as put_async: data is simply pushed into the queue.
+   */
+  void put(T* data, size_t simulated_size_in_bytes)
+  {
+    s4u::CommPtr comm = put_async(data, simulated_size_in_bytes);
+    if (comm) { // null in QUEUE mode: nothing to wait for
+      XBT_CDEBUG(producer_consumer, "Waiting for the data to be consumed");
+      comm->wait();
+    }
+  }
+
+  /** Asynchronous get() of a 'data'
+   *  - TransferMode::MAILBOX: the caller is returned a s4u::CommPtr onto which it can wait when the data is really
+   *    needed.
+   *  - TransferMode::QUEUE: the data is simply popped from the queue and directly available. Better to call get() in
+   *    this transfer mode.
+   *
+   *  The caller waits inside the monitor while the container is empty.
+   *
+   *  @param data where to store the retrieved pointer.
+   *  @return the started communication in MAILBOX mode, a null s4u::CommPtr in QUEUE mode.
+   */
+  s4u::CommPtr get_async(T** data)
+  {
+    std::unique_lock<s4u::Mutex> lock(*mutex_);
+    s4u::CommPtr comm = nullptr;
+    XBT_CVERB(producer_consumer, empty() ? "must wait" : "can get");
+    // Loop (rather than a single test) as the condition has to be re-checked after each wake-up
+    while (empty())
+      can_get_->wait(lock);
+    if (tmode_ == TransferMode::MAILBOX)
+      comm = mbox_->get_async<T>(data);
+    else {
+      *data = queue_.front();
+      queue_.pop();
+    }
+    can_put_->notify_all();
+
+    return comm;
+  }
+
+  /** Synchronous get() of a 'data'
+   *  - TransferMode::MAILBOX: the caller waits (outside the monitor to prevent serialization) for the induced
+   *    communication to be complete to continue with its execution.
+   *  - TransferMode::QUEUE: the behavior is exactly the same as get_async: data is simply popped from the queue and
+   *    directly available to the caller.
+   *
+   *  @return the pointer that was put by a producer.
+   */
+  T* get()
+  {
+    T* data;
+    s4u::CommPtr comm = get_async(&data);
+    if (comm) { // null in QUEUE mode: 'data' is already set
+      XBT_CDEBUG(producer_consumer, "Waiting for the data to arrive");
+      comm->wait();
+    }
+    XBT_CDEBUG(producer_consumer, "data is available");
+    return data;
+  }
+};
+
+} // namespace plugin
+} // namespace simgrid
+
+#endif // SIMGRID_PLUGIN_PRODUCERCONSUMER_HPP
include/simgrid/plugins/file_system.h
include/simgrid/plugins/live_migration.h
include/simgrid/plugins/load.h
+ include/simgrid/plugins/ProducerConsumer.hpp
include/simgrid/smpi/smpi_replay.hpp
include/simgrid/instr.h
include/simgrid/mailbox.h