From 159bd689add924b1387a9caa0a095ba77a716aba Mon Sep 17 00:00:00 2001 From: Martin Quinson Date: Tue, 4 Apr 2023 23:45:27 +0200 Subject: [PATCH] Make the MC protocol work on top of STREAM sockets MacOSX does not have AF_UNIX + SOCK_SEQPACKET, so we need SOCK_STREAM on that platform. I'll make it SEQPACKET on Linux+FreeBSD and STREAM on Mac in the next commit, but I wanted to code and debug the SOCK_STREAM version locally. The problem with the SOCK_STREAM version is that messages are not segmented anymore: each recv() gets everything that was already posted on the socket, so I have to implement a buffer into which I can reinject the extraneous bytes received, so that they can be used by the next recv(). The real fun begins when I have half of the expected message in the buffer and the other half must be taken from the network. Or even better: the second half was not posted yet, so reading from the network blocks forever. This can happen when I receive any message into a 512-byte buffer while I already have a full but shorter message in the buffer. I have to try reading from the socket with the MSG_DONTWAIT flag, detect that there is nothing to read, and use the buffer content only. 
--- src/mc/api/RemoteApp.cpp | 8 ++--- src/mc/remote/AppSide.cpp | 13 ++----- src/mc/remote/Channel.cpp | 62 ++++++++++++++++++++++++++++----- src/mc/remote/Channel.hpp | 10 ++++-- src/mc/remote/CheckerSide.cpp | 65 ++++++++++++++++++++++++----------- src/mc/remote/CheckerSide.hpp | 2 +- src/mc/remote/mc_protocol.h | 2 +- 7 files changed, 114 insertions(+), 48 deletions(-) diff --git a/src/mc/api/RemoteApp.cpp b/src/mc/api/RemoteApp.cpp index 78dbdf8376..86e55f5fac 100644 --- a/src/mc/api/RemoteApp.cpp +++ b/src/mc/api/RemoteApp.cpp @@ -48,8 +48,8 @@ RemoteApp::RemoteApp(const std::vector& args, bool need_memory_introspect xbt_die("SimGrid was compiled without MC support."); #endif } else { - master_socket_ = socket(AF_LOCAL, - SOCK_SEQPACKET + master_socket_ = socket(AF_UNIX, + SOCK_STREAM #ifdef SOCK_CLOEXEC | SOCK_CLOEXEC /* MacOSX does not have it */ #endif @@ -58,7 +58,7 @@ RemoteApp::RemoteApp(const std::vector& args, bool need_memory_introspect xbt_assert(master_socket_ != -1, "Cannot create the master socket: %s", strerror(errno)); struct sockaddr_un serv_addr = {}; - serv_addr.sun_family = AF_LOCAL; + serv_addr.sun_family = AF_UNIX; snprintf(serv_addr.sun_path, 64, "/tmp/simgrid-mc-%d", getpid()); strcpy(master_socket_name, serv_addr.sun_path); auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(serv_addr.sun_path); @@ -122,7 +122,7 @@ void RemoteApp::get_actors_status(std::map& whereto) const xbt_assert(answer_size != -1, "Could not receive message"); xbt_assert(answer_size == sizeof answer, "Broken message (size=%zd; expected %zu)", answer_size, sizeof answer); xbt_assert(answer.type == MessageType::ACTORS_STATUS_REPLY_COUNT, - "Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)", + "%d Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)", getpid(), to_c_str(answer.type), (int)answer.type, (int)MessageType::ACTORS_STATUS_REPLY_COUNT); // Message sanity checks diff 
--git a/src/mc/remote/AppSide.cpp b/src/mc/remote/AppSide.cpp index d6e0188a9a..df1e06d93d 100644 --- a/src/mc/remote/AppSide.cpp +++ b/src/mc/remote/AppSide.cpp @@ -60,13 +60,6 @@ AppSide* AppSide::get() int fd = xbt_str_parse_int(fd_env, "Not a number in variable '" MC_ENV_SOCKET_FD "'"); XBT_DEBUG("Model-checked application found socket FD %i", fd); - // Check the socket type/validity: - int type; - socklen_t socklen = sizeof(type); - xbt_assert(getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &socklen) == 0, "Could not check socket type"); - xbt_assert(type == SOCK_SEQPACKET, "Unexpected socket type %i", type); - XBT_DEBUG("Model-checked application found expected socket type"); - instance_ = std::make_unique(fd); // Wait for the model-checker: @@ -162,8 +155,8 @@ void AppSide::handle_fork(const s_mc_message_int_t* msg) xbt_assert(pid >= 0, "Could not fork application sub-process: %s.", strerror(errno)); if (pid == 0) { // Child - int sock = socket(AF_LOCAL, - SOCK_SEQPACKET + int sock = socket(AF_UNIX, + SOCK_STREAM #ifdef SOCK_CLOEXEC | SOCK_CLOEXEC /* MacOSX does not have it */ #endif @@ -171,7 +164,7 @@ void AppSide::handle_fork(const s_mc_message_int_t* msg) 0); struct sockaddr_un addr = {}; - addr.sun_family = AF_LOCAL; + addr.sun_family = AF_UNIX; snprintf(addr.sun_path, 64, "/tmp/simgrid-mc-%lu", static_cast(msg->value)); auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path); diff --git a/src/mc/remote/Channel.cpp b/src/mc/remote/Channel.cpp index 616041231b..ac202ed39d 100644 --- a/src/mc/remote/Channel.cpp +++ b/src/mc/remote/Channel.cpp @@ -1,5 +1,4 @@ -/* Copyright (c) 2015-2023. The SimGrid Team. - * All rights reserved. */ +/* Copyright (c) 2015-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. 
*/ @@ -13,9 +12,18 @@ #include #include -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_Channel, mc, "MC interprocess communication"); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication"); namespace simgrid::mc { +Channel::Channel(int sock, Channel const& other) : socket_(sock) +{ + if (not other.buffer_.empty()) { + ssize_t size = other.buffer_.size(); + XBT_DEBUG("Adopt %d bytes buffered by father channel.", (int)size); + buffer_.resize(size); + memcpy(buffer_.data(), other.buffer_.data(), size); + } +} Channel::~Channel() { @@ -43,16 +51,54 @@ int Channel::send(const void* message, size_t size) const return 0; } -ssize_t Channel::receive(void* message, size_t size) const +ssize_t Channel::receive(const void* message, size_t size, int flags) { - ssize_t res = recv(this->socket_, message, size, 0); - xbt_assert(res != -1, "Channel::receive failure: %s", strerror(errno)); + size_t bufsize = buffer_.size(); + ssize_t copied = 0; + void* whereto = const_cast(message); + size_t todo = size; + if (bufsize > 0) { + XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size); + if (bufsize >= size) { + copied = size; + memcpy(whereto, buffer_.data(), size); + memcpy(static_cast(buffer_.data()), buffer_.data() + size, bufsize - size); + buffer_.resize(bufsize - size); + todo = 0; + } else { + copied = bufsize; + memcpy(whereto, buffer_.data(), bufsize); + buffer_.clear(); + todo -= bufsize; + whereto = static_cast(whereto) + bufsize; + } + } + ssize_t res = 0; + if (todo > 0) { + errno = 0; + res = recv(this->socket_, whereto, todo, flags); + xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno)); + if (res == -1) { + res = 0; + } + } + XBT_DEBUG("%d Wanted %d; Got %d from buffer and %d from network", getpid(), (int)size, (int)copied, (int)res); + res += copied; if (static_cast(res) >= sizeof(int) && is_valid_MessageType(*static_cast(message))) { - XBT_DEBUG("Receive %s (requested %zu; 
received %zd at %p)", to_c_str(*static_cast(message)), - size, res, message); + XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(), + to_c_str(*static_cast(message)), size, res, message); } else { XBT_DEBUG("Receive %zd bytes", res); } return res; } + +void Channel::reinject(const char* data, size_t size) +{ + xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size); + auto prev_size = buffer_.size(); + XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, prev_size); + buffer_.resize(prev_size + size); + memcpy(buffer_.data() + prev_size, data, size); +} } // namespace simgrid::mc diff --git a/src/mc/remote/Channel.hpp b/src/mc/remote/Channel.hpp index c9e1aeb770..7f326ad106 100644 --- a/src/mc/remote/Channel.hpp +++ b/src/mc/remote/Channel.hpp @@ -20,10 +20,12 @@ namespace simgrid::mc { class Channel { int socket_ = -1; template static constexpr bool messageType() { return std::is_class_v && std::is_trivial_v; } + std::vector buffer_; public: Channel() = default; explicit Channel(int sock) : socket_(sock) {} + Channel(int sock, Channel const& other); ~Channel(); // No copy: @@ -44,11 +46,13 @@ public: } // Receive - ssize_t receive(void* message, size_t size) const; - template typename std::enable_if_t(), ssize_t> receive(M& m) const + ssize_t receive(const void* message, size_t size, int flags = 0); + template typename std::enable_if_t(), ssize_t> receive(M& m) { - return this->receive(&m, sizeof(M)); + return this->receive(&m, sizeof(M), 0); } + void reinject(const char* data, size_t size); + bool has_pending_data() const { return not buffer_.empty(); } // Socket handling int get_socket() const { return socket_; } diff --git a/src/mc/remote/CheckerSide.cpp b/src/mc/remote/CheckerSide.cpp index 3b89940d3e..b88d7e2ba0 100644 --- a/src/mc/remote/CheckerSide.cpp +++ b/src/mc/remote/CheckerSide.cpp @@ -116,6 +116,7 @@ static void wait_application_process(pid_t pid) "If you run from within a 
docker, adding `--cap-add SYS_PTRACE` to the docker line may help. " "If it does not help, please report this bug.", errno); + XBT_DEBUG("%d ptrace correctly setup.", getpid()); } void CheckerSide::setup_events(bool socket_only) @@ -128,18 +129,23 @@ void CheckerSide::setup_events(bool socket_only) [](evutil_socket_t, short events, void* arg) { auto checker = static_cast(arg); if (events == EV_READ) { - std::array buffer; - ssize_t size = recv(checker->get_channel().get_socket(), buffer.data(), buffer.size(), MSG_DONTWAIT); - if (size == -1) { - XBT_ERROR("Channel::receive failure: %s", strerror(errno)); - if (errno != EAGAIN) - throw simgrid::xbt::errno_error(); - } + do { + + std::array buffer; + ssize_t size = checker->get_channel().receive(buffer.data(), buffer.size(), MSG_DONTWAIT); + if (size == -1) { + XBT_ERROR("Channel::receive failure: %s", strerror(errno)); + if (errno != EAGAIN) + throw simgrid::xbt::errno_error(); + } - if (size == 0) // The app closed the socket. It must be dead by now. - checker->handle_waitpid(); - else if (not checker->handle_message(buffer.data(), size)) - checker->break_loop(); + if (size == 0) // The app closed the socket. It must be dead by now. + checker->handle_waitpid(); + else if (not checker->handle_message(buffer.data(), size)) { + checker->break_loop(); + break; + } + } while (checker->get_channel().has_pending_data()); } else { xbt_die("Unexpected event"); } @@ -173,11 +179,10 @@ CheckerSide::CheckerSide(const std::vector& args, bool need_memory_info) { XBT_DEBUG("Create a CheckerSide. Needs_meminfo: %s", need_memory_info ? 
"YES" : "no"); - // Create an AF_LOCAL socketpair used for exchanging messages between the model-checker process (ancestor) + // Create an AF_UNIX socketpair used for exchanging messages between the model-checker process (ancestor) // and the application process (child) int sockets[2]; - xbt_assert(socketpair(AF_LOCAL, SOCK_SEQPACKET, 0, sockets) != -1, "Could not create socketpair: %s", - strerror(errno)); + xbt_assert(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != -1, "Could not create socketpair: %s", strerror(errno)); pid_ = fork(); xbt_assert(pid_ >= 0, "Could not fork application process"); @@ -230,7 +235,7 @@ CheckerSide::~CheckerSide() /* This constructor is called when cloning a checkerside to get its application to fork away */ CheckerSide::CheckerSide(int socket, CheckerSide* child_checker) - : channel_(socket), running_(true), child_checker_(child_checker) + : channel_(socket, child_checker->channel_), running_(true), child_checker_(child_checker) { setup_events(true); // We already have a signal handled in that case @@ -275,7 +280,7 @@ void CheckerSide::finalize(bool terminate_asap) (int)answer.type, (int)MessageType::FINALIZE_REPLY); } -void CheckerSide::dispatch_events() const +void CheckerSide::dispatch_events() { event_base_dispatch(base_.get()); } @@ -288,15 +293,17 @@ void CheckerSide::break_loop() const bool CheckerSide::handle_message(const char* buffer, ssize_t size) { s_mc_message_t base_message; + ssize_t consumed; xbt_assert(size >= (ssize_t)sizeof(base_message), "Broken message. 
Got only %ld bytes.", size); memcpy(&base_message, buffer, sizeof(base_message)); switch (base_message.type) { case MessageType::IGNORE_HEAP: { + consumed = sizeof(s_mc_message_ignore_heap_t); #if SIMGRID_HAVE_STATEFUL_MC if (remote_memory_ != nullptr) { s_mc_message_ignore_heap_t message; - xbt_assert(size == sizeof(message), "Broken message"); + xbt_assert(size >= static_cast(sizeof(message)), "Broken message"); memcpy(&message, buffer, sizeof(message)); IgnoredHeapRegion region; @@ -312,10 +319,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size) } case MessageType::UNIGNORE_HEAP: { + consumed = sizeof(s_mc_message_ignore_memory_t); #if SIMGRID_HAVE_STATEFUL_MC if (remote_memory_ != nullptr) { s_mc_message_ignore_memory_t message; - xbt_assert(size == sizeof(message), "Broken message"); + xbt_assert(size == static_cast(sizeof(message)), "Broken message"); memcpy(&message, buffer, sizeof(message)); get_remote_memory()->unignore_heap((void*)message.addr, message.size); } else @@ -325,10 +333,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size) } case MessageType::IGNORE_MEMORY: { + consumed = sizeof(s_mc_message_ignore_memory_t); #if SIMGRID_HAVE_STATEFUL_MC if (remote_memory_ != nullptr) { s_mc_message_ignore_memory_t message; - xbt_assert(size == sizeof(message), "Broken message"); + xbt_assert(size >= static_cast(sizeof(message)), "Broken message"); memcpy(&message, buffer, sizeof(message)); get_remote_memory()->ignore_region(message.addr, message.size); } else @@ -338,10 +347,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size) } case MessageType::STACK_REGION: { + consumed = sizeof(s_mc_message_stack_region_t); #if SIMGRID_HAVE_STATEFUL_MC if (remote_memory_ != nullptr) { s_mc_message_stack_region_t message; - xbt_assert(size == sizeof(message), "Broken message"); + xbt_assert(size >= static_cast(sizeof(message)), "Broken message"); memcpy(&message, buffer, sizeof(message)); 
get_remote_memory()->stack_areas().push_back(message.stack_region); } else @@ -351,9 +361,10 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size) } case MessageType::REGISTER_SYMBOL: { + consumed = sizeof(s_mc_message_register_symbol_t); #if SIMGRID_HAVE_STATEFUL_MC s_mc_message_register_symbol_t message; - xbt_assert(size == sizeof(message), "Broken message"); + xbt_assert(size >= static_cast(sizeof(message)), "Broken message"); memcpy(&message, buffer, sizeof(message)); xbt_assert(not message.callback, "Support for client-side function proposition is not implemented."); XBT_DEBUG("Received symbol: %s", message.name.data()); @@ -366,15 +377,27 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size) } case MessageType::WAITING: + consumed = sizeof(s_mc_message_t); + if (size > consumed) { + XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed), + to_c_str(base_message.type)); + channel_.reinject(&buffer[consumed], size - consumed); + } + return false; case MessageType::ASSERTION_FAILED: + consumed = sizeof(s_mc_message_t); Exploration::get_instance()->report_assertion_failure(); break; default: xbt_die("Unexpected message from the application"); } + if (size > consumed) { + XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed), to_c_str(base_message.type)); + channel_.reinject(&buffer[consumed], size - consumed); + } return true; } diff --git a/src/mc/remote/CheckerSide.hpp b/src/mc/remote/CheckerSide.hpp index 84c70d1252..9d9997314e 100644 --- a/src/mc/remote/CheckerSide.hpp +++ b/src/mc/remote/CheckerSide.hpp @@ -52,7 +52,7 @@ public: Channel& get_channel() { return channel_; } bool handle_message(const char* buffer, ssize_t size); - void dispatch_events() const; + void dispatch_events(); void break_loop() const; void wait_for_requests(); diff --git a/src/mc/remote/mc_protocol.h b/src/mc/remote/mc_protocol.h index 920f43f379..5da8aef608 100644 --- 
a/src/mc/remote/mc_protocol.h +++ b/src/mc/remote/mc_protocol.h @@ -36,7 +36,7 @@ constexpr unsigned SIMCALL_SERIALIZATION_BUFFER_SIZE = 2048; /** Basic structure for a MC message * - * The current version of the client/server protocol sends C structures over `AF_LOCAL` + * The current version of the client/server protocol sends C structures over `AF_UNIX` * `SOCK_SEQPACKET` sockets. This means that the protocol is ABI/architecture specific: * we currently can't model-check a x86 process from a x86_64 process. * -- 2.20.1