Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make the MC protocol work on top of STREAM sockets
authorMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 4 Apr 2023 21:45:27 +0000 (23:45 +0200)
committerMartin Quinson <martin.quinson@ens-rennes.fr>
Tue, 4 Apr 2023 22:05:45 +0000 (00:05 +0200)
MacOSX does not have AF_UNIX + SOCK_SEQPACKET so we need SOCK_STREAM
for that architecture. I'll make it SEQPACKET on Linux+FreeBSD and
STREAM on Mac in the next commit, but I wanted to code and debug the
SOCK_STREAM version locally.

The problem with the SOCK_STREAM version is that messages are not
segmented anymore: each recv() gets everything that got posted on the
socket already, so I have to implement a buffer in which I can
reinject the extraneous bytes received, so that they can be used in
the next recv(). The real fun begins when I have half of the expected
message in buffer and the other half must be taken from the network.

Or even better: the second half was not posted yet so reading from the
network is blocking for ever. This can happen when I receive any
message in a 512-bytes buffer, but I already had a full but shorter
message in buffer. I have to try reading from the socket with the
DONT_WAIT flag, detect that there is nothing to read, and use the
buffer content only.

src/mc/api/RemoteApp.cpp
src/mc/remote/AppSide.cpp
src/mc/remote/Channel.cpp
src/mc/remote/Channel.hpp
src/mc/remote/CheckerSide.cpp
src/mc/remote/CheckerSide.hpp
src/mc/remote/mc_protocol.h

index 78dbdf8..86e55f5 100644 (file)
@@ -48,8 +48,8 @@ RemoteApp::RemoteApp(const std::vector<char*>& args, bool need_memory_introspect
     xbt_die("SimGrid was compiled without MC support.");
 #endif
   } else {
-    master_socket_ = socket(AF_LOCAL,
-                            SOCK_SEQPACKET
+    master_socket_ = socket(AF_UNIX,
+                            SOCK_STREAM
 #ifdef SOCK_CLOEXEC
                                 | SOCK_CLOEXEC /* MacOSX does not have it */
 #endif
@@ -58,7 +58,7 @@ RemoteApp::RemoteApp(const std::vector<char*>& args, bool need_memory_introspect
     xbt_assert(master_socket_ != -1, "Cannot create the master socket: %s", strerror(errno));
 
     struct sockaddr_un serv_addr = {};
-    serv_addr.sun_family         = AF_LOCAL;
+    serv_addr.sun_family         = AF_UNIX;
     snprintf(serv_addr.sun_path, 64, "/tmp/simgrid-mc-%d", getpid());
     strcpy(master_socket_name, serv_addr.sun_path);
     auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(serv_addr.sun_path);
@@ -122,7 +122,7 @@ void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
   xbt_assert(answer_size != -1, "Could not receive message");
   xbt_assert(answer_size == sizeof answer, "Broken message (size=%zd; expected %zu)", answer_size, sizeof answer);
   xbt_assert(answer.type == MessageType::ACTORS_STATUS_REPLY_COUNT,
-             "Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)",
+             "%d Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)", getpid(),
              to_c_str(answer.type), (int)answer.type, (int)MessageType::ACTORS_STATUS_REPLY_COUNT);
 
   // Message sanity checks
index d6e0188..df1e06d 100644 (file)
@@ -60,13 +60,6 @@ AppSide* AppSide::get()
   int fd             = xbt_str_parse_int(fd_env, "Not a number in variable '" MC_ENV_SOCKET_FD "'");
   XBT_DEBUG("Model-checked application found socket FD %i", fd);
 
-  // Check the socket type/validity:
-  int type;
-  socklen_t socklen = sizeof(type);
-  xbt_assert(getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &socklen) == 0, "Could not check socket type");
-  xbt_assert(type == SOCK_SEQPACKET, "Unexpected socket type %i", type);
-  XBT_DEBUG("Model-checked application found expected socket type");
-
   instance_ = std::make_unique<simgrid::mc::AppSide>(fd);
 
   // Wait for the model-checker:
@@ -162,8 +155,8 @@ void AppSide::handle_fork(const s_mc_message_int_t* msg)
   xbt_assert(pid >= 0, "Could not fork application sub-process: %s.", strerror(errno));
 
   if (pid == 0) { // Child
-    int sock = socket(AF_LOCAL,
-                      SOCK_SEQPACKET
+    int sock = socket(AF_UNIX,
+                      SOCK_STREAM
 #ifdef SOCK_CLOEXEC
                           | SOCK_CLOEXEC /* MacOSX does not have it */
 #endif
@@ -171,7 +164,7 @@ void AppSide::handle_fork(const s_mc_message_int_t* msg)
                       0);
 
     struct sockaddr_un addr = {};
-    addr.sun_family         = AF_LOCAL;
+    addr.sun_family         = AF_UNIX;
     snprintf(addr.sun_path, 64, "/tmp/simgrid-mc-%lu", static_cast<unsigned long>(msg->value));
     auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path);
 
index 6160412..ac202ed 100644 (file)
@@ -1,5 +1,4 @@
-/* Copyright (c) 2015-2023. The SimGrid Team.
- * All rights reserved.                                                     */
+/* Copyright (c) 2015-2023. The SimGrid Team.  All rights reserved.         */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 #include <sys/types.h>
 #include <unistd.h>
 
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_Channel, mc, "MC interprocess communication");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication");
 
 namespace simgrid::mc {
+Channel::Channel(int sock, Channel const& other) : socket_(sock)
+{
+  if (not other.buffer_.empty()) {
+    ssize_t size = other.buffer_.size();
+    XBT_DEBUG("Adopt %d bytes buffered by father channel.", (int)size);
+    buffer_.resize(size);
+    memcpy(buffer_.data(), other.buffer_.data(), size);
+  }
+}
 
 Channel::~Channel()
 {
@@ -43,16 +51,54 @@ int Channel::send(const void* message, size_t size) const
   return 0;
 }
 
-ssize_t Channel::receive(void* message, size_t size) const
+ssize_t Channel::receive(const void* message, size_t size, int flags)
 {
-  ssize_t res = recv(this->socket_, message, size, 0);
-  xbt_assert(res != -1, "Channel::receive failure: %s", strerror(errno));
+  size_t bufsize = buffer_.size();
+  ssize_t copied = 0;
+  void* whereto  = const_cast<void*>(message);
+  size_t todo    = size;
+  if (bufsize > 0) {
+    XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size);
+    if (bufsize >= size) {
+      copied = size;
+      memcpy(whereto, buffer_.data(), size);
+      memcpy(static_cast<void*>(buffer_.data()), buffer_.data() + size, bufsize - size);
+      buffer_.resize(bufsize - size);
+      todo = 0;
+    } else {
+      copied = bufsize;
+      memcpy(whereto, buffer_.data(), bufsize);
+      buffer_.clear();
+      todo -= bufsize;
+      whereto = static_cast<char*>(whereto) + bufsize;
+    }
+  }
+  ssize_t res = 0;
+  if (todo > 0) {
+    errno = 0;
+    res   = recv(this->socket_, whereto, todo, flags);
+    xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno));
+    if (res == -1) {
+      res = 0;
+    }
+  }
+  XBT_DEBUG("%d Wanted %d; Got %d from buffer and %d from network", getpid(), (int)size, (int)copied, (int)res);
+  res += copied;
   if (static_cast<size_t>(res) >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
-    XBT_DEBUG("Receive %s (requested %zu; received %zd at %p)", to_c_str(*static_cast<const MessageType*>(message)),
-              size, res, message);
+    XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(),
+              to_c_str(*static_cast<const MessageType*>(message)), size, res, message);
   } else {
     XBT_DEBUG("Receive %zd bytes", res);
   }
   return res;
 }
+
+void Channel::reinject(const char* data, size_t size)
+{
+  xbt_assert(size > 0, "Cannot reinject less than one char (size: %lu)", size);
+  auto prev_size = buffer_.size();
+  XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, prev_size);
+  buffer_.resize(prev_size + size);
+  memcpy(buffer_.data() + prev_size, data, size);
+}
 } // namespace simgrid::mc
index c9e1aeb..7f326ad 100644 (file)
@@ -20,10 +20,12 @@ namespace simgrid::mc {
 class Channel {
   int socket_ = -1;
   template <class M> static constexpr bool messageType() { return std::is_class_v<M> && std::is_trivial_v<M>; }
+  std::vector<char> buffer_;
 
 public:
   Channel() = default;
   explicit Channel(int sock) : socket_(sock) {}
+  Channel(int sock, Channel const& other);
   ~Channel();
 
   // No copy:
@@ -44,11 +46,13 @@ public:
   }
 
   // Receive
-  ssize_t receive(void* message, size_t size) const;
-  template <class M> typename std::enable_if_t<messageType<M>(), ssize_t> receive(M& m) const
+  ssize_t receive(const void* message, size_t size, int flags = 0);
+  template <class M> typename std::enable_if_t<messageType<M>(), ssize_t> receive(M& m)
   {
-    return this->receive(&m, sizeof(M));
+    return this->receive(&m, sizeof(M), 0);
   }
+  void reinject(const char* data, size_t size);
+  bool has_pending_data() const { return not buffer_.empty(); }
 
   // Socket handling
   int get_socket() const { return socket_; }
index 3b89940..b88d7e2 100644 (file)
@@ -116,6 +116,7 @@ static void wait_application_process(pid_t pid)
              "If you run from within a docker, adding `--cap-add SYS_PTRACE` to the docker line may help. "
              "If it does not help, please report this bug.",
              errno);
+  XBT_DEBUG("%d ptrace correctly setup.", getpid());
 }
 
 void CheckerSide::setup_events(bool socket_only)
@@ -128,18 +129,23 @@ void CheckerSide::setup_events(bool socket_only)
       [](evutil_socket_t, short events, void* arg) {
         auto checker = static_cast<simgrid::mc::CheckerSide*>(arg);
         if (events == EV_READ) {
-          std::array<char, MC_MESSAGE_LENGTH> buffer;
-          ssize_t size = recv(checker->get_channel().get_socket(), buffer.data(), buffer.size(), MSG_DONTWAIT);
-          if (size == -1) {
-            XBT_ERROR("Channel::receive failure: %s", strerror(errno));
-            if (errno != EAGAIN)
-              throw simgrid::xbt::errno_error();
-          }
+          do {
+
+            std::array<char, MC_MESSAGE_LENGTH> buffer;
+            ssize_t size = checker->get_channel().receive(buffer.data(), buffer.size(), MSG_DONTWAIT);
+            if (size == -1) {
+              XBT_ERROR("Channel::receive failure: %s", strerror(errno));
+              if (errno != EAGAIN)
+                throw simgrid::xbt::errno_error();
+            }
 
-          if (size == 0) // The app closed the socket. It must be dead by now.
-            checker->handle_waitpid();
-          else if (not checker->handle_message(buffer.data(), size))
-            checker->break_loop();
+            if (size == 0) // The app closed the socket. It must be dead by now.
+              checker->handle_waitpid();
+            else if (not checker->handle_message(buffer.data(), size)) {
+              checker->break_loop();
+              break;
+            }
+          } while (checker->get_channel().has_pending_data());
         } else {
           xbt_die("Unexpected event");
         }
@@ -173,11 +179,10 @@ CheckerSide::CheckerSide(const std::vector<char*>& args, bool need_memory_info)
 {
   XBT_DEBUG("Create a CheckerSide. Needs_meminfo: %s", need_memory_info ? "YES" : "no");
 
-  // Create an AF_LOCAL socketpair used for exchanging messages between the model-checker process (ancestor)
+  // Create an AF_UNIX socketpair used for exchanging messages between the model-checker process (ancestor)
   // and the application process (child)
   int sockets[2];
-  xbt_assert(socketpair(AF_LOCAL, SOCK_SEQPACKET, 0, sockets) != -1, "Could not create socketpair: %s",
-             strerror(errno));
+  xbt_assert(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != -1, "Could not create socketpair: %s", strerror(errno));
 
   pid_ = fork();
   xbt_assert(pid_ >= 0, "Could not fork application process");
@@ -230,7 +235,7 @@ CheckerSide::~CheckerSide()
 
 /* This constructor is called when cloning a checkerside to get its application to fork away */
 CheckerSide::CheckerSide(int socket, CheckerSide* child_checker)
-    : channel_(socket), running_(true), child_checker_(child_checker)
+    : channel_(socket, child_checker->channel_), running_(true), child_checker_(child_checker)
 {
   setup_events(true); // We already have a signal handled in that case
 
@@ -275,7 +280,7 @@ void CheckerSide::finalize(bool terminate_asap)
              (int)answer.type, (int)MessageType::FINALIZE_REPLY);
 }
 
-void CheckerSide::dispatch_events() const
+void CheckerSide::dispatch_events()
 {
   event_base_dispatch(base_.get());
 }
@@ -288,15 +293,17 @@ void CheckerSide::break_loop() const
 bool CheckerSide::handle_message(const char* buffer, ssize_t size)
 {
   s_mc_message_t base_message;
+  ssize_t consumed;
   xbt_assert(size >= (ssize_t)sizeof(base_message), "Broken message. Got only %ld bytes.", size);
   memcpy(&base_message, buffer, sizeof(base_message));
 
   switch (base_message.type) {
     case MessageType::IGNORE_HEAP: {
+      consumed = sizeof(s_mc_message_ignore_heap_t);
 #if SIMGRID_HAVE_STATEFUL_MC
       if (remote_memory_ != nullptr) {
         s_mc_message_ignore_heap_t message;
-        xbt_assert(size == sizeof(message), "Broken message");
+        xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
         memcpy(&message, buffer, sizeof(message));
 
         IgnoredHeapRegion region;
@@ -312,10 +319,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size)
     }
 
     case MessageType::UNIGNORE_HEAP: {
+      consumed = sizeof(s_mc_message_ignore_memory_t);
 #if SIMGRID_HAVE_STATEFUL_MC
       if (remote_memory_ != nullptr) {
         s_mc_message_ignore_memory_t message;
-        xbt_assert(size == sizeof(message), "Broken message");
+        xbt_assert(size == static_cast<ssize_t>(sizeof(message)), "Broken message");
         memcpy(&message, buffer, sizeof(message));
         get_remote_memory()->unignore_heap((void*)message.addr, message.size);
       } else
@@ -325,10 +333,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size)
     }
 
     case MessageType::IGNORE_MEMORY: {
+      consumed = sizeof(s_mc_message_ignore_memory_t);
 #if SIMGRID_HAVE_STATEFUL_MC
       if (remote_memory_ != nullptr) {
         s_mc_message_ignore_memory_t message;
-        xbt_assert(size == sizeof(message), "Broken message");
+        xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
         memcpy(&message, buffer, sizeof(message));
         get_remote_memory()->ignore_region(message.addr, message.size);
       } else
@@ -338,10 +347,11 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size)
     }
 
     case MessageType::STACK_REGION: {
+      consumed = sizeof(s_mc_message_stack_region_t);
 #if SIMGRID_HAVE_STATEFUL_MC
       if (remote_memory_ != nullptr) {
         s_mc_message_stack_region_t message;
-        xbt_assert(size == sizeof(message), "Broken message");
+        xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
         memcpy(&message, buffer, sizeof(message));
         get_remote_memory()->stack_areas().push_back(message.stack_region);
       } else
@@ -351,9 +361,10 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size)
     }
 
     case MessageType::REGISTER_SYMBOL: {
+      consumed = sizeof(s_mc_message_register_symbol_t);
 #if SIMGRID_HAVE_STATEFUL_MC
       s_mc_message_register_symbol_t message;
-      xbt_assert(size == sizeof(message), "Broken message");
+      xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
       memcpy(&message, buffer, sizeof(message));
       xbt_assert(not message.callback, "Support for client-side function proposition is not implemented.");
       XBT_DEBUG("Received symbol: %s", message.name.data());
@@ -366,15 +377,27 @@ bool CheckerSide::handle_message(const char* buffer, ssize_t size)
     }
 
     case MessageType::WAITING:
+      consumed = sizeof(s_mc_message_t);
+      if (size > consumed) {
+        XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed),
+                  to_c_str(base_message.type));
+        channel_.reinject(&buffer[consumed], size - consumed);
+      }
+
       return false;
 
     case MessageType::ASSERTION_FAILED:
+      consumed = sizeof(s_mc_message_t);
       Exploration::get_instance()->report_assertion_failure();
       break;
 
     default:
       xbt_die("Unexpected message from the application");
   }
+  if (size > consumed) {
+    XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed), to_c_str(base_message.type));
+    channel_.reinject(&buffer[consumed], size - consumed);
+  }
   return true;
 }
 
index 84c70d1..9d99973 100644 (file)
@@ -52,7 +52,7 @@ public:
   Channel& get_channel() { return channel_; }
 
   bool handle_message(const char* buffer, ssize_t size);
-  void dispatch_events() const;
+  void dispatch_events();
   void break_loop() const;
   void wait_for_requests();
 
index 920f43f..5da8aef 100644 (file)
@@ -36,7 +36,7 @@ constexpr unsigned SIMCALL_SERIALIZATION_BUFFER_SIZE = 2048;
 
 /** Basic structure for a MC message
  *
- *  The current version of the client/server protocol sends C structures over `AF_LOCAL`
+ *  The current version of the client/server protocol sends C structures over `AF_UNIX`
  *  `SOCK_SEQPACKET` sockets. This means that the protocol is ABI/architecture specific:
  *  we currently can't model-check a x86 process from a x86_64 process.
  *