Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add Message queue abstraction
author Fred Suter <suterf@ornl.gov>
Thu, 19 Oct 2023 20:19:45 +0000 (16:19 -0400)
committer Fred Suter <suterf@ornl.gov>
Tue, 24 Oct 2023 21:31:07 +0000 (17:31 -0400)
22 files changed:
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/mess-wait/s4u-mess-wait.cpp [new file with mode: 0644]
examples/cpp/mess-wait/s4u-mess-wait.tesh [new file with mode: 0644]
include/simgrid/forward.h
include/simgrid/s4u.hpp
include/simgrid/s4u/Activity.hpp
include/simgrid/s4u/Engine.hpp
include/simgrid/s4u/Mess.hpp [new file with mode: 0644]
include/simgrid/s4u/MessageQueue.hpp [new file with mode: 0644]
src/kernel/EngineImpl.cpp
src/kernel/EngineImpl.hpp
src/kernel/activity/MessImpl.cpp [new file with mode: 0644]
src/kernel/activity/MessImpl.hpp [new file with mode: 0644]
src/kernel/activity/MessageQueueImpl.cpp [new file with mode: 0644]
src/kernel/activity/MessageQueueImpl.hpp [new file with mode: 0644]
src/kernel/actor/CommObserver.cpp
src/kernel/actor/CommObserver.hpp
src/s4u/s4u_Engine.cpp
src/s4u/s4u_Mess.cpp [new file with mode: 0644]
src/s4u/s4u_MessageQueue.cpp [new file with mode: 0644]
tools/cmake/DefinePackages.cmake

index 1b8ad27..2a24cda 100644 (file)
@@ -330,6 +330,8 @@ include examples/cpp/mc-failing-assert/s4u-mc-failing-assert-nodpor.tesh
 include examples/cpp/mc-failing-assert/s4u-mc-failing-assert-statequality.tesh
 include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.cpp
 include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.tesh
+include examples/cpp/mess-wait/s4u-mess-wait.cpp
+include examples/cpp/mess-wait/s4u-mess-wait.tesh
 include examples/cpp/network-factors/s4u-network-factors.cpp
 include examples/cpp/network-factors/s4u-network-factors.tesh
 include examples/cpp/network-nonlinear/s4u-network-nonlinear.cpp
@@ -348,8 +350,6 @@ include examples/cpp/network-ns3/s4u-network-ns3-timed.tesh
 include examples/cpp/network-ns3/s4u-network-ns3.cpp
 include examples/cpp/network-wifi/s4u-network-wifi.cpp
 include examples/cpp/network-wifi/s4u-network-wifi.tesh
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
-include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
 include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp
 include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh
 include examples/cpp/platform-failures/s4u-platform-failures.cpp
@@ -380,6 +380,8 @@ include examples/cpp/replay-io/s4u-replay-io.txt
 include examples/cpp/replay-io/s4u-replay-io_d.xml
 include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp
 include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp
+include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh
 include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh
 include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp
 include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh
@@ -1966,6 +1968,7 @@ include include/simgrid/s4u/Host.hpp
 include include/simgrid/s4u/Io.hpp
 include include/simgrid/s4u/Link.hpp
 include include/simgrid/s4u/Mailbox.hpp
+include include/simgrid/s4u/MessageQueue.hpp
 include include/simgrid/s4u/Mutex.hpp
 include include/simgrid/s4u/NetZone.hpp
 include include/simgrid/s4u/Semaphore.hpp
@@ -2062,6 +2065,10 @@ include src/kernel/activity/IoImpl.cpp
 include src/kernel/activity/IoImpl.hpp
 include src/kernel/activity/MailboxImpl.cpp
 include src/kernel/activity/MailboxImpl.hpp
+include src/kernel/activity/MessImpl.cpp
+include src/kernel/activity/MessImpl.hpp
+include src/kernel/activity/MessageQueueImpl.cpp
+include src/kernel/activity/MessageQueueImpl.hpp
 include src/kernel/activity/MutexImpl.cpp
 include src/kernel/activity/MutexImpl.hpp
 include src/kernel/activity/SemaphoreImpl.cpp
@@ -2346,6 +2353,8 @@ include src/s4u/s4u_Host.cpp
 include src/s4u/s4u_Io.cpp
 include src/s4u/s4u_Link.cpp
 include src/s4u/s4u_Mailbox.cpp
+include src/s4u/s4u_Mess.cpp
+include src/s4u/s4u_MessageQueue.cpp
 include src/s4u/s4u_Mutex.cpp
 include src/s4u/s4u_Netzone.cpp
 include src/s4u/s4u_Semaphore.cpp
index b917540..7de7809 100644 (file)
@@ -170,6 +170,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act
                  exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads
                  maestro-set
                  mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert
+                 mess-wait
                  network-ns3 network-ns3-wifi network-wifi
                  io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent
                  task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load
diff --git a/examples/cpp/mess-wait/s4u-mess-wait.cpp b/examples/cpp/mess-wait/s4u-mess-wait.cpp
new file mode 100644 (file)
index 0000000..0e20a61
--- /dev/null
@@ -0,0 +1,79 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to use simgrid::s4u::Mess::wait() to wait for a given message.
+ *
+ * The sender puts its messages one at a time in the "control" message queue with put_async(), and waits on
+ * the resulting simgrid::s4u::MessPtr before putting the next one. It then sends a last 'finalize' message.
+ *
+ * The receiver posts asynchronous gets on the same queue and waits for them until it receives 'finalize'.
+ */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+namespace sg4 = simgrid::s4u;
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_mess_wait, "Messages specific for this s4u example");
+
+static void sender(int messages_count)
+{
+  sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("control");
+
+  sg4::this_actor::sleep_for(0.5);
+
+  for (int i = 0; i < messages_count; i++) {
+    std::string msg_content = "Message " + std::to_string(i);
+    // Copy the data we send: the 'msg_content' variable is not a stable storage location.
+    // It will be destroyed when this actor leaves the loop, ie before the receiver gets the data
+    auto* payload = new std::string(msg_content);
+
+    /* Create a control message and put it in the message queue */
+    sg4::MessPtr mess = mqueue->put_async(payload);
+    XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mqueue->get_cname());
+    mess->wait();
+  }
+
+  /* Send message to let the receiver know that it should stop */
+  XBT_INFO("Send 'finalize' to 'receiver'");
+  mqueue->put(new std::string("finalize"), 0);
+}
+
+/* Receiver actor: takes no argument. It consumes messages from the "control" queue until it gets 'finalize'. */
+static void receiver()
+{
+  sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("control");
+
+  sg4::this_actor::sleep_for(1);
+
+  XBT_INFO("Wait for my first message");
+  for (bool cont = true; cont;) {
+    // Start from nullptr so that the pointer never holds an indeterminate value before the message is delivered
+    std::string* received = nullptr;
+    sg4::MessPtr mess     = mqueue->get_async<std::string>(&received);
+
+    // The get is already posted: this sleep overlaps with the wait for the message
+    sg4::this_actor::sleep_for(0.1);
+    mess->wait();
+
+    XBT_INFO("I got a '%s'.", received->c_str());
+    if (*received == "finalize")
+      cont = false; // If it's a finalize message, we're done.
+    delete received; // The sender allocated the payload with new; it is released here, on the receiving side
+  }
+}
+
+/* Entry point: expects the platform file as first argument (SimGrid options such as --log are consumed by the
+ * Engine constructor). Starts one sender on Tremblay and one receiver on Fafard, then runs the simulation. */
+int main(int argc, char* argv[])
+{
+  sg4::Engine e(&argc, argv);
+  // Fail with a usage message instead of passing a null argv[1] to load_platform()
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+
+  e.load_platform(argv[1]);
+
+  sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 3);
+  sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver);
+
+  e.run();
+
+  return 0;
+}
diff --git a/examples/cpp/mess-wait/s4u-mess-wait.tesh b/examples/cpp/mess-wait/s4u-mess-wait.tesh
new file mode 100644 (file)
index 0000000..3f62420
--- /dev/null
@@ -0,0 +1,12 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-mess-wait ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.500000] (1:sender@Tremblay) Send 'Message 0' to 'control'
+> [  1.000000] (2:receiver@Fafard) Wait for my first message
+> [  1.100000] (2:receiver@Fafard) I got a 'Message 0'.
+> [  1.100000] (1:sender@Tremblay) Send 'Message 1' to 'control'
+> [  1.100000] (1:sender@Tremblay) Send 'Message 2' to 'control'
+> [  1.200000] (2:receiver@Fafard) I got a 'Message 1'.
+> [  1.300000] (1:sender@Tremblay) Send 'finalize' to 'receiver'
+> [  1.300000] (2:receiver@Fafard) I got a 'Message 2'.
+> [  1.400000] (2:receiver@Fafard) I got a 'finalize'.
\ No newline at end of file
index 2c1da4f..ff408d4 100644 (file)
@@ -76,6 +76,14 @@ class SplitDuplexLink;
 
 class Mailbox;
 
+class Mess;
+/** Smart pointer to a simgrid::s4u::Mess */
+using MessPtr = boost::intrusive_ptr<Mess>;
+XBT_PUBLIC void intrusive_ptr_release(Mess* c);
+XBT_PUBLIC void intrusive_ptr_add_ref(Mess* c);
+
+class MessageQueue;
+
 class Mutex;
 XBT_PUBLIC void intrusive_ptr_release(const Mutex* m);
 XBT_PUBLIC void intrusive_ptr_add_ref(const Mutex* m);
@@ -152,6 +160,8 @@ namespace activity {
   using ExecImplPtr = boost::intrusive_ptr<ExecImpl>;
   class IoImpl;
   using IoImplPtr = boost::intrusive_ptr<IoImpl>;
+  class MessImpl;
+  using MessImplPtr = boost::intrusive_ptr<MessImpl>;
   class MutexImpl;
   using MutexImplPtr = boost::intrusive_ptr<MutexImpl>;
   class MutexAcquisitionImpl;
@@ -170,6 +180,7 @@ namespace activity {
   using SleepImplPtr = boost::intrusive_ptr<SleepImpl>;
 
   class MailboxImpl;
+  class MessageQueueImpl;
 }
 namespace context {
 class Context;
@@ -232,6 +243,7 @@ using s4u_Link              = simgrid::s4u::Link;
 using s4u_File              = simgrid::s4u::File;
 using s4u_ConditionVariable = simgrid::s4u::ConditionVariable;
 using s4u_Mailbox           = simgrid::s4u::Mailbox;
+using s4u_MessageQueue      = simgrid::s4u::MessageQueue;
 using s4u_Mutex             = simgrid::s4u::Mutex;
 using s4u_Semaphore         = simgrid::s4u::Semaphore;
 using s4u_Disk              = simgrid::s4u::Disk;
@@ -252,6 +264,7 @@ typedef struct s4u_Link s4u_Link;
 typedef struct s4u_File s4u_File;
 typedef struct s4u_ConditionVariable s4u_ConditionVariable;
 typedef struct s4u_Mailbox s4u_Mailbox;
+typedef struct s4u_MessageQueue s4u_MessageQueue;
 typedef struct s4u_Mutex s4u_Mutex;
 typedef struct s4u_Semaphore s4u_Semaphore;
 typedef struct s4u_Disk s4u_Disk;
@@ -271,6 +284,8 @@ typedef s4u_ConditionVariable* sg_cond_t;
 typedef const s4u_ConditionVariable* const_sg_cond_t;
 typedef s4u_Mailbox* sg_mailbox_t;
 typedef const s4u_Mailbox* const_sg_mailbox_t;
+typedef s4u_MessageQueue* sg_messagequeue_t;
+typedef const s4u_MessageQueue* const_sg_messagequeue_t;
 typedef s4u_Mutex* sg_mutex_t;
 typedef const s4u_Mutex* const_sg_mutex_t;
 typedef s4u_Semaphore* sg_sem_t;
index b285322..8ddbf1a 100644 (file)
@@ -19,6 +19,8 @@
 #include <simgrid/s4u/Host.hpp>
 #include <simgrid/s4u/Link.hpp>
 #include <simgrid/s4u/Mailbox.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
 #include <simgrid/s4u/Mutex.hpp>
 #include <simgrid/s4u/NetZone.hpp>
 #include <simgrid/s4u/Semaphore.hpp>
index ef51321..8b9f485 100644 (file)
@@ -38,6 +38,7 @@ class XBT_PUBLIC Activity : public xbt::Extendable<Activity> {
   friend Comm;
   friend Exec;
   friend Io;
+  friend Mess;
   friend kernel::activity::ActivityImpl;
   friend std::vector<ActivityPtr> create_DAG_from_dot(const std::string& filename);
   friend std::vector<ActivityPtr> create_DAG_from_DAX(const std::string& filename);
index d92987d..fbef8cc 100644 (file)
@@ -155,6 +155,7 @@ public:
   Link* link_by_name_or_null(const std::string& name) const;
 
   Mailbox* mailbox_by_name_or_create(const std::string& name) const;
+  MessageQueue* message_queue_by_name_or_create(const std::string& name) const;
 
   size_t get_actor_count() const;
   std::vector<ActorPtr> get_all_actors() const;
diff --git a/include/simgrid/s4u/Mess.hpp b/include/simgrid/s4u/Mess.hpp
new file mode 100644 (file)
index 0000000..c0eaf9a
--- /dev/null
@@ -0,0 +1,66 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_MESS_HPP
+#define SIMGRID_S4U_MESS_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Activity.hpp>
+
+#include <string>
+#include <vector>
+
+namespace simgrid::s4u {
+
+/** Activity representing one message that is put into, or got from, a MessageQueue.
+ *
+ * The default constructor is private: instances are meant to be created by MessageQueue (declared as a friend
+ * below), and the class is neither copyable nor copy-assignable.
+ */
+class XBT_PUBLIC Mess : public Activity_T<Mess> {
+#ifndef DOXYGEN
+  friend MessageQueue; // Factory of messages
+  friend kernel::activity::MessImpl;
+#endif
+  MessageQueue* queue_  = nullptr; // Queue this message goes through (see set_queue())
+  void* payload_        = nullptr; // Pointer carried by the message (see set_payload())
+  size_t dst_buff_size_ = 0;       // Size of the destination slot (see set_dst_data())
+  void* dst_buff_       = nullptr; // Destination slot on the receiving side (see set_dst_data())
+  kernel::activity::ActivityImplPtr pimpl_;
+
+  Mess() = default;
+  Mess* do_start() override;
+
+  static xbt::signal<void(Mess const&)> on_send;
+  xbt::signal<void(Mess const&)> on_this_send;
+  static xbt::signal<void(Mess const&)> on_recv;
+  xbt::signal<void(Mess const&)> on_this_recv;
+
+  /* These ensure that the on_completion signals are really thrown */
+  void fire_on_completion_for_real() const { Activity_T<Mess>::fire_on_completion(); }
+  void fire_on_this_completion_for_real() const { Activity_T<Mess>::fire_on_this_completion(); }
+
+public:
+#ifndef DOXYGEN
+  Mess(Mess const&) = delete;
+  Mess& operator=(Mess const&) = delete;
+#endif
+
+  MessPtr set_queue(MessageQueue* queue);
+  MessageQueue* get_queue() const { return queue_; }
+
+  /** Retrieve the payload associated to the message. You can only do that once the message is (gracefully)
+   * terminated */
+  void* get_payload() const { return payload_; }
+  MessPtr set_payload(void* data);
+  MessPtr set_dst_data(void** buff, size_t size);
+  Actor* get_sender() const;
+  Actor* get_receiver() const;
+
+  /** A Mess always reports itself as assigned */
+  bool is_assigned() const override { return true; };
+
+  Mess* wait_for(double timeout) override;
+
+  kernel::actor::ActorImpl* sender_   = nullptr;
+  kernel::actor::ActorImpl* receiver_ = nullptr;
+};
+} // namespace simgrid::s4u
+
+#endif /* SIMGRID_S4U_MESS_HPP */
diff --git a/include/simgrid/s4u/MessageQueue.hpp b/include/simgrid/s4u/MessageQueue.hpp
new file mode 100644 (file)
index 0000000..dc00941
--- /dev/null
@@ -0,0 +1,112 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_S4U_MESSAGEQUEUE_HPP
+#define SIMGRID_S4U_MESSAGEQUEUE_HPP
+
+#include <simgrid/forward.h>
+#include <simgrid/s4u/Mess.hpp>
+#include <smpi/forward.hpp>
+
+#include <string>
+
+namespace simgrid::s4u {
+
+/** Named rendez-vous point through which actors exchange Mess activities.
+ *
+ * This class is a thin public interface over kernel::activity::MessageQueueImpl. Its constructor and destructor
+ * are private: queues are retrieved by name with by_name().
+ */
+class XBT_PUBLIC MessageQueue {
+#ifndef DOXYGEN
+  friend Mess;
+  friend kernel::activity::MessageQueueImpl;
+#endif
+
+  kernel::activity::MessageQueueImpl* const pimpl_;
+
+  explicit MessageQueue(kernel::activity::MessageQueueImpl * mqueue) : pimpl_(mqueue) {}
+  ~MessageQueue() = default;
+
+protected:
+  kernel::activity::MessageQueueImpl* get_impl() const { return pimpl_; }
+
+public:
+  /** @brief Retrieves the name of that message queue as a C++ string */
+  const std::string& get_name() const;
+  /** @brief Retrieves the name of that message queue as a C string */
+  const char* get_cname() const;
+
+  /** \static Retrieve the message queue associated to the given name. Message queues are created on demand. */
+  static MessageQueue* by_name(const std::string& name);
+
+  /** Returns whether the message queue contains queued messages */
+  bool empty() const;
+
+  /** Returns the number of queued messages */
+  size_t size() const;
+
+  /** Gets the first element in the queue (without dequeuing it), or nullptr if none is there */
+  kernel::activity::MessImplPtr front() const;
+
+  /** Creates (but doesn't start) a data transmission to that message queue */
+  MessPtr put_init();
+  /** Creates (but doesn't start) a data transmission to that message queue.
+   *
+   * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+   * consumption, or the receiver will get a pointer to a garbled memory area.
+   */
+  MessPtr put_init(void* payload);
+  /** Creates and starts a data transmission to that message queue.
+   *
+   * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+   * consumption, or the receiver will get a pointer to a garbled memory area.
+   */
+  MessPtr put_async(void* payload);
+
+  /** Blocking data transmission.
+   *
+   * Please note that if you send a pointer to some data, you must ensure that your data remains live until
+   * consumption, or the receiver will get a pointer to a garbled memory area.
+   */
+  void put(void* payload);
+  /** Blocking data transmission with timeout */
+  void put(void* payload, double timeout);
+
+  /** Creates (but doesn't start) a data reception onto that message queue. */
+  MessPtr get_init();
+  /** Creates and starts an async data reception to that message queue */
+  template <typename T> MessPtr get_async(T** data);
+  /** Creates and starts an async data reception to that message queue. Since the data location is not provided,
+   * you'll have to use Mess::get_payload once the messaging operation terminates */
+  MessPtr get_async();
+
+  /** Blocking data reception */
+  template <typename T> T* get();
+  /** Blocking data reception, with the received pointer wrapped in a std::unique_ptr */
+  template <typename T> std::unique_ptr<T> get_unique() { return std::unique_ptr<T>(get<T>()); }
+
+  /** Blocking data reception with timeout */
+  template <typename T> T* get(double timeout);
+  /** Blocking data reception with timeout, with the received pointer wrapped in a std::unique_ptr */
+  template <typename T> std::unique_ptr<T> get_unique(double timeout) { return std::unique_ptr<T>(get<T>(timeout)); }
+};
+
+/* Builds a reception with get_init(), registers 'data' as its destination slot, starts it and returns it */
+template <typename T> MessPtr MessageQueue::get_async(T** data)
+{
+  auto** raw_slot   = reinterpret_cast<void**>(data);
+  MessPtr reception = get_init()->set_dst_data(raw_slot, sizeof(void*));
+  reception->start();
+  return reception;
+}
+
+/* Blocking reception: posts an asynchronous get, waits for it, and returns the slot it was given */
+template <typename T> T* MessageQueue::get()
+{
+  T* payload      = nullptr;
+  MessPtr pending = get_async<T>(&payload);
+  pending->wait();
+  return payload;
+}
+
+/* Same as get(), but waits with wait_for(timeout) instead of wait() */
+template <typename T> T* MessageQueue::get(double timeout)
+{
+  T* payload      = nullptr;
+  MessPtr pending = get_async<T>(&payload);
+  pending->wait_for(timeout);
+  return payload;
+}
+} // namespace simgrid::s4u
+
+#endif /* SIMGRID_S4U_MESSAGEQUEUE_HPP */
index 9fa412e..59da112 100644 (file)
@@ -155,6 +155,9 @@ EngineImpl::~EngineImpl()
   for (auto const& [_, mailbox] : mailboxes_)
     delete mailbox;
 
+  for (auto const& [_, queue] : mqueues_)
+    delete queue;
+
   /* Kill all actors (but maestro) */
   maestro_->kill_all();
   run_all_actors();
index 4c25e54..149c0c2 100644 (file)
@@ -16,6 +16,7 @@
 #include "src/kernel/activity/ExecImpl.hpp"
 #include "src/kernel/activity/IoImpl.hpp"
 #include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
 #include "src/kernel/activity/SleepImpl.hpp"
 #include "src/kernel/activity/Synchro.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
@@ -33,6 +34,7 @@ namespace simgrid::kernel {
 class EngineImpl {
   std::unordered_map<std::string, routing::NetPoint*> netpoints_;
   std::unordered_map<std::string, activity::MailboxImpl*> mailboxes_;
+  std::unordered_map<std::string, activity::MessageQueueImpl*> mqueues_;
 
   std::unordered_map<std::string, actor::ActorCodeFactory> registered_functions; // Maps function names to actor code
   actor::ActorCodeFactory default_function; // Function to use as a fallback when the provided name matches nothing
diff --git a/src/kernel/activity/MessImpl.cpp b/src/kernel/activity/MessImpl.cpp
new file mode 100644 (file)
index 0000000..9f77c7d
--- /dev/null
@@ -0,0 +1,174 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/Host.hpp>
+
+#include "src/kernel/EngineImpl.hpp"
+#include "src/kernel/activity/MessImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization");
+
+namespace simgrid::kernel::activity {
+
+/* On destruction, a message that still records a queue asks that queue to drop it */
+MessImpl::~MessImpl()
+{
+  if (queue_ != nullptr) {
+    queue_->remove(this);
+  }
+}
+
+/* Records whether this message is a PUT or a GET; returns *this so that setters can be chained */
+MessImpl& MessImpl::set_type(MessImplType type)
+{
+  this->type_ = type;
+  return *this;
+}
+
+/* Records the queue currently holding this message (nullptr when none); returns *this for chaining */
+MessImpl& MessImpl::set_queue(MessageQueueImpl* queue)
+{
+  this->queue_ = queue;
+  return *this;
+}
+
+/* Records the pointer carried by this message; returns *this for chaining */
+MessImpl& MessImpl::set_payload(void* payload)
+{
+  this->payload_ = payload;
+  return *this;
+}
+
+/* Records the destination slot and the pointer to its size; finish() stores the payload pointer into 'buff' */
+MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size)
+{
+  this->dst_buff_size_ = size;
+  this->dst_buff_      = buff;
+  return *this;
+}
+
+/* A READY message (iput()/iget() set that state when they find a matching request) becomes DONE right away.
+ * In any other state, this does nothing. */
+MessImpl* MessImpl::start()
+{
+  const bool is_ready = (get_state() == State::READY);
+  if (is_ready) {
+    set_state(State::DONE);
+  }
+  return this;
+}
+
+/* Kernel side of an asynchronous put.
+ *
+ * Looks in the observer's queue for a pending GET. If one is found, that message is reused and marked READY;
+ * otherwise a new PUT message is pushed into the queue. In both cases the retained message records the issuer
+ * as source actor and the observer's payload, is registered in the issuer's activities, and is started.
+ *
+ * Returns the retained message (either the matched GET or the newly queued PUT).
+ */
+ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer)
+{
+  auto* queue = observer->get_queue();
+  XBT_DEBUG("put from message queue %p", queue);
+
+  /* Prepare a synchro describing us: it is the one that gets queued if no matching GET is pending */
+  MessImplPtr this_mess(new MessImpl());
+  this_mess->set_type(MessImplType::PUT);
+
+  /* Look for message synchro matching our needs.
+   *
+   * If it is not found then push our message into the rendez-vous point */
+  MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET);
+
+  if (not other_mess) {
+    other_mess = std::move(this_mess);
+    queue->push(other_mess);
+  } else {
+    XBT_DEBUG("Get already pushed");
+    other_mess->set_state(State::READY); // start() below turns READY into DONE
+  }
+
+  observer->set_message(other_mess.get());
+  observer->get_issuer()->activities_.insert(other_mess);
+
+  /* Setup synchro */
+  other_mess->src_actor_ = observer->get_issuer();
+  other_mess->payload_ = observer->get_payload();
+  other_mess->start();
+
+  return other_mess;
+}
+
+/* Kernel side of an asynchronous get.
+ *
+ * Looks in the observer's queue for a pending PUT. If one is found, that message is reused and marked READY;
+ * otherwise a new GET message is pushed into the queue. In both cases the retained message records the issuer
+ * as destination actor and the observer's destination buffer, is registered in the issuer's activities, and is
+ * started.
+ *
+ * Returns the retained message (either the matched PUT or the newly queued GET).
+ */
+ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer)
+{
+  /* Prepare a synchro describing us: it is the one that gets queued if no matching PUT is pending */
+  MessImplPtr this_synchro(new MessImpl());
+  this_synchro->set_type(MessImplType::GET);
+
+  auto* queue = observer->get_queue();
+  XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get());
+
+  MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT);
+
+  if (other_mess == nullptr) {
+    XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size());
+    other_mess = std::move(this_synchro);
+    queue->push(other_mess);
+  } else {
+    XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get());
+
+    other_mess->set_state(State::READY); // start() below turns READY into DONE
+  }
+
+  observer->get_issuer()->activities_.insert(other_mess);
+  observer->set_message(other_mess.get());
+
+  /* Setup synchro */
+  other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size());
+  other_mess->dst_actor_ = observer->get_issuer();
+
+  other_mess->start();
+
+  return other_mess;
+}
+
+void MessImpl::finish()
+{
+  XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
+            src_actor_.get(), dst_actor_.get());
+
+  if (get_iface()) {
+    const auto& piface = static_cast<const s4u::Mess&>(*get_iface());
+    set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals
+    piface.fire_on_completion_for_real();
+    piface.fire_on_this_completion_for_real();
+  }
+
+  /* Update synchro state */
+  if (get_state() == State::RUNNING) {
+    set_state(State::DONE);
+  }
+
+  /* If the synchro is still in a rendez-vous point then remove from it */
+  if (queue_)
+    queue_->remove(this);
+
+  if (get_state() == State::DONE && payload_ != nullptr)
+    *(void**)(dst_buff_) = payload_;
+
+  while (not simcalls_.empty()) {
+    actor::Simcall* simcall = simcalls_.front();
+    simcalls_.pop_front();
+
+    /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+     * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+     * simcall */
+
+    if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case
+      continue;                                       // if actor handling comm is killed
+
+    handle_activity_waitany(simcall);
+
+    /* Check out for errors */
+
+    if (not simcall->issuer_->get_host()->is_on()) {
+      simcall->issuer_->set_wannadie();
+    } else {
+      // Do not answer to dying actors
+      if (not simcall->issuer_->wannadie()) {
+        set_exception(simcall->issuer_);
+        simcall->issuer_->simcall_answer();
+      }
+    }
+
+    simcall->issuer_->waiting_synchro_ = nullptr;
+    simcall->issuer_->activities_.erase(this);
+  }
+}
+
+} // namespace simgrid::kernel::activity
diff --git a/src/kernel/activity/MessImpl.hpp b/src/kernel/activity/MessImpl.hpp
new file mode 100644 (file)
index 0000000..1232c61
--- /dev/null
@@ -0,0 +1,48 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_ACTIVITY_MESS_HPP
+#define SIMGRID_KERNEL_ACTIVITY_MESS_HPP
+
+#include "src/kernel/activity/ActivityImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/CommObserver.hpp"
+
+namespace simgrid::kernel::activity {
+
+/** Side of the exchange a MessImpl stands for: PUT on the sending side, GET on the receiving side */
+enum class MessImplType { PUT, GET };
+
+/** Kernel-side activity backing one message exchanged through a MessageQueueImpl.
+ *
+ * The same object is shared by the putting and the getting side: iput() and iget() either reuse the matching
+ * message found in the queue or push a new one.
+ */
+class XBT_PUBLIC MessImpl : public ActivityImpl_T<MessImpl> {
+  // NOTE(review): private destructor; presumably instances are only destroyed through MessImplPtr — confirm
+  ~MessImpl() override;
+
+  MessageQueueImpl* queue_ = nullptr;            // Queue currently holding this message, if any
+  void* payload_           = nullptr;            // Pointer provided by the putting side
+  MessImplType type_       = MessImplType::PUT;  // Side that created this message
+  unsigned char* dst_buff_ = nullptr;            // Slot in which finish() stores the payload pointer
+  size_t* dst_buff_size_   = nullptr;            // Size associated to dst_buff_
+
+public:
+  MessImpl& set_type(MessImplType type);
+  MessImplType get_type() const { return type_; }
+  MessImpl& set_payload(void* payload);
+  void* get_payload() { return payload_; }
+
+  MessImpl& set_queue(MessageQueueImpl* queue);
+  MessageQueueImpl* get_queue() const { return queue_; }
+  MessImpl& set_dst_buff(unsigned char* buff, size_t* size);
+
+  /** Kernel handlers of the asynchronous put and get simcalls */
+  static ActivityImplPtr iput(actor::MessIputSimcall* observer);
+  static ActivityImplPtr iget(actor::MessIgetSimcall* observer);
+
+  MessImpl* start();
+  /** Intentionally empty: a message never sets an exception on the issuer */
+  void set_exception(actor::ActorImpl* issuer) override {};
+  void finish() override;
+
+  actor::ActorImplPtr src_actor_ = nullptr; // Set by iput() to the actor issuing the put
+  actor::ActorImplPtr dst_actor_ = nullptr; // Set by iget() to the actor issuing the get
+};
+} // namespace simgrid::kernel::activity
+
+#endif
diff --git a/src/kernel/activity/MessageQueueImpl.cpp b/src/kernel/activity/MessageQueueImpl.cpp
new file mode 100644 (file)
index 0000000..714f86b
--- /dev/null
@@ -0,0 +1,54 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+#include <unordered_map>
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mq, kernel, "Message queue implementation");
+
+namespace simgrid::kernel::activity {
+
+/* Counter from which each new queue draws its id_ */
+unsigned MessageQueueImpl::next_id_ = 0;
+
+/* Appends 'mess' at the back of this queue and records this queue in the message.
+ *
+ * The queue stores its own MessImplPtr copy. 'mess' is a const reference, so it cannot be moved from: the former
+ * std::move() was a no-op that still copied, and only suggested a transfer that never happened. */
+void MessageQueueImpl::push(const MessImplPtr& mess)
+{
+  mess->set_queue(this);
+  queue_.push_back(mess);
+}
+
+/* Takes 'mess' out of this queue and clears the queue recorded in the message.
+ * Aborts if the message records another queue, or if it cannot be found in this one. */
+void MessageQueueImpl::remove(const MessImplPtr& mess)
+{
+  xbt_assert(mess->get_queue() == this, "Message %p is in queue %s, not queue %s", mess.get(),
+             (mess->get_queue() ? mess->get_queue()->get_cname() : "(null)"), get_cname());
+
+  mess->set_queue(nullptr);
+
+  const auto position = std::find(queue_.begin(), queue_.end(), mess);
+  if (position == queue_.end())
+    xbt_die("Message %p not found in queue %s", mess.get(), get_cname());
+
+  queue_.erase(position);
+}
+
+/* Extracts and returns the first queued message of the given type, or nullptr when there is none.
+ * The returned message no longer records this queue. */
+MessImplPtr MessageQueueImpl::find_matching_message(MessImplType type)
+{
+  for (auto candidate = queue_.begin(); candidate != queue_.end(); ++candidate) {
+    if ((*candidate)->get_type() != type)
+      continue;
+
+    // Take our own reference first: erasing the deque entry releases the one held by the queue
+    MessImplPtr match = *candidate;
+    XBT_DEBUG("Found a matching message synchro %p", match.get());
+    match->set_queue(nullptr);
+    queue_.erase(candidate);
+    return match;
+  }
+
+  XBT_DEBUG("No matching message synchro found");
+  return nullptr;
+}
+
+} // namespace simgrid::kernel::activity
diff --git a/src/kernel/activity/MessageQueueImpl.hpp b/src/kernel/activity/MessageQueueImpl.hpp
new file mode 100644 (file)
index 0000000..4bc010f
--- /dev/null
@@ -0,0 +1,52 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#ifndef SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP
+#define SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP
+
+#include "simgrid/s4u/Engine.hpp"
+#include "simgrid/s4u/MessageQueue.hpp"
+#include "src/kernel/activity/MessImpl.hpp"
+
+namespace simgrid::kernel::activity {
+
+/** @brief Implementation of the s4u::MessageQueue */
+
+class MessageQueueImpl {
+  s4u::MessageQueue piface_;
+  std::string name_;
+  std::deque<MessImplPtr> queue_;
+
+  friend s4u::Engine;
+  friend s4u::MessageQueue;
+  friend s4u::MessageQueue* s4u::Engine::message_queue_by_name_or_create(const std::string& name) const;
+  friend s4u::MessageQueue* s4u::MessageQueue::by_name(const std::string& name);
+
+  static unsigned next_id_; // Next ID to be given
+  const unsigned id_ = next_id_++;
+  explicit MessageQueueImpl(const std::string& name) : piface_(this), name_(name) {}
+  MessageQueueImpl(const MailboxImpl&) = delete;
+  MessageQueueImpl& operator=(const MailboxImpl&) = delete;
+
+public:
+  /** @brief Public interface */
+  unsigned get_id() const { return id_; }
+
+  const s4u::MessageQueue* get_iface() const { return &piface_; }
+  s4u::MessageQueue* get_iface() { return &piface_; }
+
+  const std::string& get_name() const { return name_; }
+  const char* get_cname() const { return name_.c_str(); }
+  void push(const MessImplPtr& mess);
+  void remove(const MessImplPtr& mess);
+  bool empty() const { return queue_.empty(); }
+  size_t size() const { return queue_.size(); }
+  const MessImplPtr& front() const { return queue_.front(); }
+
+  MessImplPtr find_matching_message(MessImplType type);
+};
+} // namespace simgrid::kernel::activity
+
+#endif
index 6f13e3e..7aa715d 100644 (file)
@@ -6,6 +6,7 @@
 #include "simgrid/s4u/Host.hpp"
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/MailboxImpl.hpp"
+#include "src/kernel/activity/MessageQueueImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
 #include "src/mc/mc_config.hpp"
@@ -233,10 +234,35 @@ void CommIrecvSimcall::serialize(std::stringstream& stream) const
   XBT_DEBUG("RecvObserver comm:%p mbox:%u tag:%d", comm_, mbox_->get_id(), tag_);
   stream << ' ' << fun_call_;
 }
+
 std::string CommIrecvSimcall::to_string() const
 {
   return "CommAsyncRecv(comm_id: " + std::to_string(comm_->get_id()) + " mbox:" + std::to_string(mbox_->get_id()) +
          " tag: " + std::to_string(tag_) + ")";
 }
 
+/* Writes the message pointer and the queue pointer, separated by one space, into 'stream' */
+void MessIputSimcall::serialize(std::stringstream& stream) const
+{
+  stream << mess_;
+  stream << ' ';
+  stream << queue_;
+  XBT_DEBUG("PutObserver mess:%p queue:%p", mess_, queue_);
+}
+
+/* Human-readable description of this simcall, mentioning the name of the target queue */
+std::string MessIputSimcall::to_string() const
+{
+  std::string description = "MessAsyncPut(queue:";
+  description += queue_->get_name();
+  description += ")";
+  return description;
+}
+
+/* Writes the message pointer and the queue pointer, separated by one space, into 'stream' */
+void MessIgetSimcall::serialize(std::stringstream& stream) const
+{
+  stream << mess_ << ' ' << queue_;
+  // Debug label fixed ("GettObserver" was a typo), so that it mirrors the "PutObserver" label of the put side
+  XBT_DEBUG("GetObserver mess:%p queue:%p", mess_, queue_);
+}
+
+/* Human-readable description of this simcall, mentioning the name of the source queue */
+std::string MessIgetSimcall::to_string() const
+{
+  std::string description = "MessAsyncGet(queue:";
+  description += queue_->get_name();
+  description += ")";
+  return description;
+}
+
+
+
 } // namespace simgrid::kernel::actor
index ab03651..161e095 100644 (file)
@@ -186,6 +186,52 @@ public:
   auto const& get_copy_data_fun() const { return copy_data_fun_; }
 };
 
+class MessIputSimcall final : public SimcallObserver { // Observer carrying the arguments of a "put" on a message queue
+  activity::MessageQueueImpl* queue_; // queue targeted by the put
+  void* payload_;                     // opaque payload pointer given at construction
+  activity::MessImpl* mess_ = {};     // null until set_message() is called
+
+public:
+  MessIputSimcall(
+      ActorImpl* actor, activity::MessageQueueImpl* queue, void* payload)
+      : SimcallObserver(actor)
+      , queue_(queue)
+      , payload_(payload)
+  {
+  }
+  void serialize(std::stringstream& stream) const override;
+  std::string to_string() const override;
+  activity::MessageQueueImpl* get_queue() const { return queue_; }
+  void* get_payload() const { return payload_; }
+  void set_message(activity::MessImpl* mess) { mess_ = mess; } // the only way mess_ gets a value in this class
+};
+
+class MessIgetSimcall final : public SimcallObserver { // Observer carrying the arguments of a "get" on a message queue
+  activity::MessageQueueImpl* queue_; // queue the get is issued on
+  unsigned char* dst_buff_;           // destination buffer pointer given at construction
+  size_t* dst_buff_size_;             // pointer to the destination buffer size given at construction
+  void* payload_;                     // opaque payload pointer given at construction
+  activity::MessImpl* mess_ = {};     // null until set_message() is called
+
+public:
+  MessIgetSimcall(ActorImpl* actor, activity::MessageQueueImpl* queue, unsigned char* dst_buff, size_t* dst_buff_size,
+                  void* payload)
+      : SimcallObserver(actor)
+      , queue_(queue)
+      , dst_buff_(dst_buff)
+      , dst_buff_size_(dst_buff_size)
+      , payload_(payload)
+  {
+  }
+  void serialize(std::stringstream& stream) const override;
+  std::string to_string() const override;
+  activity::MessageQueueImpl* get_queue() const { return queue_; }
+  unsigned char* get_dst_buff() const { return dst_buff_; }
+  size_t* get_dst_buff_size() const { return dst_buff_size_; }
+  void* get_payload() const { return payload_; }
+  void set_message(activity::MessImpl* mess) { mess_ = mess; } // the only way mess_ gets a value in this class
+};
+
 } // namespace simgrid::kernel::actor
 
 #endif
index dc7e77f..094e831 100644 (file)
@@ -400,6 +400,20 @@ Mailbox* Engine::mailbox_by_name_or_create(const std::string& name) const
   return mbox->get_iface();
 }
 
+MessageQueue* Engine::message_queue_by_name_or_create(const std::string& name) const // Returns the queue called name, creating it on first use
+{
+  /* two actors may have requested the creation of the same message queue at the same time */
+  kernel::activity::MessageQueueImpl* queue = kernel::actor::simcall_answered([&name, this] {
+    auto [m, inserted] = pimpl_->mqueues_.try_emplace(name, nullptr); // adds a null placeholder only if name is absent
+    if (inserted) {
+      m->second = new kernel::activity::MessageQueueImpl(name); // fill the placeholder on first creation
+      XBT_DEBUG("Creating a message queue at %p with name %s", m->second, name.c_str());
+    }
+    return m->second;
+  });
+  return queue->get_iface();
+}
+
 /** @brief Returns the amount of links in the platform */
 size_t Engine::get_link_count() const
 {
diff --git a/src/s4u/s4u_Mess.cpp b/src/s4u/s4u_Mess.cpp
new file mode 100644 (file)
index 0000000..b16b387
--- /dev/null
@@ -0,0 +1,153 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <cmath>
+#include <simgrid/Exception.hpp>
+#include <simgrid/s4u/ActivitySet.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
+
+#include "src/kernel/activity/MessImpl.hpp"
+#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
+
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mess, s4u_activity, "S4U asynchronous messaging");
+
+namespace simgrid::s4u {
+xbt::signal<void(Mess const&)> Mess::on_send;
+xbt::signal<void(Mess const&)> Mess::on_recv;
+
+MessPtr Mess::set_queue(MessageQueue* queue) // Stores the queue used when the message is started; returns this
+{
+  queue_ = queue;
+  return this;
+}
+
+MessPtr Mess::set_payload(void* payload) // Stores the opaque payload pointer (no copy is made here); returns this
+{
+  payload_ = payload;
+  return this;
+}
+
+MessPtr Mess::set_dst_data(void** buff, size_t size) // Stores the destination buffer and its size; returns this
+{
+  xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
+             __FUNCTION__); // only allowed while the message is still in the INITED state
+
+  dst_buff_      = buff;
+  dst_buff_size_ = size;
+  return this;
+}
+
+Actor* Mess::get_sender() const // Interface of the implementation's src_actor_, or nullptr when unavailable
+{
+  kernel::actor::ActorImplPtr sender = nullptr;
+  if (pimpl_) // no implementation object yet means no known sender
+    sender = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->src_actor_;
+  return sender ? sender->get_ciface() : nullptr;
+}
+
+Actor* Mess::get_receiver() const // Interface of the implementation's dst_actor_, or nullptr when unavailable
+{
+  kernel::actor::ActorImplPtr receiver = nullptr;
+  if (pimpl_) // no implementation object yet means no known receiver
+    receiver = boost::static_pointer_cast<kernel::activity::MessImpl>(pimpl_)->dst_actor_;
+  return receiver ? receiver->get_ciface() : nullptr;
+}
+
+Mess* Mess::do_start() // Issues the put or get simcall depending on whether the caller is sender_ or receiver_
+{
+  xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
+             "You cannot use %s() once your message exchange has started (not implemented)", __FUNCTION__);
+
+  auto myself = kernel::actor::ActorImpl::self();
+  if (myself == sender_) { // sender side: fire the send signals, then run MessImpl::iput()
+    on_send(*this);
+    on_this_send(*this);
+    kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
+    pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
+                                             &observer);
+  } else if (myself == receiver_) { // receiver side: fire the recv signals, then run MessImpl::iget()
+    on_recv(*this);
+    on_this_recv(*this);
+    kernel::actor::MessIgetSimcall observer{receiver_,
+                                            queue_->get_impl(),
+                                            static_cast<unsigned char*>(dst_buff_),
+                                            &dst_buff_size_,
+                                            get_payload()};
+    pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
+                                             &observer);
+  } else { // the calling actor is neither sender_ nor receiver_
+    xbt_die("Cannot start a message exchange before specifying whether we are the sender or the receiver");
+  }
+
+  pimpl_->set_iface(this);
+  pimpl_->set_actor(sender_); // NOTE(review): also runs on the receiver path, where get_init() only set receiver_ -- confirm
+  // Only throw the signal when both sides are here and the status is READY
+  if (pimpl_->get_state() != kernel::activity::State::WAITING) {
+      fire_on_start();
+      fire_on_this_start();
+  }
+  state_ = State::STARTED;
+  return this;
+}
+
+Mess* Mess::wait_for(double timeout) // Handles the message according to its current state_, then marks it FINISHED
+{
+  XBT_DEBUG("Calling Mess::wait_for with state %s", get_state_str());
+  kernel::actor::ActorImpl* issuer = nullptr;
+  switch (state_) {
+    case State::FINISHED: // nothing left to do
+      break;
+    case State::FAILED:
+      throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
+    case State::INITED:
+    case State::STARTING: // not started yet: issue the put/get simcall from here
+      if (get_payload() != nullptr) { // a non-null payload selects the sender side (do_start() compares actors instead)
+        on_send(*this);
+        on_this_send(*this);
+        kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()};
+        pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); },
+                                                 &observer);
+      } else { // Receiver
+        on_recv(*this);
+        on_this_recv(*this);
+        kernel::actor::MessIgetSimcall observer{receiver_,
+                                                queue_->get_impl(),
+                                                static_cast<unsigned char*>(dst_buff_),
+                                                &dst_buff_size_,
+                                                get_payload()};
+        pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); },
+                                                 &observer);
+      }
+      break; // NOTE(review): no ActivityWaitSimcall on this path and timeout is unused here -- confirm this is intended
+    case State::STARTED: // already started: issue a blocking wait simcall bounded by timeout
+      try {
+        issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"};
+        if (kernel::actor::simcall_blocking(
+                [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+                &observer)) { // a true result is reported to the caller as a timeout
+          throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+        }
+      } catch (const NetworkFailureException& e) {
+        issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
+        complete(State::FAILED);
+        e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
+      }
+      break;
+
+    case State::CANCELED:
+      throw CancelException(XBT_THROW_POINT, "Message canceled");
+
+    default:
+      THROW_IMPOSSIBLE;
+  }
+  complete(State::FINISHED); // reached by every branch that did not throw
+  return this;
+}
+
+} // namespace simgrid::s4u
diff --git a/src/s4u/s4u_MessageQueue.cpp b/src/s4u/s4u_MessageQueue.cpp
new file mode 100644 (file)
index 0000000..8fe34ac
--- /dev/null
@@ -0,0 +1,98 @@
+/* Copyright (c) 2023. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include <simgrid/s4u/Engine.hpp>
+#include <simgrid/s4u/Mess.hpp>
+#include <simgrid/s4u/MessageQueue.hpp>
+
+#include "src/kernel/activity/MessageQueueImpl.hpp"
+
+XBT_LOG_EXTERNAL_CATEGORY(s4u);
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mqueue, s4u, "S4U Message Queues");
+
+namespace simgrid::s4u {
+
+const std::string& MessageQueue::get_name() const // Forwards to the implementation's get_name()
+{
+  return pimpl_->get_name();
+}
+
+const char* MessageQueue::get_cname() const // Forwards to the implementation's get_cname()
+{
+  return pimpl_->get_cname();
+}
+
+MessageQueue* MessageQueue::by_name(const std::string& name) // Looks the queue up through the Engine, which creates it if absent
+{
+  return Engine::get_instance()->message_queue_by_name_or_create(name);
+}
+
+bool MessageQueue::empty() const // Forwards to the implementation's empty()
+{
+  return pimpl_->empty();
+}
+
+size_t MessageQueue::size() const // Forwards to the implementation's size()
+{
+  return pimpl_->size();
+}
+
+kernel::activity::MessImplPtr MessageQueue::front() const // First queued message, or nullptr when the queue is empty
+{
+  return pimpl_->empty() ? nullptr : pimpl_->front(); // guard: front() is never called on an empty queue
+}
+
+MessPtr MessageQueue::put_init() // Creates an unstarted Mess bound to this queue, with the calling actor as sender
+{
+  MessPtr res(new Mess());
+  res->set_queue(this);
+  res->sender_ = kernel::actor::ActorImpl::self(); // Mess::do_start() compares the caller against sender_
+  return res;
+}
+
+MessPtr MessageQueue::put_init(void* payload) // Same as put_init(), with the payload already attached
+{
+  return put_init()->set_payload(payload);
+}
+
+MessPtr MessageQueue::put_async(void* payload) // Creates a put for payload, starts it and returns it without waiting
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+  MessPtr res = put_init(payload);
+  res->start();
+  return res;
+}
+
+void MessageQueue::put(void* payload) // Starts an asynchronous put of payload, then calls wait() on it
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+
+  put_async(payload)->wait();
+}
+
+/** Blocking send with timeout: starts the put, then calls Mess::wait_for() with @p timeout */
+void MessageQueue::put(void* payload, double timeout)
+{
+  xbt_assert(payload != nullptr, "You cannot send nullptr");
+
+  put_init()->set_payload(payload)->start()->wait_for(timeout);
+}
+
+MessPtr MessageQueue::get_init() // Creates an unstarted Mess bound to this queue, with the calling actor as receiver
+{
+  MessPtr res(new Mess());
+  res->set_queue(this);
+  res->receiver_ = kernel::actor::ActorImpl::self(); // Mess::do_start() compares the caller against receiver_
+  return res;
+}
+
+MessPtr MessageQueue::get_async() // Creates a get on this queue, starts it and returns it without waiting
+{
+  MessPtr res = get_init()->set_payload(nullptr); // explicit null payload: Mess::wait_for() treats that as the receiver side
+  res->start();
+  return res;
+}
+
+} // namespace simgrid::s4u
index cee3e25..e4ce5a2 100644 (file)
@@ -322,6 +322,10 @@ set(KERNEL_SRC
   src/kernel/activity/IoImpl.hpp
   src/kernel/activity/MailboxImpl.cpp
   src/kernel/activity/MailboxImpl.hpp
+  src/kernel/activity/MessImpl.cpp
+  src/kernel/activity/MessImpl.hpp
+  src/kernel/activity/MessageQueueImpl.cpp
+  src/kernel/activity/MessageQueueImpl.hpp
   src/kernel/activity/MutexImpl.cpp
   src/kernel/activity/MutexImpl.hpp
   src/kernel/activity/SemaphoreImpl.cpp
@@ -472,6 +476,8 @@ set(S4U_SRC
   src/s4u/s4u_Io.cpp
   src/s4u/s4u_Link.cpp
   src/s4u/s4u_Mailbox.cpp
+  src/s4u/s4u_Mess.cpp
+  src/s4u/s4u_MessageQueue.cpp
   src/s4u/s4u_Mutex.cpp
   src/s4u/s4u_Netzone.cpp
   src/s4u/s4u_Semaphore.cpp
@@ -695,6 +701,7 @@ set(headers_to_install
   include/simgrid/s4u/Io.hpp
   include/simgrid/s4u/Link.hpp
   include/simgrid/s4u/Mailbox.hpp
+  include/simgrid/s4u/MessageQueue.hpp
   include/simgrid/s4u/Mutex.hpp
   include/simgrid/s4u/NetZone.hpp
   include/simgrid/s4u/Semaphore.hpp