Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Send actor action probes one at a time
authorMaxwell Pirtle <maxwellpirtle@gmail.com>
Mon, 20 Feb 2023 08:41:16 +0000 (09:41 +0100)
committerMaxwell Pirtle <maxwellpirtle@gmail.com>
Mon, 20 Feb 2023 09:13:47 +0000 (10:13 +0100)
To avoid the scenario whereby the AppSide
delivers a message larger than the maximum
datagram side allowed by the application,
we instead send a single message for each actor.
This prevents us from needing to complicate the
sending logic in Channel.cpp, which would have
been made more complicated should we have wanted
to send variable-sized messages between the
checker and the application side

src/mc/api/RemoteApp.cpp
src/mc/remote/AppSide.cpp
src/mc/remote/Channel.cpp

index 00f34be..ad68366 100644 (file)
@@ -179,31 +179,44 @@ void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
              to_c_str(answer.type), (int)answer.type, (int)received, (int)MessageType::ACTORS_STATUS_REPLY,
              (int)sizeof(answer));
 
+  // Message sanity checks
+  xbt_assert(answer.count >= 0, "Received an ACTOR_STATUS_REPLY message with an actor count of '%d' < 0", answer.count);
+  xbt_assert(answer.transition_count >= 0, "Received an ACTOR_STATUS_REPLY message with transition_count '%d' < 0",
+             answer.transition_count);
+  xbt_assert(answer.transition_count == 0 || answer.count >= 0,
+             "Received an ACTOR_STATUS_REPLY message with no actor data "
+             "but with transition data nonetheless");
+
   std::vector<s_mc_message_actors_status_one_t> status(answer.count);
   if (answer.count > 0) {
-    size_t size = status.size() * sizeof(s_mc_message_actors_status_one_t);
-    received    = model_checker_->channel().receive(status.data(), size);
-    xbt_assert(static_cast<size_t>(received) == size);
-  }
-
-  std::vector<s_mc_message_simcall_probe_one_t> action_pool(answer.transition_count);
-  if (answer.transition_count > 0) {
-    size_t size = action_pool.size() * sizeof(s_mc_message_simcall_probe_one_t);
-    received    = model_checker_->channel().receive(action_pool.data(), size);
+    size_t size      = status.size() * sizeof(s_mc_message_actors_status_one_t);
+    ssize_t received = model_checker_->channel().receive(status.data(), size);
     xbt_assert(static_cast<size_t>(received) == size);
   }
 
-  // Ensures that each actor sends precisely `actor.max_considered` transitions. While technically
+  // Ensures that each actor sends precisely `answer.transition_count` transitions. While technically
   // this doesn't catch the edge case where actor A sends 3 instead of 2 and actor B sends 2 instead
   // of 3 transitions, that is ignored here since that invariant needs to be enforced on the AppSide
   const auto expected_transitions = std::accumulate(
       status.begin(), status.end(), 0, [](int total, const auto& actor) { return total + actor.n_transitions; });
-  xbt_assert(expected_transitions == static_cast<int>(action_pool.size()),
-             "Expected to receive %d transition(s) but was only notified of %lu by the app side", expected_transitions,
-             action_pool.size());
+  xbt_assert(expected_transitions == answer.transition_count,
+             "Expected to receive %d transition(s) but was only notified of %d by the app side", expected_transitions,
+             answer.transition_count);
+
+  std::vector<s_mc_message_simcall_probe_one_t> probes(answer.transition_count);
+  if (answer.transition_count > 0) {
+    for (auto& probe : probes) {
+      size_t size      = sizeof(s_mc_message_simcall_probe_one_t);
+      ssize_t received = model_checker_->channel().receive(&probe, size);
+      xbt_assert(received >= 0, "Could not receive response to ACTORS_PROBE message (%s)", strerror(errno));
+      xbt_assert(static_cast<size_t>(received) == size,
+                 "Could not receive response to ACTORS_PROBE message (%zd bytes received != %zu bytes expected",
+                 received, size);
+    }
+  }
 
   whereto.clear();
-  auto action_pool_iter = std::move_iterator(action_pool.begin());
+  auto probes_iter = std::move_iterator(probes.begin());
 
   for (const auto& actor : status) {
     xbt_assert(actor.n_transitions == 0 || actor.n_transitions == actor.max_considered,
@@ -213,8 +226,8 @@ void RemoteApp::get_actors_status(std::map<aid_t, ActorState>& whereto) const
                actor.max_considered, actor.n_transitions);
 
     auto actor_transitions = std::vector<std::unique_ptr<Transition>>(actor.n_transitions);
-    for (int times_considered = 0; times_considered < actor.n_transitions; times_considered++, action_pool_iter++) {
-      std::stringstream stream((*action_pool_iter).buffer.data());
+    for (int times_considered = 0; times_considered < actor.n_transitions; times_considered++, probes_iter++) {
+      std::stringstream stream((*probes_iter).buffer.data());
       auto transition = std::unique_ptr<Transition>(deserialize_transition(actor.aid, times_considered, stream));
       actor_transitions[times_considered] = std::move(transition);
     }
index b03efac..9db7806 100644 (file)
@@ -215,10 +215,12 @@ void AppSide::handle_actors_status() const
       // each SIMCALL_EXECUTE provides a `times_considered` to be used to prepare
       // the transition before execution.
     }
-
-    size_t size = probes.size() * sizeof(s_mc_message_simcall_probe_one_t);
     XBT_DEBUG("Deliver ACTOR_TRANSITION_PROBE payload");
-    xbt_assert(channel_.send(probes.data(), size) == 0, "Could not send ACTOR_TRANSITION_PROBE payload");
+
+    for (const auto& probe : probes) {
+      size_t size = sizeof(s_mc_message_simcall_probe_one_t);
+      xbt_assert(channel_.send(&probe, size) == 0, "Could not send ACTOR_TRANSITION_PROBE payload (%zu bytes)", size);
+    }
   }
 }
 
index 556c9b5..dfd07e5 100644 (file)
@@ -34,9 +34,9 @@ int Channel::send(const void* message, size_t size) const
   }
 
   if (is_valid_MessageType(*static_cast<const int*>(message))) {
-    XBT_DEBUG("Sending %s (%lu bytes sent)", to_c_str(*static_cast<const MessageType*>(message)), size);
+    XBT_DEBUG("Sending %s (%zu bytes sent)", to_c_str(*static_cast<const MessageType*>(message)), size);
   } else {
-    XBT_DEBUG("Sending bytes directly (from address %p) (%lu bytes sent)", message, size);
+    XBT_DEBUG("Sending bytes directly (from address %p) (%zu bytes sent)", message, size);
   }
 
   return 0;
@@ -47,9 +47,9 @@ ssize_t Channel::receive(void* message, size_t size, bool block) const
   ssize_t res = recv(this->socket_, message, size, block ? 0 : MSG_DONTWAIT);
   if (res != -1) {
     if (is_valid_MessageType(*static_cast<int*>(message))) {
-      XBT_DEBUG("Receive %s (requested %lu; received %ld)", to_c_str(*static_cast<MessageType*>(message)), size, res);
+      XBT_DEBUG("Receive %s (requested %zu; received %zd)", to_c_str(*static_cast<MessageType*>(message)), size, res);
     } else {
-      XBT_DEBUG("Receive %ld bytes", res);
+      XBT_DEBUG("Receive %zd bytes", res);
     }
   } else {
     XBT_ERROR("Channel::receive failure: %s", strerror(errno));