Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add a FAILED state to activities. tested on comm and exec
author SUTER Frederic <frederic.suter@cc.in2p3.fr>
Mon, 23 Aug 2021 22:43:30 +0000 (00:43 +0200)
committer SUTER Frederic <frederic.suter@cc.in2p3.fr>
Mon, 23 Aug 2021 22:43:43 +0000 (00:43 +0200)
12 files changed:
MANIFEST.in
examples/cpp/CMakeLists.txt
examples/cpp/comm-failure/s4u-comm-failure.cpp [new file with mode: 0644]
examples/cpp/comm-failure/s4u-comm-failure.tesh [new file with mode: 0644]
examples/cpp/exec-failure/s4u-exec-failure.cpp [new file with mode: 0644]
examples/cpp/exec-failure/s4u-exec-failure.tesh [new file with mode: 0644]
include/simgrid/s4u/Activity.hpp
src/kernel/activity/CommImpl.cpp
src/kernel/activity/ExecImpl.cpp
src/kernel/activity/IoImpl.cpp
src/s4u/s4u_Activity.cpp
src/s4u/s4u_Comm.cpp

index 9c2f175..6fe2661 100644 (file)
@@ -173,6 +173,8 @@ include examples/cpp/clusters-multicpu/s4u-clusters-multicpu.cpp
 include examples/cpp/clusters-multicpu/s4u-clusters-multicpu.tesh
 include examples/cpp/comm-dependent/s4u-comm-dependent.cpp
 include examples/cpp/comm-dependent/s4u-comm-dependent.tesh
+include examples/cpp/comm-failure/s4u-comm-failure.cpp
+include examples/cpp/comm-failure/s4u-comm-failure.tesh
 include examples/cpp/comm-host2host/s4u-comm-host2host.cpp
 include examples/cpp/comm-host2host/s4u-comm-host2host.tesh
 include examples/cpp/comm-pingpong/s4u-comm-pingpong.cpp
@@ -242,6 +244,8 @@ include examples/cpp/exec-dependent/s4u-exec-dependent.cpp
 include examples/cpp/exec-dependent/s4u-exec-dependent.tesh
 include examples/cpp/exec-dvfs/s4u-exec-dvfs.cpp
 include examples/cpp/exec-dvfs/s4u-exec-dvfs.tesh
+include examples/cpp/exec-failure/s4u-exec-failure.cpp
+include examples/cpp/exec-failure/s4u-exec-failure.tesh
 include examples/cpp/exec-ptask-multicore/s4u-exec-ptask-multicore.cpp
 include examples/cpp/exec-ptask-multicore/s4u-exec-ptask-multicore.tesh
 include examples/cpp/exec-ptask/s4u-exec-ptask.cpp
index 960123b..a266bd4 100644 (file)
@@ -64,13 +64,13 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  actor-lifetime actor-migrate actor-suspend actor-yield actor-stacksize
                  app-bittorrent app-chainsend app-token-ring
                  comm-pingpong comm-ready comm-serialize comm-suspend comm-wait comm-waitany comm-waitall comm-waituntil
-                 comm-dependent comm-host2host
+                 comm-dependent comm-host2host comm-failure
                  cloud-capping cloud-migration cloud-simple
                  dht-chord dht-kademlia
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask energy-wifi
                  engine-filtering
                  exec-async exec-basic exec-dvfs exec-remote exec-waitany exec-waitfor exec-dependent exec-unassigned
-                 exec-ptask-multicore exec-cpu-nonlinear exec-cpu-factors
+                 exec-ptask-multicore exec-cpu-nonlinear exec-cpu-factors exec-failure
                  maestro-set
                  mc-bugged1 mc-bugged2 mc-electric-fence mc-failing-assert
                                 network-wifi
diff --git a/examples/cpp/comm-failure/s4u-comm-failure.cpp b/examples/cpp/comm-failure/s4u-comm-failure.cpp
new file mode 100644 (file)
index 0000000..cc9a127
--- /dev/null
@@ -0,0 +1,130 @@
+/* Copyright (c) 2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to detect and handle the failure of a communication.
+ *
+ * The sender initiates two asynchronous communications, each towards a different receiver and going through a
+ * different link, and then waits for the first one to terminate with simgrid::s4u::Comm::wait_any().
+ * Meanwhile, a LinkKiller actor turns off one of these links after 10 seconds, which makes the communication
+ * using it fail: wait_any() then raises a simgrid::NetworkFailureException in the sender.
+ *
+ * The sender then inspects the state of each communication to find out which one FAILED, shows that waiting on a
+ * failed communication raises an exception, and finally waits for the remaining communication to complete.
+ */
+
+#include <simgrid/s4u.hpp>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_comm_failure, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+class Sender {
+  std::string mailbox1_name;
+  std::string mailbox2_name;
+
+public:
+  Sender(std::string mailbox1_name, std::string mailbox2_name)
+      : mailbox1_name(mailbox1_name), mailbox2_name(mailbox2_name)
+  {
+  }
+
+  void operator()()
+  {
+    auto mailbox1 = sg4::Mailbox::by_name(mailbox1_name);
+    auto mailbox2 = sg4::Mailbox::by_name(mailbox2_name);
+
+    XBT_INFO("Initiating asynchronous send to %s", mailbox1->get_cname());
+    auto comm1 = mailbox1->put_async((void*)666, 5);
+    XBT_INFO("Initiating asynchronous send to %s", mailbox2->get_cname());
+    auto comm2 = mailbox2->put_async((void*)666, 2);
+
+    XBT_INFO("Calling wait_any..");
+    std::vector<sg4::CommPtr> pending_comms;
+    pending_comms.push_back(comm1);
+    pending_comms.push_back(comm2);
+    long index;
+    try {
+      index = sg4::Comm::wait_any(pending_comms);
+      XBT_INFO("Wait any returned index %ld (comm to %s)", index, pending_comms.at(index)->get_mailbox()->get_cname());
+    } catch (simgrid::NetworkFailureException& e) {
+      XBT_INFO("Sender has experienced a network failure exception, so it knows that something went wrong");
+      XBT_INFO("Now it needs to figure out which of the two comms failed by looking at their state");
+    }
+
+    XBT_INFO("Comm to %s has state: %s", comm1->get_mailbox()->get_cname(), comm1->get_state_str());
+    XBT_INFO("Comm to %s has state: %s", comm2->get_mailbox()->get_cname(), comm2->get_state_str());
+
+    try {
+      comm1->wait();
+    } catch (simgrid::NetworkFailureException& e) {
+      XBT_INFO("Waiting on a FAILED comm raises an exception: '%s'", e.what());
+    }
+    XBT_INFO("Wait for remaining comm, just to be nice");
+    pending_comms.erase(pending_comms.begin());
+    index = simgrid::s4u::Comm::wait_any(pending_comms);
+  }
+};
+
+/* Receiver actor: posts a single blocking receive on its mailbox and reports whether the communication succeeded or
+ * was interrupted by a network failure. */
+class Receiver {
+  std::string mailbox_name;
+
+public:
+  // Sink parameter: taken by value and moved into the member.
+  explicit Receiver(std::string mailbox_name) : mailbox_name(std::move(mailbox_name)) {}
+
+  void operator()()
+  {
+    auto mailbox = sg4::Mailbox::by_name(mailbox_name);
+    XBT_INFO("Receiver posting a receive...");
+    try {
+      mailbox->get<void*>();
+      XBT_INFO("Receiver has received successfully!");
+    } catch (const simgrid::NetworkFailureException&) {
+      // NOTE: this text is matched verbatim by s4u-comm-failure.tesh
+      XBT_INFO("Receiver has experience a network failure exception");
+    }
+  }
+};
+
+/* LinkKiller actor: sleeps for 10 seconds, then turns off the named link, which makes any communication currently
+ * going through that link fail. */
+class LinkKiller {
+  std::string link_name;
+
+public:
+  // Sink parameter: taken by value and moved into the member.
+  explicit LinkKiller(std::string link_name) : link_name(std::move(link_name)) {}
+
+  void operator()()
+  {
+    auto link_to_kill = sg4::Link::by_name(link_name);
+    XBT_INFO("LinkKiller  sleeping 10 seconds...");
+    sg4::this_actor::sleep_for(10.0);
+    XBT_INFO("LinkKiller turning off link %s", link_to_kill->get_cname());
+    link_to_kill->turn_off();
+    XBT_INFO("LinkKiller killed. exiting");
+  }
+};
+
+int main(int argc, char** argv)
+{
+
+  sg4::Engine engine(&argc, argv);
+  auto* zone  = sg4::create_full_zone("AS0");
+  auto* host1 = zone->create_host("Host1", "1f");
+  auto* host2 = zone->create_host("Host2", "1f");
+  auto* host3 = zone->create_host("Host3", "1f");
+
+  sg4::LinkInRoute linkto2{zone->create_link("linkto2", "1bps")->seal()};
+  sg4::LinkInRoute linkto3{zone->create_link("linkto3", "1bps")->seal()};
+
+  zone->add_route(host1->get_netpoint(), host2->get_netpoint(), nullptr, nullptr, {linkto2}, false);
+  zone->add_route(host1->get_netpoint(), host3->get_netpoint(), nullptr, nullptr, {linkto3}, false);
+  zone->seal();
+
+  sg4::Actor::create("Sender", host1, Sender("mailbox2", "mailbox3"));
+  sg4::Actor::create("Receiver", host2, Receiver("mailbox2"))->daemonize();
+  sg4::Actor::create("Receiver", host3, Receiver("mailbox3"))->daemonize();
+  sg4::Actor::create("LinkKiller", host1, LinkKiller("linkto2"))->daemonize();
+
+  engine.run();
+
+  return 0;
+}
diff --git a/examples/cpp/comm-failure/s4u-comm-failure.tesh b/examples/cpp/comm-failure/s4u-comm-failure.tesh
new file mode 100644 (file)
index 0000000..c0b1209
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-comm-failure "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.000000] (4:LinkKiller@Host1) LinkKiller  sleeping 10 seconds...
+> [  0.000000] (2:Receiver@Host2) Receiver posting a receive...
+> [  0.000000] (3:Receiver@Host3) Receiver posting a receive...
+> [  0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox2
+> [  0.000000] (1:Sender@Host1) Initiating asynchronous send to mailbox3
+> [  0.000000] (1:Sender@Host1) Calling wait_any..
+> [ 10.000000] (4:LinkKiller@Host1) LinkKiller turning off link linkto2
+> [ 10.000000] (4:LinkKiller@Host1) LinkKiller killed. exiting
+> [ 10.000000] (2:Receiver@Host2) Receiver has experience a network failure exception
+> [ 10.000000] (1:Sender@Host1) Sender has experienced a network failure exception, so it knows that something went wrong
+> [ 10.000000] (1:Sender@Host1) Now it needs to figure out which of the two comms failed by looking at their state
+> [ 10.000000] (1:Sender@Host1) Comm to mailbox2 has state: FAILED
+> [ 10.000000] (1:Sender@Host1) Comm to mailbox3 has state: STARTED
+> [ 10.000000] (1:Sender@Host1) Waiting on a FAILED comm raises an exception: 'Cannot wait for a failed communication'
+> [ 10.000000] (1:Sender@Host1) Wait for remaining comm, just to be nice
+> [ 16.494845] (3:Receiver@Host3) Receiver has received successfully!
diff --git a/examples/cpp/exec-failure/s4u-exec-failure.cpp b/examples/cpp/exec-failure/s4u-exec-failure.cpp
new file mode 100644 (file)
index 0000000..f2f9b7e
--- /dev/null
@@ -0,0 +1,84 @@
+/* Copyright (c) 2021. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example shows how to detect and handle the failure of an execution.
+ *
+ * The dispatcher starts two asynchronous executions, one on each of two hosts, and waits for the first one to
+ * terminate with simgrid::s4u::Exec::wait_any(). Meanwhile, a HostKiller actor turns off the second host after
+ * 10 seconds, which makes the execution running there fail: wait_any() then raises a simgrid::HostFailureException.
+ *
+ * The dispatcher then inspects the state of each execution to find out which one FAILED, shows that waiting on a
+ * failed execution raises an exception, and finally waits for the remaining execution to complete.
+ */
+
+#include <simgrid/s4u.hpp>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_failure, "Messages specific for this s4u example");
+namespace sg4 = simgrid::s4u;
+
+static void dispatcher(sg4::Host* host1, sg4::Host* host2)
+{
+  std::vector<sg4::ExecPtr> pending_execs;
+  XBT_INFO("Initiating asynchronous exec on %s", host1->get_cname());
+  auto exec1 = sg4::this_actor::exec_init(20)->set_host(host1);
+  pending_execs.push_back(exec1);
+  exec1->start();
+  XBT_INFO("Initiating asynchronous exec on %s", host2->get_cname());
+  auto exec2 = sg4::this_actor::exec_init(20)->set_host(host2);
+  pending_execs.push_back(exec2);
+  exec2->start();
+
+  XBT_INFO("Calling wait_any..");
+  long index;
+  try {
+    index = sg4::Exec::wait_any(pending_execs);
+    XBT_INFO("Wait any returned index %ld (exec on %s)", index, pending_execs.at(index)->get_host()->get_cname());
+  } catch (simgrid::HostFailureException& e) {
+    XBT_INFO("Dispatcher has experienced a host failure exception, so it knows that something went wrong");
+    XBT_INFO("Now it needs to figure out which of the two execs failed by looking at their state");
+  }
+
+  XBT_INFO("Exec on %s has state: %s", pending_execs[0]->get_host()->get_cname(), pending_execs[0]->get_state_str());
+  XBT_INFO("Exec on %s has state: %s", pending_execs[1]->get_host()->get_cname(), pending_execs[1]->get_state_str());
+
+  try {
+    pending_execs[1]->wait();
+  } catch (simgrid::HostFailureException& e) {
+    XBT_INFO("Waiting on a FAILED exec raises an exception: '%s'", e.what());
+  }
+  pending_execs.pop_back();
+  XBT_INFO("Wait for remaining exec, just to be nice");
+  index = simgrid::s4u::Exec::wait_any(pending_execs);
+  XBT_INFO("Dispatcher ends");
+}
+
+/* HostKiller actor: after sleeping for 10 seconds, turns off the given host, which makes any execution currently
+ * running on it fail. */
+static void host_killer(sg4::Host* to_kill)
+{
+  const char* victim_name = to_kill->get_cname();
+
+  XBT_INFO("HostKiller  sleeping 10 seconds...");
+  sg4::this_actor::sleep_for(10.0);
+
+  XBT_INFO("HostKiller turning off host %s", victim_name);
+  to_kill->turn_off();
+
+  XBT_INFO("HostKiller ends");
+}
+
+int main(int argc, char** argv)
+{
+  sg4::Engine e(&argc, argv);
+
+  /* Platform: two hosts in a single full zone; no links are declared */
+  auto* zone  = sg4::create_full_zone("AS0");
+  auto* host1 = zone->create_host("Host1", "1f");
+  auto* host2 = zone->create_host("Host2", "1f");
+  zone->seal();
+
+  /* Both actors run on Host1; only Host2 gets turned off. The actor PIDs shown in s4u-exec-failure.tesh follow
+   * this creation order: keep it unchanged */
+  sg4::Actor::create("Dispatcher", host1, dispatcher, host1, host2);
+  sg4::Actor::create("HostKiller", host1, host_killer, host2)->daemonize();
+
+  e.run();
+  return 0;
+}
diff --git a/examples/cpp/exec-failure/s4u-exec-failure.tesh b/examples/cpp/exec-failure/s4u-exec-failure.tesh
new file mode 100644 (file)
index 0000000..8c37127
--- /dev/null
@@ -0,0 +1,16 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-exec-failure "--log=root.fmt:[%10.6r]%e(%i:%a@%h)%e%m%n"
+> [  0.000000] (1:Dispatcher@Host1) Initiating asynchronous exec on Host1
+> [  0.000000] (2:HostKiller@Host1) HostKiller  sleeping 10 seconds...
+> [  0.000000] (1:Dispatcher@Host1) Initiating asynchronous exec on Host2
+> [  0.000000] (1:Dispatcher@Host1) Calling wait_any..
+> [ 10.000000] (2:HostKiller@Host1) HostKiller turning off host Host2
+> [ 10.000000] (1:Dispatcher@Host1) Dispatcher has experienced a host failure exception, so it knows that something went wrong
+> [ 10.000000] (1:Dispatcher@Host1) Now it needs to figure out which of the two execs failed by looking at their state
+> [ 10.000000] (1:Dispatcher@Host1) Exec on Host1 has state: STARTED
+> [ 10.000000] (1:Dispatcher@Host1) Exec on Host2 has state: FAILED
+> [ 10.000000] (1:Dispatcher@Host1) Waiting on a FAILED exec raises an exception: 'Cannot wait for a failed exec'
+> [ 10.000000] (1:Dispatcher@Host1) Wait for remaining exec, just to be nice
+> [ 10.000000] (2:HostKiller@Host1) HostKiller ends
+> [ 20.000000] (1:Dispatcher@Host1) Dispatcher ends
index 3e31a94..0b87eb5 100644 (file)
@@ -34,7 +34,7 @@ class XBT_PUBLIC Activity {
 
 public:
   // enum class State { ... }
-  XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, CANCELED, FINISHED);
+  XBT_DECLARE_ENUM_CLASS(State, INITED, STARTING, STARTED, FAILED, CANCELED, FINISHED);
 
 protected:
   Activity()  = default;
index a91af44..8785fb7 100644 (file)
@@ -591,6 +591,10 @@ void CommImpl::finish()
       simcall->issuer_->context_->set_wannadie();
     } else {
       switch (state_) {
+        case State::FAILED:
+          simcall->issuer_->exception_ =
+              std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+          break;
         case State::SRC_TIMEOUT:
           simcall->issuer_->exception_ = std::make_exception_ptr(
               TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the sender"));
@@ -604,17 +608,21 @@ void CommImpl::finish()
         case State::SRC_HOST_FAILURE:
           if (simcall->issuer_ == src_actor_)
             simcall->issuer_->context_->set_wannadie();
-          else
+          else {
+            state_ = kernel::activity::State::FAILED;
             simcall->issuer_->exception_ =
                 std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+          }
           break;
 
         case State::DST_HOST_FAILURE:
           if (simcall->issuer_ == dst_actor_)
             simcall->issuer_->context_->set_wannadie();
-          else
+          else {
+            state_ = kernel::activity::State::FAILED;
             simcall->issuer_->exception_ =
                 std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+          }
           break;
 
         case State::LINK_FAILURE:
@@ -630,6 +638,7 @@ void CommImpl::finish()
           } else {
             XBT_DEBUG("I'm neither source nor dest");
           }
+          state_ = kernel::activity::State::FAILED;
           simcall->issuer_->throw_exception(
               std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Link failure")));
           break;
index 617cc86..3b55aff 100644 (file)
@@ -184,6 +184,7 @@ void ExecImpl::finish()
     }
     switch (state_) {
       case State::FAILED:
+        piface_->complete(s4u::Activity::State::FAILED);
         if (simcall->issuer_->get_host()->is_on())
           simcall->issuer_->exception_ = std::make_exception_ptr(HostFailureException(XBT_THROW_POINT, "Host failed"));
         else /* else, the actor will be killed with no possibility to survive */
@@ -249,7 +250,6 @@ void ExecImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<ExecImpl
   for (auto* exec : execs) {
     /* associate this simcall to the the synchro */
     exec->simcalls_.push_back(&issuer->simcall_);
-
     /* see if the synchro is already finished */
     if (exec->state_ != State::WAITING && exec->state_ != State::RUNNING) {
       exec->finish();
index 4f692b2..5cd665c 100644 (file)
@@ -129,6 +129,7 @@ void IoImpl::finish()
     switch (state_) {
       case State::FAILED:
         simcall->issuer_->context_->set_wannadie();
+        piface_->complete(s4u::Activity::State::FAILED);
         simcall->issuer_->exception_ =
             std::make_exception_ptr(StorageFailureException(XBT_THROW_POINT, "Storage failed"));
         break;
index a174119..ae3846b 100644 (file)
@@ -8,6 +8,8 @@
 #include "simgrid/Exception.hpp"
 #include "simgrid/s4u/Activity.hpp"
 #include "simgrid/s4u/Engine.hpp"
+#include "simgrid/s4u/Exec.hpp"
+#include "simgrid/s4u/Io.hpp"
 #include "src/kernel/activity/ActivityImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
 #include "src/kernel/actor/SimcallObserver.hpp"
@@ -30,6 +32,13 @@ Activity* Activity::wait_for(double timeout)
   if (state_ == State::INITED)
     vetoable_start();
 
+  if (state_ == State::FAILED) {
+    if (dynamic_cast<Exec*>(this))
+      throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec");
+    if (dynamic_cast<Io*>(this))
+      throw StorageFailureException(XBT_THROW_POINT, "Cannot wait for a failed I/O");
+  }
+
   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
   kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
   if (kernel::actor::simcall_blocking(
index bcf557d..ca60e6f 100644 (file)
@@ -44,7 +44,17 @@ ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
   std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
   std::transform(begin(comms), end(comms), begin(rcomms),
                  [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
-  ssize_t changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+  ssize_t changed_pos = -1;
+  try {
+    changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+  } catch (const NetworkFailureException& e) {
+    for (auto c : comms) {
+      if (c->pimpl_->state_ == kernel::activity::State::FAILED) {
+        c->complete(State::FAILED);
+      }
+    }
+    e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
+  }
   if (changed_pos != -1)
     comms.at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
@@ -214,6 +224,8 @@ Comm* Comm::wait_for(double timeout)
   switch (state_) {
     case State::FINISHED:
       break;
+    case State::FAILED:
+      throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
 
     case State::INITED:
     case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
@@ -232,7 +244,12 @@ Comm* Comm::wait_for(double timeout)
       break;
 
     case State::STARTED:
-      simcall_comm_wait(get_impl(), timeout);
+      try {
+        simcall_comm_wait(get_impl(), timeout);
+      } catch (const NetworkFailureException& e) {
+        complete(State::FAILED);
+        e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
+      }
       break;
 
     case State::CANCELED: