From 6e0d0f6c1580aceb13c060bbf95cbe9000cf6bbd Mon Sep 17 00:00:00 2001 From: Fred Suter Date: Thu, 19 Oct 2023 16:19:45 -0400 Subject: [PATCH] add Message queue abstraction --- MANIFEST.in | 13 +- examples/cpp/CMakeLists.txt | 1 + examples/cpp/mess-wait/s4u-mess-wait.cpp | 79 ++++++++++ examples/cpp/mess-wait/s4u-mess-wait.tesh | 12 ++ include/simgrid/forward.h | 15 ++ include/simgrid/s4u.hpp | 2 + include/simgrid/s4u/Activity.hpp | 1 + include/simgrid/s4u/Engine.hpp | 1 + include/simgrid/s4u/Mess.hpp | 66 ++++++++ include/simgrid/s4u/MessageQueue.hpp | 112 ++++++++++++++ src/kernel/EngineImpl.cpp | 3 + src/kernel/EngineImpl.hpp | 2 + src/kernel/activity/MessImpl.cpp | 174 ++++++++++++++++++++++ src/kernel/activity/MessImpl.hpp | 48 ++++++ src/kernel/activity/MessageQueueImpl.cpp | 54 +++++++ src/kernel/activity/MessageQueueImpl.hpp | 52 +++++++ src/kernel/actor/CommObserver.cpp | 26 ++++ src/kernel/actor/CommObserver.hpp | 46 ++++++ src/s4u/s4u_Engine.cpp | 14 ++ src/s4u/s4u_Mess.cpp | 153 +++++++++++++++++++ src/s4u/s4u_MessageQueue.cpp | 98 ++++++++++++ tools/cmake/DefinePackages.cmake | 7 + 22 files changed, 977 insertions(+), 2 deletions(-) create mode 100644 examples/cpp/mess-wait/s4u-mess-wait.cpp create mode 100644 examples/cpp/mess-wait/s4u-mess-wait.tesh create mode 100644 include/simgrid/s4u/Mess.hpp create mode 100644 include/simgrid/s4u/MessageQueue.hpp create mode 100644 src/kernel/activity/MessImpl.cpp create mode 100644 src/kernel/activity/MessImpl.hpp create mode 100644 src/kernel/activity/MessageQueueImpl.cpp create mode 100644 src/kernel/activity/MessageQueueImpl.hpp create mode 100644 src/s4u/s4u_Mess.cpp create mode 100644 src/s4u/s4u_MessageQueue.cpp diff --git a/MANIFEST.in b/MANIFEST.in index 1b8ad273d6..2a24cda650 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -330,6 +330,8 @@ include examples/cpp/mc-failing-assert/s4u-mc-failing-assert-nodpor.tesh include examples/cpp/mc-failing-assert/s4u-mc-failing-assert-statequality.tesh include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.cpp include examples/cpp/mc-failing-assert/s4u-mc-failing-assert.tesh +include examples/cpp/mess-wait/s4u-mess-wait.cpp +include examples/cpp/mess-wait/s4u-mess-wait.tesh include examples/cpp/network-factors/s4u-network-factors.cpp include examples/cpp/network-factors/s4u-network-factors.tesh include examples/cpp/network-nonlinear/s4u-network-nonlinear.cpp @@ -348,8 +350,6 @@ include examples/cpp/network-ns3/s4u-network-ns3-timed.tesh include examples/cpp/network-ns3/s4u-network-ns3.cpp include examples/cpp/network-wifi/s4u-network-wifi.cpp include examples/cpp/network-wifi/s4u-network-wifi.tesh -include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp -include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.cpp include examples/cpp/platform-comm-serialize/s4u-platform-comm-serialize.tesh include examples/cpp/platform-failures/s4u-platform-failures.cpp @@ -380,6 +380,8 @@ include examples/cpp/replay-io/s4u-replay-io.txt include examples/cpp/replay-io/s4u-replay-io_d.xml include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.cpp include examples/cpp/routing-get-clusters/s4u-routing-get-clusters.tesh +include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.cpp +include examples/cpp/solar-panel-simple/s4u-solar-panel-simple.tesh include examples/cpp/synchro-barrier/s4u-mc-synchro-barrier.tesh include examples/cpp/synchro-barrier/s4u-synchro-barrier.cpp include examples/cpp/synchro-barrier/s4u-synchro-barrier.tesh @@ -1966,6 +1968,7 @@ include include/simgrid/s4u/Host.hpp include include/simgrid/s4u/Io.hpp include include/simgrid/s4u/Link.hpp include include/simgrid/s4u/Mailbox.hpp +include include/simgrid/s4u/MessageQueue.hpp include include/simgrid/s4u/Mutex.hpp include include/simgrid/s4u/NetZone.hpp include include/simgrid/s4u/Semaphore.hpp @@ -2062,6 +2065,10 @@ include src/kernel/activity/IoImpl.cpp include src/kernel/activity/IoImpl.hpp include src/kernel/activity/MailboxImpl.cpp include src/kernel/activity/MailboxImpl.hpp +include src/kernel/activity/MessImpl.cpp +include src/kernel/activity/MessImpl.hpp +include src/kernel/activity/MessageQueueImpl.cpp +include src/kernel/activity/MessageQueueImpl.hpp include src/kernel/activity/MutexImpl.cpp include src/kernel/activity/MutexImpl.hpp include src/kernel/activity/SemaphoreImpl.cpp @@ -2346,6 +2353,8 @@ include src/s4u/s4u_Host.cpp include src/s4u/s4u_Io.cpp include src/s4u/s4u_Link.cpp include src/s4u/s4u_Mailbox.cpp +include src/s4u/s4u_Mess.cpp +include src/s4u/s4u_MessageQueue.cpp include src/s4u/s4u_Mutex.cpp include src/s4u/s4u_Netzone.cpp include src/s4u/s4u_Semaphore.cpp diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index b917540cf1..7de7809a0e 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -170,6 +170,7 @@ foreach (example activityset-testany activityset-waitany activityset-waitall act exec-ptask-multicore exec-ptask-multicore-latency exec-cpu-nonlinear exec-cpu-factors exec-failure exec-threads maestro-set mc-bugged1 mc-bugged1-liveness mc-bugged2 mc-bugged2-liveness mc-centralized-mutex mc-electric-fence mc-failing-assert + mess-wait network-ns3 network-ns3-wifi network-wifi io-async io-priority io-degradation io-file-system io-file-remote io-disk-raw io-dependent task-dispatch task-io task-microservice task-parallelism task-simple task-storm task-switch-host task-variable-load diff --git a/examples/cpp/mess-wait/s4u-mess-wait.cpp b/examples/cpp/mess-wait/s4u-mess-wait.cpp new file mode 100644 index 0000000000..0e20a61ba2 --- /dev/null +++ b/examples/cpp/mess-wait/s4u-mess-wait.cpp @@ -0,0 +1,79 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +/* This example shows how to use simgrid::s4u::this_actor::wait() to wait for a given communication. + * + * As for the other asynchronous examples, the sender initiate all the messages it wants to send and + * pack the resulting simgrid::s4u::CommPtr objects in a vector. All messages thus occurs concurrently. + * + * The sender then loops until there is no ongoing communication. + */ + +#include "simgrid/s4u.hpp" +#include +#include +#include +namespace sg4 = simgrid::s4u; + +XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_mess_wait, "Messages specific for this s4u example"); + +static void sender(int messages_count) +{ + sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("control"); + + sg4::this_actor::sleep_for(0.5); + + for (int i = 0; i < messages_count; i++) { + std::string msg_content = "Message " + std::to_string(i); + // Copy the data we send: the 'msg_content' variable is not a stable storage location. + // It will be destroyed when this actor leaves the loop, ie before the receiver gets the data + auto* payload = new std::string(msg_content); + + /* Create a control message and put it in the message queue */ + sg4::MessPtr mess = mqueue->put_async(payload); + XBT_INFO("Send '%s' to '%s'", msg_content.c_str(), mqueue->get_cname()); + mess->wait(); + } + + /* Send message to let the receiver know that it should stop */ + XBT_INFO("Send 'finalize' to 'receiver'"); + mqueue->put(new std::string("finalize"), 0); +} + +/* Receiver actor expects 1 argument: its ID */ +static void receiver() +{ + sg4::MessageQueue* mqueue = sg4::MessageQueue::by_name("control"); + + sg4::this_actor::sleep_for(1); + + XBT_INFO("Wait for my first message"); + for (bool cont = true; cont;) { + std::string* received; + sg4::MessPtr mess = mqueue->get_async(&received); + + sg4::this_actor::sleep_for(0.1); + mess->wait(); + + XBT_INFO("I got a '%s'.", received->c_str()); + if (*received == "finalize") + cont = false; // If it's a finalize message, we're done. + delete received; + } +} + +int main(int argc, char* argv[]) +{ + sg4::Engine e(&argc, argv); + + e.load_platform(argv[1]); + + sg4::Actor::create("sender", e.host_by_name("Tremblay"), sender, 3); + sg4::Actor::create("receiver", e.host_by_name("Fafard"), receiver); + + e.run(); + + return 0; +} diff --git a/examples/cpp/mess-wait/s4u-mess-wait.tesh b/examples/cpp/mess-wait/s4u-mess-wait.tesh new file mode 100644 index 0000000000..3f62420b28 --- /dev/null +++ b/examples/cpp/mess-wait/s4u-mess-wait.tesh @@ -0,0 +1,12 @@ +#!/usr/bin/env tesh + +$ ${bindir:=.}/s4u-mess-wait ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n" +> [ 0.500000] (1:sender@Tremblay) Send 'Message 0' to 'control' +> [ 1.000000] (2:receiver@Fafard) Wait for my first message +> [ 1.100000] (2:receiver@Fafard) I got a 'Message 0'. +> [ 1.100000] (1:sender@Tremblay) Send 'Message 1' to 'control' +> [ 1.100000] (1:sender@Tremblay) Send 'Message 2' to 'control' +> [ 1.200000] (2:receiver@Fafard) I got a 'Message 1'. +> [ 1.300000] (1:sender@Tremblay) Send 'finalize' to 'receiver' +> [ 1.300000] (2:receiver@Fafard) I got a 'Message 2'. +> [ 1.400000] (2:receiver@Fafard) I got a 'finalize'. \ No newline at end of file diff --git a/include/simgrid/forward.h b/include/simgrid/forward.h index 2c1da4f7f1..ff408d4765 100644 --- a/include/simgrid/forward.h +++ b/include/simgrid/forward.h @@ -76,6 +76,14 @@ class SplitDuplexLink; class Mailbox; +class Mess; +/** Smart pointer to a simgrid::s4u::Mess */ +using MessPtr = boost::intrusive_ptr; +XBT_PUBLIC void intrusive_ptr_release(Mess* c); +XBT_PUBLIC void intrusive_ptr_add_ref(Mess* c); + +class MessageQueue; + class Mutex; XBT_PUBLIC void intrusive_ptr_release(const Mutex* m); XBT_PUBLIC void intrusive_ptr_add_ref(const Mutex* m); @@ -152,6 +160,8 @@ namespace activity { using ExecImplPtr = boost::intrusive_ptr; class IoImpl; using IoImplPtr = boost::intrusive_ptr; + class MessImpl; + using MessImplPtr = boost::intrusive_ptr; class MutexImpl; using MutexImplPtr = boost::intrusive_ptr; class MutexAcquisitionImpl; @@ -170,6 +180,7 @@ namespace activity { using SleepImplPtr = boost::intrusive_ptr; class MailboxImpl; + class MessageQueueImpl; } namespace context { class Context; @@ -232,6 +243,7 @@ using s4u_Link = simgrid::s4u::Link; using s4u_File = simgrid::s4u::File; using s4u_ConditionVariable = simgrid::s4u::ConditionVariable; using s4u_Mailbox = simgrid::s4u::Mailbox; +using s4u_MessageQueue = simgrid::s4u::MessageQueue; using s4u_Mutex = simgrid::s4u::Mutex; using s4u_Semaphore = simgrid::s4u::Semaphore; using s4u_Disk = simgrid::s4u::Disk; @@ -252,6 +264,7 @@ typedef struct s4u_Link s4u_Link; typedef struct s4u_File s4u_File; typedef struct s4u_ConditionVariable s4u_ConditionVariable; typedef struct s4u_Mailbox s4u_Mailbox; +typedef struct s4u_MessageQueue s4u_MessageQueue; typedef struct s4u_Mutex s4u_Mutex; typedef struct s4u_Semaphore s4u_Semaphore; typedef struct s4u_Disk s4u_Disk; @@ -271,6 +284,8 @@ typedef s4u_ConditionVariable* sg_cond_t; typedef const s4u_ConditionVariable* const_sg_cond_t; typedef s4u_Mailbox* sg_mailbox_t; typedef const s4u_Mailbox* const_sg_mailbox_t; +typedef s4u_MessageQueue* sg_messagequeue_t; +typedef const s4u_MessageQueue* const_sg_messagequeue_t; typedef s4u_Mutex* sg_mutex_t; typedef const s4u_Mutex* const_sg_mutex_t; typedef s4u_Semaphore* sg_sem_t; diff --git a/include/simgrid/s4u.hpp b/include/simgrid/s4u.hpp index b2853229c2..8ddbf1aac3 100644 --- a/include/simgrid/s4u.hpp +++ b/include/simgrid/s4u.hpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include diff --git a/include/simgrid/s4u/Activity.hpp b/include/simgrid/s4u/Activity.hpp index ef51321b4b..8b9f48508d 100644 --- a/include/simgrid/s4u/Activity.hpp +++ b/include/simgrid/s4u/Activity.hpp @@ -38,6 +38,7 @@ class XBT_PUBLIC Activity : public xbt::Extendable { friend Comm; friend Exec; friend Io; + friend Mess; friend kernel::activity::ActivityImpl; friend std::vector create_DAG_from_dot(const std::string& filename); friend std::vector create_DAG_from_DAX(const std::string& filename); diff --git a/include/simgrid/s4u/Engine.hpp b/include/simgrid/s4u/Engine.hpp index d92987dfa5..fbef8cccc9 100644 --- a/include/simgrid/s4u/Engine.hpp +++ b/include/simgrid/s4u/Engine.hpp @@ -155,6 +155,7 @@ public: Link* link_by_name_or_null(const std::string& name) const; Mailbox* mailbox_by_name_or_create(const std::string& name) const; + MessageQueue* message_queue_by_name_or_create(const std::string& name) const; size_t get_actor_count() const; std::vector get_all_actors() const; diff --git a/include/simgrid/s4u/Mess.hpp b/include/simgrid/s4u/Mess.hpp new file mode 100644 index 0000000000..c0eaf9a5e3 --- /dev/null +++ b/include/simgrid/s4u/Mess.hpp @@ -0,0 +1,66 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_S4U_MESS_HPP +#define SIMGRID_S4U_MESS_HPP + +#include +#include + +#include +#include + +namespace simgrid::s4u { + +class XBT_PUBLIC Mess : public Activity_T { +#ifndef DOXYGEN + friend MessageQueue; // Factory of messages + friend kernel::activity::MessImpl; +#endif + MessageQueue* queue_ = nullptr; + void* payload_ = nullptr; + size_t dst_buff_size_ = 0; + void* dst_buff_ = nullptr; + kernel::activity::ActivityImplPtr pimpl_; + + Mess() = default; + Mess* do_start() override; + + static xbt::signal on_send; + xbt::signal on_this_send; + static xbt::signal on_recv; + xbt::signal on_this_recv; + + /* These ensure that the on_completion signals are really thrown */ + void fire_on_completion_for_real() const { Activity_T::fire_on_completion(); } + void fire_on_this_completion_for_real() const { Activity_T::fire_on_this_completion(); } + +public: +#ifndef DOXYGEN + Mess(Mess const&) = delete; + Mess& operator=(Mess const&) = delete; +#endif + + MessPtr set_queue(MessageQueue* queue); + MessageQueue* get_queue() const { return queue_; } + + /** Retrieve the payload associated to the communication. You can only do that once the comm is (gracefully) + * terminated */ + void* get_payload() const { return payload_; } + MessPtr set_payload(void* data); + MessPtr set_dst_data(void** buff, size_t size); + Actor* get_sender() const; + Actor* get_receiver() const; + + bool is_assigned() const override { return true; }; + + Mess* wait_for(double timeout) override; + + kernel::actor::ActorImpl* sender_ = nullptr; + kernel::actor::ActorImpl* receiver_ = nullptr; +}; +} // namespace simgrid::s4u + +#endif /* SIMGRID_S4U_MESS_HPP */ diff --git a/include/simgrid/s4u/MessageQueue.hpp b/include/simgrid/s4u/MessageQueue.hpp new file mode 100644 index 0000000000..dc00941216 --- /dev/null +++ b/include/simgrid/s4u/MessageQueue.hpp @@ -0,0 +1,112 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_S4U_MESSAGEQUEUE_HPP +#define SIMGRID_S4U_MESSAGEQUEUE_HPP + +#include +#include +#include + +#include + +namespace simgrid::s4u { + +class XBT_PUBLIC MessageQueue { +#ifndef DOXYGEN + friend Mess; + friend kernel::activity::MessageQueueImpl; +#endif + + kernel::activity::MessageQueueImpl* const pimpl_; + + explicit MessageQueue(kernel::activity::MessageQueueImpl * mqueue) : pimpl_(mqueue) {} + ~MessageQueue() = default; + +protected: + kernel::activity::MessageQueueImpl* get_impl() const { return pimpl_; } + +public: + /** @brief Retrieves the name of that message queue as a C++ string */ + const std::string& get_name() const; + /** @brief Retrieves the name of that message queue as a C string */ + const char* get_cname() const; + + /** \static Retrieve the message queye associated to the given name. Message queues are created on demand. */ + static MessageQueue* by_name(const std::string& name); + + /** Returns whether the message queue contains queued messages */ + bool empty() const; + + /* Returns the number of queued messages */ + size_t size() const; + + /** Gets the first element in the queue (without dequeuing it), or nullptr if none is there */ + kernel::activity::MessImplPtr front() const; + + /** Creates (but don't start) a data transmission to that message queue */ + MessPtr put_init(); + /** Creates (but don't start) a data transmission to that message queue. + * + * Please note that if you send a pointer to some data, you must ensure that your data remains live until + * consumption, or the receiver will get a pointer to a garbled memory area. + */ + MessPtr put_init(void* payload); + /** Creates and start a data transmission to that mailbox. + * + * Please note that if you send a pointer to some data, you must ensure that your data remains live until + * consumption, or the receiver will get a pointer to a garbled memory area. + */ + MessPtr put_async(void* payload); + + /** Blocking data transmission. + * + * Please note that if you send a pointer to some data, you must ensure that your data remains live until + * consumption, or the receiver will get a pointer to a garbled memory area. + */ + void put(void* payload); + /** Blocking data transmission with timeout */ + void put(void* payload, double timeout); + + /** Creates (but don't start) a data reception onto that message queue. */ + MessPtr get_init(); + /** Creates and start an async data reception to that message queue */ + template MessPtr get_async(T** data); + /** Creates and start an async data reception to that mailbox. Since the data location is not provided, you'll have to + * use Mess::get_payload once the messaging operation terminates */ + MessPtr get_async(); + + /** Blocking data reception */ + template T* get(); + template std::unique_ptr get_unique() { return std::unique_ptr(get()); } + + /** Blocking data reception with timeout */ + template T* get(double timeout); + template std::unique_ptr get_unique(double timeout) { return std::unique_ptr(get(timeout)); } +}; + +template MessPtr MessageQueue::get_async(T** data) +{ + MessPtr res = get_init()->set_dst_data(reinterpret_cast(data), sizeof(void*)); + res->start(); + return res; +} + +template T* MessageQueue::get() +{ + T* res = nullptr; + get_async(&res)->wait(); + return res; +} + +template T* MessageQueue::get(double timeout) +{ + T* res = nullptr; + get_async(&res)->wait_for(timeout); + return res; +} +} // namespace simgrid::s4u + +#endif /* SIMGRID_S4U_MESSAGEQUEUE_HPP */ diff --git a/src/kernel/EngineImpl.cpp b/src/kernel/EngineImpl.cpp index 9fa412e6ad..59da112542 100644 --- a/src/kernel/EngineImpl.cpp +++ b/src/kernel/EngineImpl.cpp @@ -155,6 +155,9 @@ EngineImpl::~EngineImpl() for (auto const& [_, mailbox] : mailboxes_) delete mailbox; + for (auto const& [_, queue] : mqueues_) + delete queue; + /* Kill all actors (but maestro) */ maestro_->kill_all(); run_all_actors(); diff --git a/src/kernel/EngineImpl.hpp b/src/kernel/EngineImpl.hpp index 4c25e54720..149c0c2a36 100644 --- a/src/kernel/EngineImpl.hpp +++ b/src/kernel/EngineImpl.hpp @@ -16,6 +16,7 @@ #include "src/kernel/activity/ExecImpl.hpp" #include "src/kernel/activity/IoImpl.hpp" #include "src/kernel/activity/MailboxImpl.hpp" +#include "src/kernel/activity/MessageQueueImpl.hpp" #include "src/kernel/activity/SleepImpl.hpp" #include "src/kernel/activity/Synchro.hpp" #include "src/kernel/actor/ActorImpl.hpp" @@ -33,6 +34,7 @@ namespace simgrid::kernel { class EngineImpl { std::unordered_map netpoints_; std::unordered_map mailboxes_; + std::unordered_map mqueues_; std::unordered_map registered_functions; // Maps function names to actor code actor::ActorCodeFactory default_function; // Function to use as a fallback when the provided name matches nothing diff --git a/src/kernel/activity/MessImpl.cpp b/src/kernel/activity/MessImpl.cpp new file mode 100644 index 0000000000..9f77c7da8e --- /dev/null +++ b/src/kernel/activity/MessImpl.cpp @@ -0,0 +1,174 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include + +#include "src/kernel/EngineImpl.hpp" +#include "src/kernel/activity/MessImpl.hpp" +#include "src/kernel/activity/MessageQueueImpl.hpp" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mess, kernel, "Kernel message synchronization"); + +namespace simgrid::kernel::activity { + +MessImpl::~MessImpl() +{ + if (queue_) + queue_->remove(this); +} + +MessImpl& MessImpl::set_type(MessImplType type) +{ + type_ = type; + return *this; +} + +MessImpl& MessImpl::set_queue(MessageQueueImpl* queue) +{ + queue_ = queue; + return *this; +} + +MessImpl& MessImpl::set_payload(void* payload) +{ + payload_ = payload; + return *this; +} + +MessImpl& MessImpl::set_dst_buff(unsigned char* buff, size_t* size) +{ + dst_buff_ = buff; + dst_buff_size_ = size; + return *this; +} + +MessImpl* MessImpl::start() +{ + if (get_state() == State::READY) + set_state(State::DONE); + return this; +} + +ActivityImplPtr MessImpl::iput(actor::MessIputSimcall* observer) +{ + auto* queue = observer->get_queue(); + XBT_DEBUG("put from message queue %p", queue); + + /* Prepare a synchro describing us, so that it gets passed to the user-provided filter of other side */ + MessImplPtr this_mess(new MessImpl()); + this_mess->set_type(MessImplType::PUT); + + /* Look for message synchro matching our needs. + * + * If it is not found then push our communication into the rendez-vous point */ + MessImplPtr other_mess = queue->find_matching_message(MessImplType::GET); + + if (not other_mess) { + other_mess = std::move(this_mess); + queue->push(other_mess); + } else { + XBT_DEBUG("Get already pushed"); + other_mess->set_state(State::READY); + } + + observer->set_message(other_mess.get()); + observer->get_issuer()->activities_.insert(other_mess); + + /* Setup synchro */ + other_mess->src_actor_ = observer->get_issuer(); + other_mess->payload_ = observer->get_payload(); + other_mess->start(); + + return other_mess; +} + +ActivityImplPtr MessImpl::iget(actor::MessIgetSimcall* observer) +{ + MessImplPtr this_synchro(new MessImpl()); + this_synchro->set_type(MessImplType::GET); + + auto* queue = observer->get_queue(); + XBT_DEBUG("get from message queue %p. this_synchro=%p", queue, this_synchro.get()); + + MessImplPtr other_mess = queue->find_matching_message(MessImplType::PUT); + + if (other_mess == nullptr) { + XBT_DEBUG("Get pushed first (%zu comm enqueued so far)", queue->size()); + other_mess = std::move(this_synchro); + queue->push(other_mess); + } else { + XBT_DEBUG("Match my %p with the existing %p", this_synchro.get(), other_mess.get()); + + other_mess->set_state(State::READY); + } + + observer->get_issuer()->activities_.insert(other_mess); + observer->set_message(other_mess.get()); + + /* Setup synchro */ + other_mess->set_dst_buff(observer->get_dst_buff(), observer->get_dst_buff_size()); + other_mess->dst_actor_ = observer->get_issuer(); + + other_mess->start(); + + return other_mess; +} + +void MessImpl::finish() +{ + XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(), + src_actor_.get(), dst_actor_.get()); + + if (get_iface()) { + const auto& piface = static_cast(*get_iface()); + set_iface(nullptr); // reset iface to protect against multiple trigger of the on_completion signals + piface.fire_on_completion_for_real(); + piface.fire_on_this_completion_for_real(); + } + + /* Update synchro state */ + if (get_state() == State::RUNNING) { + set_state(State::DONE); + } + + /* If the synchro is still in a rendez-vous point then remove from it */ + if (queue_) + queue_->remove(this); + + if (get_state() == State::DONE && payload_ != nullptr) + *(void**)(dst_buff_) = payload_; + + while (not simcalls_.empty()) { + actor::Simcall* simcall = simcalls_.front(); + simcalls_.pop_front(); + + /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany + * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the + * simcall */ + + if (simcall->call_ == actor::Simcall::Type::NONE) // FIXME: maybe a better way to handle this case + continue; // if actor handling comm is killed + + handle_activity_waitany(simcall); + + /* Check out for errors */ + + if (not simcall->issuer_->get_host()->is_on()) { + simcall->issuer_->set_wannadie(); + } else { + // Do not answer to dying actors + if (not simcall->issuer_->wannadie()) { + set_exception(simcall->issuer_); + simcall->issuer_->simcall_answer(); + } + } + + simcall->issuer_->waiting_synchro_ = nullptr; + simcall->issuer_->activities_.erase(this); + } +} + +} // namespace simgrid::kernel::activity diff --git a/src/kernel/activity/MessImpl.hpp b/src/kernel/activity/MessImpl.hpp new file mode 100644 index 0000000000..1232c61cef --- /dev/null +++ b/src/kernel/activity/MessImpl.hpp @@ -0,0 +1,48 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_KERNEL_ACTIVITY_MESS_HPP +#define SIMGRID_KERNEL_ACTIVITY_MESS_HPP + +#include "src/kernel/activity/ActivityImpl.hpp" +#include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/CommObserver.hpp" + +namespace simgrid::kernel::activity { + +enum class MessImplType { PUT, GET }; + +class XBT_PUBLIC MessImpl : public ActivityImpl_T { + ~MessImpl() override; + + MessageQueueImpl* queue_ = nullptr; + void* payload_ = nullptr; + MessImplType type_ = MessImplType::PUT; + unsigned char* dst_buff_ = nullptr; + size_t* dst_buff_size_ = nullptr; + +public: + MessImpl& set_type(MessImplType type); + MessImplType get_type() const { return type_; } + MessImpl& set_payload(void* payload); + void* get_payload() { return payload_; } + + MessImpl& set_queue(MessageQueueImpl* queue); + MessageQueueImpl* get_queue() const { return queue_; } + MessImpl& set_dst_buff(unsigned char* buff, size_t* size); + + static ActivityImplPtr iput(actor::MessIputSimcall* observer); + static ActivityImplPtr iget(actor::MessIgetSimcall* observer); + + MessImpl* start(); + void set_exception(actor::ActorImpl* issuer) override {}; + void finish() override; + + actor::ActorImplPtr src_actor_ = nullptr; + actor::ActorImplPtr dst_actor_ = nullptr; +}; +} // namespace simgrid::kernel::activity + +#endif diff --git a/src/kernel/activity/MessageQueueImpl.cpp b/src/kernel/activity/MessageQueueImpl.cpp new file mode 100644 index 0000000000..714f86bdd6 --- /dev/null +++ b/src/kernel/activity/MessageQueueImpl.cpp @@ -0,0 +1,54 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include "src/kernel/activity/MessageQueueImpl.hpp" + +#include + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_mq, kernel, "Message queue implementation"); + +namespace simgrid::kernel::activity { + +unsigned MessageQueueImpl::next_id_ = 0; + +void MessageQueueImpl::push(const MessImplPtr& mess) +{ + mess->set_queue(this); + this->queue_.push_back(std::move(mess)); +} + +void MessageQueueImpl::remove(const MessImplPtr& mess) +{ + xbt_assert(mess->get_queue() == this, "Message %p is in queue %s, not queue %s", mess.get(), + (mess->get_queue() ? mess->get_queue()->get_cname() : "(null)"), get_cname()); + + mess->set_queue(nullptr); + auto it = std::find(queue_.begin(), queue_.end(), mess); + if (it != queue_.end()) + queue_.erase(it); + else + xbt_die("Message %p not found in queue %s", mess.get(), get_cname()); +} + +MessImplPtr MessageQueueImpl::find_matching_message(MessImplType type) +{ + auto iter = std::find_if(queue_.begin(), queue_.end(), [&type](const MessImplPtr& mess) + { + return (mess->get_type() == type); + }); + if (iter == queue_.end()) { + XBT_DEBUG("No matching message synchro found"); + return nullptr; + } + + const MessImplPtr& mess = *iter; + XBT_DEBUG("Found a matching message synchro %p", mess.get()); + mess->set_queue(nullptr); + MessImplPtr mess_cpy = mess; + queue_.erase(iter); + return mess_cpy; +} + +} // namespace simgrid::kernel::activity diff --git a/src/kernel/activity/MessageQueueImpl.hpp b/src/kernel/activity/MessageQueueImpl.hpp new file mode 100644 index 0000000000..4bc010f956 --- /dev/null +++ b/src/kernel/activity/MessageQueueImpl.hpp @@ -0,0 +1,52 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#ifndef SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP +#define SIMGRID_KERNEL_ACTIVITY_MESSAGEQUEUE_HPP + +#include "simgrid/s4u/Engine.hpp" +#include "simgrid/s4u/MessageQueue.hpp" +#include "src/kernel/activity/MessImpl.hpp" + +namespace simgrid::kernel::activity { + +/** @brief Implementation of the s4u::MessageQueue */ + +class MessageQueueImpl { + s4u::MessageQueue piface_; + std::string name_; + std::deque queue_; + + friend s4u::Engine; + friend s4u::MessageQueue; + friend s4u::MessageQueue* s4u::Engine::message_queue_by_name_or_create(const std::string& name) const; + friend s4u::MessageQueue* s4u::MessageQueue::by_name(const std::string& name); + + static unsigned next_id_; // Next ID to be given + const unsigned id_ = next_id_++; + explicit MessageQueueImpl(const std::string& name) : piface_(this), name_(name) {} + MessageQueueImpl(const MailboxImpl&) = delete; + MessageQueueImpl& operator=(const MailboxImpl&) = delete; + +public: + /** @brief Public interface */ + unsigned get_id() const { return id_; } + + const s4u::MessageQueue* get_iface() const { return &piface_; } + s4u::MessageQueue* get_iface() { return &piface_; } + + const std::string& get_name() const { return name_; } + const char* get_cname() const { return name_.c_str(); } + void push(const MessImplPtr& mess); + void remove(const MessImplPtr& mess); + bool empty() const { return queue_.empty(); } + size_t size() const { return queue_.size(); } + const MessImplPtr& front() const { return queue_.front(); } + + MessImplPtr find_matching_message(MessImplType type); +}; +} // namespace simgrid::kernel::activity + +#endif diff --git a/src/kernel/actor/CommObserver.cpp b/src/kernel/actor/CommObserver.cpp index 6f13e3ea46..7aa715de01 100644 --- a/src/kernel/actor/CommObserver.cpp +++ b/src/kernel/actor/CommObserver.cpp @@ -6,6 +6,7 @@ #include "simgrid/s4u/Host.hpp" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/activity/MailboxImpl.hpp" +#include "src/kernel/activity/MessageQueueImpl.hpp" #include "src/kernel/actor/ActorImpl.hpp" #include "src/kernel/actor/SimcallObserver.hpp" #include "src/mc/mc_config.hpp" @@ -233,10 +234,35 @@ void CommIrecvSimcall::serialize(std::stringstream& stream) const XBT_DEBUG("RecvObserver comm:%p mbox:%u tag:%d", comm_, mbox_->get_id(), tag_); stream << ' ' << fun_call_; } + std::string CommIrecvSimcall::to_string() const { return "CommAsyncRecv(comm_id: " + std::to_string(comm_->get_id()) + " mbox:" + std::to_string(mbox_->get_id()) + " tag: " + std::to_string(tag_) + ")"; } +void MessIputSimcall::serialize(std::stringstream& stream) const +{ + stream << mess_ << ' ' << queue_; + XBT_DEBUG("PutObserver mess:%p queue:%p", mess_, queue_); +} + +std::string MessIputSimcall::to_string() const +{ + return "MessAsyncPut(queue:" + queue_->get_name() + ")"; +} + +void MessIgetSimcall::serialize(std::stringstream& stream) const +{ + stream << mess_ << ' ' << queue_; + XBT_DEBUG("GettObserver mess:%p queue:%p", mess_, queue_); +} + +std::string MessIgetSimcall::to_string() const +{ + return "MessAsyncGet(queue:" + queue_->get_name() + ")"; +} + + + } // namespace simgrid::kernel::actor diff --git a/src/kernel/actor/CommObserver.hpp b/src/kernel/actor/CommObserver.hpp index ab036510ee..161e095e05 100644 --- a/src/kernel/actor/CommObserver.hpp +++ b/src/kernel/actor/CommObserver.hpp @@ -186,6 +186,52 @@ public: auto const& get_copy_data_fun() const { return copy_data_fun_; } }; +class MessIputSimcall final : public SimcallObserver { + activity::MessageQueueImpl* queue_; + void* payload_; + activity::MessImpl* mess_ = {}; + +public: + MessIputSimcall( + ActorImpl* actor, activity::MessageQueueImpl* queue, void* payload) + : SimcallObserver(actor) + , queue_(queue) + , payload_(payload) + { + } + void serialize(std::stringstream& stream) const override; + std::string to_string() const override; + activity::MessageQueueImpl* get_queue() const { return queue_; } + void* get_payload() const { return payload_; } + void set_message(activity::MessImpl* mess) { mess_ = mess; } +}; + +class MessIgetSimcall final : public SimcallObserver { + activity::MessageQueueImpl* queue_; + unsigned char* dst_buff_; + size_t* dst_buff_size_; + void* payload_; + activity::MessImpl* mess_ = {}; + +public: + MessIgetSimcall(ActorImpl* actor, activity::MessageQueueImpl* queue, unsigned char* dst_buff, size_t* dst_buff_size, + void* payload) + : SimcallObserver(actor) + , queue_(queue) + , dst_buff_(dst_buff) + , dst_buff_size_(dst_buff_size) + , payload_(payload) + { + } + void serialize(std::stringstream& stream) const override; + std::string to_string() const override; + activity::MessageQueueImpl* get_queue() const { return queue_; } + unsigned char* get_dst_buff() const { return dst_buff_; } + size_t* get_dst_buff_size() const { return dst_buff_size_; } + void* get_payload() const { return payload_; } + void set_message(activity::MessImpl* mess) { mess_ = mess; } +}; + } // namespace simgrid::kernel::actor #endif diff --git a/src/s4u/s4u_Engine.cpp b/src/s4u/s4u_Engine.cpp index dc7e77f5ed..094e8311d1 100644 --- a/src/s4u/s4u_Engine.cpp +++ b/src/s4u/s4u_Engine.cpp @@ -400,6 +400,20 @@ Mailbox* Engine::mailbox_by_name_or_create(const std::string& name) const return mbox->get_iface(); } +MessageQueue* Engine::message_queue_by_name_or_create(const std::string& name) const +{ + /* two actors may have pushed the same mbox_create simcall at the same time */ + kernel::activity::MessageQueueImpl* queue = kernel::actor::simcall_answered([&name, this] { + auto [m, inserted] = pimpl_->mqueues_.try_emplace(name, nullptr); + if (inserted) { + m->second = new kernel::activity::MessageQueueImpl(name); + XBT_DEBUG("Creating a message queue at %p with name %s", m->second, name.c_str()); + } + return m->second; + }); + return queue->get_iface(); +} + /** @brief Returns the amount of links in the platform */ size_t Engine::get_link_count() const { diff --git a/src/s4u/s4u_Mess.cpp b/src/s4u/s4u_Mess.cpp new file mode 100644 index 0000000000..b16b387fdb --- /dev/null +++ b/src/s4u/s4u_Mess.cpp @@ -0,0 +1,153 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include +#include +#include +#include +#include + +#include "src/kernel/activity/MessImpl.hpp" +#include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/SimcallObserver.hpp" + +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mess, s4u_activity, "S4U asynchronous messaging"); + +namespace simgrid::s4u { +xbt::signal Mess::on_send; +xbt::signal Mess::on_recv; + +MessPtr Mess::set_queue(MessageQueue* queue) +{ + queue_ = queue; + return this; +} + +MessPtr Mess::set_payload(void* payload) +{ + payload_ = payload; + return this; +} + +MessPtr Mess::set_dst_data(void** buff, size_t size) +{ + xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)", + __FUNCTION__); + + dst_buff_ = buff; + dst_buff_size_ = size; + return this; +} + +Actor* Mess::get_sender() const +{ + kernel::actor::ActorImplPtr sender = nullptr; + if (pimpl_) + sender = boost::static_pointer_cast(pimpl_)->src_actor_; + return sender ? sender->get_ciface() : nullptr; +} + +Actor* Mess::get_receiver() const +{ + kernel::actor::ActorImplPtr receiver = nullptr; + if (pimpl_) + receiver = boost::static_pointer_cast(pimpl_)->dst_actor_; + return receiver ? receiver->get_ciface() : nullptr; +} + +Mess* Mess::do_start() +{ + xbt_assert(get_state() == State::INITED || get_state() == State::STARTING, + "You cannot use %s() once your message exchange has started (not implemented)", __FUNCTION__); + + auto myself = kernel::actor::ActorImpl::self(); + if (myself == sender_) { + on_send(*this); + on_this_send(*this); + kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()}; + pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); }, + &observer); + } else if (myself == receiver_) { + on_recv(*this); + on_this_recv(*this); + kernel::actor::MessIgetSimcall observer{receiver_, + queue_->get_impl(), + static_cast(dst_buff_), + &dst_buff_size_, + get_payload()}; + pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); }, + &observer); + } else { + xbt_die("Cannot start a message exchange before specifying whether we are the sender or the receiver"); + } + + pimpl_->set_iface(this); + pimpl_->set_actor(sender_); + // Only throw the signal when both sides are here and the status is READY + if (pimpl_->get_state() != kernel::activity::State::WAITING) { + fire_on_start(); + fire_on_this_start(); + } + state_ = State::STARTED; + return this; +} + +Mess* Mess::wait_for(double timeout) +{ + XBT_DEBUG("Calling Mess::wait_for with state %s", get_state_str()); + kernel::actor::ActorImpl* issuer = nullptr; + switch (state_) { + case State::FINISHED: + break; + case State::FAILED: + throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication"); + case State::INITED: + case State::STARTING: + if (get_payload() != nullptr) { + on_send(*this); + on_this_send(*this); + kernel::actor::MessIputSimcall observer{sender_, queue_->get_impl(), get_payload()}; + pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iput(&observer); }, + &observer); + } else { // Receiver + on_recv(*this); + on_this_recv(*this); + kernel::actor::MessIgetSimcall observer{receiver_, + queue_->get_impl(), + static_cast(dst_buff_), + &dst_buff_size_, + get_payload()}; + pimpl_ = kernel::actor::simcall_answered([&observer] { return kernel::activity::MessImpl::iget(&observer); }, + &observer); + } + break; + case State::STARTED: + try { + issuer = kernel::actor::ActorImpl::self(); + kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "Wait"}; + if (kernel::actor::simcall_blocking( + [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); }, + &observer)) { + throw TimeoutException(XBT_THROW_POINT, "Timeouted"); + } + } catch (const NetworkFailureException& e) { + issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr + complete(State::FAILED); + e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode."); + } + break; + + case State::CANCELED: + throw CancelException(XBT_THROW_POINT, "Message canceled"); + + default: + THROW_IMPOSSIBLE; + } + complete(State::FINISHED); + return this; +} + +} // namespace simgrid::s4u diff --git a/src/s4u/s4u_MessageQueue.cpp b/src/s4u/s4u_MessageQueue.cpp new file mode 100644 index 0000000000..8fe34ac8a2 --- /dev/null +++ b/src/s4u/s4u_MessageQueue.cpp @@ -0,0 +1,98 @@ +/* Copyright (c) 2023. The SimGrid Team. All rights reserved. */ + +/* This program is free software; you can redistribute it and/or modify it + * under the terms of the license (GNU LGPL) which comes with this package. */ + +#include +#include +#include + +#include "src/kernel/activity/MessageQueueImpl.hpp" + +XBT_LOG_EXTERNAL_CATEGORY(s4u); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_mqueue, s4u, "S4U Message Queues"); + +namespace simgrid::s4u { + +const std::string& MessageQueue::get_name() const +{ + return pimpl_->get_name(); +} + +const char* MessageQueue::get_cname() const +{ + return pimpl_->get_cname(); +} + +MessageQueue* MessageQueue::by_name(const std::string& name) +{ + return Engine::get_instance()->message_queue_by_name_or_create(name); +} + +bool MessageQueue::empty() const +{ + return pimpl_->empty(); +} + +size_t MessageQueue::size() const +{ + return pimpl_->size(); +} + +kernel::activity::MessImplPtr MessageQueue::front() const +{ + return pimpl_->empty() ? nullptr : pimpl_->front(); +} + +MessPtr MessageQueue::put_init() +{ + MessPtr res(new Mess()); + res->set_queue(this); + res->sender_ = kernel::actor::ActorImpl::self(); + return res; +} + +MessPtr MessageQueue::put_init(void* payload) +{ + return put_init()->set_payload(payload); +} + +MessPtr MessageQueue::put_async(void* payload) +{ + xbt_assert(payload != nullptr, "You cannot send nullptr"); + MessPtr res = put_init(payload); + res->start(); + return res; +} + +void MessageQueue::put(void* payload) +{ + xbt_assert(payload != nullptr, "You cannot send nullptr"); + + put_async(payload)->wait(); +} + +/** Blocking send with timeout */ +void MessageQueue::put(void* payload, double timeout) +{ + xbt_assert(payload != nullptr, "You cannot send nullptr"); + + put_init()->set_payload(payload)->start()->wait_for(timeout); +} + +MessPtr MessageQueue::get_init() +{ + MessPtr res(new Mess()); + res->set_queue(this); + res->receiver_ = kernel::actor::ActorImpl::self(); + return res; +} + +MessPtr MessageQueue::get_async() +{ + MessPtr res = get_init()->set_payload(nullptr); + res->start(); + return res; +} + +} // namespace simgrid::s4u diff --git a/tools/cmake/DefinePackages.cmake b/tools/cmake/DefinePackages.cmake index cee3e25ea5..e4ce5a258d 100644 --- a/tools/cmake/DefinePackages.cmake +++ b/tools/cmake/DefinePackages.cmake @@ -322,6 +322,10 @@ set(KERNEL_SRC src/kernel/activity/IoImpl.hpp src/kernel/activity/MailboxImpl.cpp src/kernel/activity/MailboxImpl.hpp + src/kernel/activity/MessImpl.cpp + src/kernel/activity/MessImpl.hpp + src/kernel/activity/MessageQueueImpl.cpp + src/kernel/activity/MessageQueueImpl.hpp src/kernel/activity/MutexImpl.cpp src/kernel/activity/MutexImpl.hpp src/kernel/activity/SemaphoreImpl.cpp @@ -472,6 +476,8 @@ set(S4U_SRC src/s4u/s4u_Io.cpp src/s4u/s4u_Link.cpp src/s4u/s4u_Mailbox.cpp + src/s4u/s4u_Mess.cpp + src/s4u/s4u_MessageQueue.cpp src/s4u/s4u_Mutex.cpp src/s4u/s4u_Netzone.cpp src/s4u/s4u_Semaphore.cpp @@ -695,6 +701,7 @@ set(headers_to_install include/simgrid/s4u/Io.hpp include/simgrid/s4u/Link.hpp include/simgrid/s4u/Mailbox.hpp + include/simgrid/s4u/MessageQueue.hpp include/simgrid/s4u/Mutex.hpp include/simgrid/s4u/NetZone.hpp include/simgrid/s4u/Semaphore.hpp -- 2.20.1