xbt_die("SimGrid was compiled without MC support.");
#endif
} else {
- master_socket_ = socket(AF_LOCAL,
- SOCK_SEQPACKET
+ master_socket_ = socket(AF_UNIX,
+ SOCK_STREAM
#ifdef SOCK_CLOEXEC
| SOCK_CLOEXEC /* MacOSX does not have it */
#endif
xbt_assert(master_socket_ != -1, "Cannot create the master socket: %s", strerror(errno));
struct sockaddr_un serv_addr = {};
- serv_addr.sun_family = AF_LOCAL;
+ serv_addr.sun_family = AF_UNIX;
snprintf(serv_addr.sun_path, 64, "/tmp/simgrid-mc-%d", getpid());
strcpy(master_socket_name, serv_addr.sun_path);
auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(serv_addr.sun_path);
xbt_assert(answer_size != -1, "Could not receive message");
xbt_assert(answer_size == sizeof answer, "Broken message (size=%zd; expected %zu)", answer_size, sizeof answer);
xbt_assert(answer.type == MessageType::ACTORS_STATUS_REPLY_COUNT,
- "Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)",
+ "%d Received unexpected message %s (%i); expected MessageType::ACTORS_STATUS_REPLY_COUNT (%i)", getpid(),
to_c_str(answer.type), (int)answer.type, (int)MessageType::ACTORS_STATUS_REPLY_COUNT);
// Message sanity checks
int fd = xbt_str_parse_int(fd_env, "Not a number in variable '" MC_ENV_SOCKET_FD "'");
XBT_DEBUG("Model-checked application found socket FD %i", fd);
- // Check the socket type/validity:
- int type;
- socklen_t socklen = sizeof(type);
- xbt_assert(getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &socklen) == 0, "Could not check socket type");
- xbt_assert(type == SOCK_SEQPACKET, "Unexpected socket type %i", type);
- XBT_DEBUG("Model-checked application found expected socket type");
-
instance_ = std::make_unique<simgrid::mc::AppSide>(fd);
// Wait for the model-checker:
xbt_assert(pid >= 0, "Could not fork application sub-process: %s.", strerror(errno));
if (pid == 0) { // Child
- int sock = socket(AF_LOCAL,
- SOCK_SEQPACKET
+ int sock = socket(AF_UNIX,
+ SOCK_STREAM
#ifdef SOCK_CLOEXEC
| SOCK_CLOEXEC /* MacOSX does not have it */
#endif
0);
struct sockaddr_un addr = {};
- addr.sun_family = AF_LOCAL;
+ addr.sun_family = AF_UNIX;
snprintf(addr.sun_path, 64, "/tmp/simgrid-mc-%lu", static_cast<unsigned long>(msg->value));
auto addr_size = offsetof(struct sockaddr_un, sun_path) + strlen(addr.sun_path);
-/* Copyright (c) 2015-2023. The SimGrid Team.
- * All rights reserved. */
+/* Copyright (c) 2015-2023. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include <sys/types.h>
#include <unistd.h>
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_Channel, mc, "MC interprocess communication");
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(mc_channel, mc, "MC interprocess communication");
namespace simgrid::mc {
+/** Build a channel over socket `sock` that starts with a copy of the bytes still buffered in `other`. */
+Channel::Channel(int sock, Channel const& other) : socket_(sock)
+{
+  // Nothing to inherit: keep the buffer empty
+  if (other.buffer_.empty())
+    return;
+
+  const auto pending = other.buffer_.size();
+  XBT_DEBUG("Adopt %d bytes buffered by father channel.", (int)pending);
+  buffer_.assign(other.buffer_.begin(), other.buffer_.end());
+}
Channel::~Channel()
{
return 0;
}
-ssize_t Channel::receive(void* message, size_t size) const
+/** Read up to `size` bytes into `message`.
+ *
+ * Bytes already present in the internal buffer (see reinject()) are served first, in the order they were buffered.
+ * The socket is only read for the part of the request that the buffer could not satisfy.
+ *
+ * @param message destination area. It is declared const but it is written to (the constness is cast away).
+ * @param size    maximal amount of bytes to store in `message`
+ * @param flags   passed verbatim to recv(), e.g. MSG_DONTWAIT
+ * @return the amount of bytes stored in `message`, counting buffered and network bytes together. A recv() failing
+ *         with EAGAIN counts as 0 network bytes; any other recv() failure makes the assertion fail.
+ */
+ssize_t Channel::receive(const void* message, size_t size, int flags)
{
- ssize_t res = recv(this->socket_, message, size, 0);
- xbt_assert(res != -1, "Channel::receive failure: %s", strerror(errno));
+  size_t bufsize = buffer_.size();
+  ssize_t copied = 0;
+  void* whereto  = const_cast<void*>(message);
+  size_t todo    = size;
+  if (bufsize > 0) {
+    XBT_DEBUG("%d %zu bytes (of %zu expected) are already in buffer", getpid(), bufsize, size);
+    if (bufsize >= size) {
+      // The buffer alone satisfies the request: no need to touch the socket
+      copied = size;
+      memcpy(whereto, buffer_.data(), size);
+      // Drop the consumed prefix. erase() shifts the remaining bytes safely, whereas memcpy() must not be used here:
+      // source and destination overlap as soon as more than `size` bytes remain in the buffer.
+      buffer_.erase(buffer_.begin(), buffer_.begin() + copied);
+      todo = 0;
+    } else {
+      // Use everything that is buffered, and ask the network for the rest
+      copied = bufsize;
+      memcpy(whereto, buffer_.data(), bufsize);
+      buffer_.clear();
+      todo -= bufsize;
+      whereto = static_cast<char*>(whereto) + bufsize;
+    }
+  }
+  ssize_t res = 0;
+  if (todo > 0) {
+    errno = 0;
+    res   = recv(this->socket_, whereto, todo, flags);
+    xbt_assert(res != -1 || errno == EAGAIN, "Channel::receive failure: %s", strerror(errno));
+    if (res == -1) { // EAGAIN: nothing to read right now; report only what came from the buffer
+      res = 0;
+    }
+  }
+  XBT_DEBUG("%d Wanted %d; Got %d from buffer and %d from network", getpid(), (int)size, (int)copied, (int)res);
+  res += copied;
if (static_cast<size_t>(res) >= sizeof(int) && is_valid_MessageType(*static_cast<const int*>(message))) {
- XBT_DEBUG("Receive %s (requested %zu; received %zd at %p)", to_c_str(*static_cast<const MessageType*>(message)),
- size, res, message);
+    XBT_DEBUG("%d Receive %s (requested %zu; received %zd at %p)", getpid(),
+              to_c_str(*static_cast<const MessageType*>(message)), size, res, message);
} else {
XBT_DEBUG("Receive %zd bytes", res);
}
return res;
}
+
+/** Append `size` bytes read from `data` to the internal buffer.
+ *
+ * Subsequent calls to receive() serve these bytes before reading the socket.
+ *
+ * @param data start of the bytes to store
+ * @param size amount of bytes to store; must be strictly positive (asserted)
+ */
+void Channel::reinject(const char* data, size_t size)
+{
+  // size is a size_t, so it must be printed with %zu (as in the debug message below), not %lu
+  xbt_assert(size > 0, "Cannot reinject less than one char (size: %zu)", size);
+  auto prev_size = buffer_.size();
+  XBT_DEBUG("%d Reinject %zu bytes on top of %zu pre-existing bytes", getpid(), size, prev_size);
+  buffer_.insert(buffer_.end(), data, data + size);
+}
} // namespace simgrid::mc
class Channel {
int socket_ = -1;
template <class M> static constexpr bool messageType() { return std::is_class_v<M> && std::is_trivial_v<M>; }
+ std::vector<char> buffer_;
public:
Channel() = default;
explicit Channel(int sock) : socket_(sock) {}
+ Channel(int sock, Channel const& other);
~Channel();
// No copy:
}
// Receive
- ssize_t receive(void* message, size_t size) const;
- template <class M> typename std::enable_if_t<messageType<M>(), ssize_t> receive(M& m) const
+ ssize_t receive(const void* message, size_t size, int flags = 0);
+ template <class M> typename std::enable_if_t<messageType<M>(), ssize_t> receive(M& m)
{
- return this->receive(&m, sizeof(M));
+ return this->receive(&m, sizeof(M), 0);
}
+ void reinject(const char* data, size_t size);
+ bool has_pending_data() const { return not buffer_.empty(); }
// Socket handling
int get_socket() const { return socket_; }
"If you run from within a docker, adding `--cap-add SYS_PTRACE` to the docker line may help. "
"If it does not help, please report this bug.",
errno);
+ XBT_DEBUG("%d ptrace correctly setup.", getpid());
}
void CheckerSide::setup_events(bool socket_only)
[](evutil_socket_t, short events, void* arg) {
auto checker = static_cast<simgrid::mc::CheckerSide*>(arg);
if (events == EV_READ) {
- std::array<char, MC_MESSAGE_LENGTH> buffer;
- ssize_t size = recv(checker->get_channel().get_socket(), buffer.data(), buffer.size(), MSG_DONTWAIT);
- if (size == -1) {
- XBT_ERROR("Channel::receive failure: %s", strerror(errno));
- if (errno != EAGAIN)
- throw simgrid::xbt::errno_error();
- }
+ do {
+
+ std::array<char, MC_MESSAGE_LENGTH> buffer;
+ ssize_t size = checker->get_channel().receive(buffer.data(), buffer.size(), MSG_DONTWAIT);
+ if (size == -1) {
+ XBT_ERROR("Channel::receive failure: %s", strerror(errno));
+ if (errno != EAGAIN)
+ throw simgrid::xbt::errno_error();
+ }
- if (size == 0) // The app closed the socket. It must be dead by now.
- checker->handle_waitpid();
- else if (not checker->handle_message(buffer.data(), size))
- checker->break_loop();
+ if (size == 0) // The app closed the socket. It must be dead by now.
+ checker->handle_waitpid();
+ else if (not checker->handle_message(buffer.data(), size)) {
+ checker->break_loop();
+ break;
+ }
+ } while (checker->get_channel().has_pending_data());
} else {
xbt_die("Unexpected event");
}
{
XBT_DEBUG("Create a CheckerSide. Needs_meminfo: %s", need_memory_info ? "YES" : "no");
- // Create an AF_LOCAL socketpair used for exchanging messages between the model-checker process (ancestor)
+ // Create an AF_UNIX socketpair used for exchanging messages between the model-checker process (ancestor)
// and the application process (child)
int sockets[2];
- xbt_assert(socketpair(AF_LOCAL, SOCK_SEQPACKET, 0, sockets) != -1, "Could not create socketpair: %s",
- strerror(errno));
+ xbt_assert(socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != -1, "Could not create socketpair: %s", strerror(errno));
pid_ = fork();
xbt_assert(pid_ >= 0, "Could not fork application process");
/* This constructor is called when cloning a checkerside to get its application to fork away */
CheckerSide::CheckerSide(int socket, CheckerSide* child_checker)
- : channel_(socket), running_(true), child_checker_(child_checker)
+ : channel_(socket, child_checker->channel_), running_(true), child_checker_(child_checker)
{
setup_events(true); // We already have a signal handled in that case
(int)answer.type, (int)MessageType::FINALIZE_REPLY);
}
-void CheckerSide::dispatch_events() const
+void CheckerSide::dispatch_events() // Runs the event loop of base_ by calling event_base_dispatch() on it
{
event_base_dispatch(base_.get());
}
bool CheckerSide::handle_message(const char* buffer, ssize_t size)
{
s_mc_message_t base_message;
+ ssize_t consumed;
xbt_assert(size >= (ssize_t)sizeof(base_message), "Broken message. Got only %ld bytes.", size);
memcpy(&base_message, buffer, sizeof(base_message));
switch (base_message.type) {
case MessageType::IGNORE_HEAP: {
+ consumed = sizeof(s_mc_message_ignore_heap_t);
#if SIMGRID_HAVE_STATEFUL_MC
if (remote_memory_ != nullptr) {
s_mc_message_ignore_heap_t message;
- xbt_assert(size == sizeof(message), "Broken message");
+ xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
memcpy(&message, buffer, sizeof(message));
IgnoredHeapRegion region;
}
case MessageType::UNIGNORE_HEAP: {
+ consumed = sizeof(s_mc_message_ignore_memory_t);
#if SIMGRID_HAVE_STATEFUL_MC
if (remote_memory_ != nullptr) {
s_mc_message_ignore_memory_t message;
- xbt_assert(size == sizeof(message), "Broken message");
+ xbt_assert(size == static_cast<ssize_t>(sizeof(message)), "Broken message");
memcpy(&message, buffer, sizeof(message));
get_remote_memory()->unignore_heap((void*)message.addr, message.size);
} else
}
case MessageType::IGNORE_MEMORY: {
+ consumed = sizeof(s_mc_message_ignore_memory_t);
#if SIMGRID_HAVE_STATEFUL_MC
if (remote_memory_ != nullptr) {
s_mc_message_ignore_memory_t message;
- xbt_assert(size == sizeof(message), "Broken message");
+ xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
memcpy(&message, buffer, sizeof(message));
get_remote_memory()->ignore_region(message.addr, message.size);
} else
}
case MessageType::STACK_REGION: {
+ consumed = sizeof(s_mc_message_stack_region_t);
#if SIMGRID_HAVE_STATEFUL_MC
if (remote_memory_ != nullptr) {
s_mc_message_stack_region_t message;
- xbt_assert(size == sizeof(message), "Broken message");
+ xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
memcpy(&message, buffer, sizeof(message));
get_remote_memory()->stack_areas().push_back(message.stack_region);
} else
}
case MessageType::REGISTER_SYMBOL: {
+ consumed = sizeof(s_mc_message_register_symbol_t);
#if SIMGRID_HAVE_STATEFUL_MC
s_mc_message_register_symbol_t message;
- xbt_assert(size == sizeof(message), "Broken message");
+ xbt_assert(size >= static_cast<ssize_t>(sizeof(message)), "Broken message");
memcpy(&message, buffer, sizeof(message));
xbt_assert(not message.callback, "Support for client-side function proposition is not implemented.");
XBT_DEBUG("Received symbol: %s", message.name.data());
}
case MessageType::WAITING:
+ consumed = sizeof(s_mc_message_t);
+ if (size > consumed) {
+ XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed),
+ to_c_str(base_message.type));
+ channel_.reinject(&buffer[consumed], size - consumed);
+ }
+
return false;
case MessageType::ASSERTION_FAILED:
+ consumed = sizeof(s_mc_message_t);
Exploration::get_instance()->report_assertion_failure();
break;
default:
xbt_die("Unexpected message from the application");
}
+ if (size > consumed) {
+ XBT_DEBUG("%d reinject %d bytes after a %s message", getpid(), (int)(size - consumed), to_c_str(base_message.type));
+ channel_.reinject(&buffer[consumed], size - consumed);
+ }
return true;
}
Channel& get_channel() { return channel_; }
bool handle_message(const char* buffer, ssize_t size);
- void dispatch_events() const;
+ void dispatch_events();
void break_loop() const;
void wait_for_requests();
/** Basic structure for a MC message
*
- * The current version of the client/server protocol sends C structures over `AF_LOCAL`
+ * The current version of the client/server protocol sends C structures over `AF_UNIX`
* `SOCK_STREAM` sockets. This means that the protocol is ABI/architecture specific:
* we currently can't model-check a x86 process from a x86_64 process.
*