Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Update copyright lines for 2022.
[simgrid.git] / src / kernel / activity / CommImpl.cpp
index ba477b8..6f2f9e0 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2007-2021. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2007-2022. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/activity/MailboxImpl.hpp"
 #include "src/kernel/context/Context.hpp"
+#include "src/kernel/resource/CpuImpl.hpp"
+#include "src/kernel/resource/LinkImpl.hpp"
+#include "src/kernel/resource/StandardLinkImpl.hpp"
 #include "src/mc/mc_replay.hpp"
-#include "src/surf/cpu_interface.hpp"
-#include "src/surf/network_interface.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(simix_network, simix, "SIMIX network-related synchronization");
 
@@ -226,6 +227,8 @@ void SIMIX_comm_copy_pointer_callback(simgrid::kernel::activity::CommImpl* comm,
 namespace simgrid {
 namespace kernel {
 namespace activity {
+xbt::signal<void(CommImpl const&)> CommImpl::on_start;
+xbt::signal<void(CommImpl const&)> CommImpl::on_completion;
 
 void (*CommImpl::copy_data_callback_)(CommImpl*, void*, size_t) = &s4u::Comm::copy_pointer_callback;
 
@@ -300,7 +303,6 @@ CommImpl* CommImpl::start()
   if (state_ == State::READY) {
     from_ = from_ != nullptr ? from_ : src_actor_->get_host();
     to_   = to_ != nullptr ? to_ : dst_actor_->get_host();
-
     /* Getting the network_model from the origin host
      * Valid while we have a single network model, otherwise we would need to change this function to first get the
      * routes and later create the respective surf actions */
@@ -309,7 +311,9 @@ CommImpl* CommImpl::start()
     surf_action_ = net_model->communicate(from_, to_, size_, rate_);
     surf_action_->set_activity(this);
     surf_action_->set_category(get_tracing_category());
+    start_time_ = surf_action_->get_start_time();
     state_ = State::RUNNING;
+    on_start(*this);
 
     XBT_DEBUG("Starting communication %p from '%s' to '%s' (surf_action: %p; state: %s)", this, from_->get_cname(),
               to_->get_cname(), surf_action_, get_state_str());
@@ -341,6 +345,16 @@ CommImpl* CommImpl::start()
   return this;
 }
 
+std::vector<s4u::Link*> CommImpl::get_traversed_links() const
+{
+  xbt_assert(state_ != State::WAITING, "You cannot use %s() if your communication is not ready (%s)", __FUNCTION__,
+             get_state_str());
+  std::vector<s4u::Link*> vlinks;
+  XBT_ATTRIB_UNUSED double res = 0;
+  from_->route_to(to_, vlinks, &res);
+  return vlinks;
+}
+
 /** @brief Copy the communication data from the sender's buffer to the receiver's one  */
 void CommImpl::copy_data()
 {
@@ -519,6 +533,8 @@ void CommImpl::cleanup_surf()
 
 void CommImpl::post()
 {
+  on_completion(*this);
+
   /* Update synchro state */
   if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
     state_ = State::SRC_TIMEOUT;
@@ -542,11 +558,74 @@ void CommImpl::post()
   /* Answer all simcalls associated with the synchro */
   finish();
 }
+void CommImpl::set_exception(actor::ActorImpl* issuer)
+{
+  switch (state_) {
+    case State::FAILED:
+      issuer->exception_ = std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+      break;
+    case State::SRC_TIMEOUT:
+      issuer->exception_ =
+          std::make_exception_ptr(TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the sender"));
+      break;
+
+    case State::DST_TIMEOUT:
+      issuer->exception_ =
+          std::make_exception_ptr(TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the receiver"));
+      break;
+
+    case State::SRC_HOST_FAILURE:
+      if (issuer == src_actor_)
+        issuer->context_->set_wannadie();
+      else {
+        state_             = kernel::activity::State::FAILED;
+        issuer->exception_ = std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+      }
+      break;
+
+    case State::DST_HOST_FAILURE:
+      if (issuer == dst_actor_)
+        issuer->context_->set_wannadie();
+      else {
+        state_             = kernel::activity::State::FAILED;
+        issuer->exception_ = std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
+      }
+      break;
+
+    case State::LINK_FAILURE:
+      XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) "
+                "detached:%d",
+                this, src_actor_ ? src_actor_->get_host()->get_cname() : nullptr,
+                dst_actor_ ? dst_actor_->get_host()->get_cname() : nullptr, issuer->get_cname(), issuer, detached_);
+      if (src_actor_ == issuer) {
+        XBT_DEBUG("I'm source");
+      } else if (dst_actor_ == issuer) {
+        XBT_DEBUG("I'm dest");
+      } else {
+        XBT_DEBUG("I'm neither source nor dest");
+      }
+      state_ = kernel::activity::State::FAILED;
+      issuer->throw_exception(std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Link failure")));
+      break;
+
+    case State::CANCELED:
+      if (issuer == dst_actor_)
+        issuer->exception_ =
+            std::make_exception_ptr(CancelException(XBT_THROW_POINT, "Communication canceled by the sender"));
+      else
+        issuer->exception_ =
+            std::make_exception_ptr(CancelException(XBT_THROW_POINT, "Communication canceled by the receiver"));
+      break;
+
+    default:
+      xbt_assert(state_ == State::DONE, "Internal error in CommImpl::finish(): unexpected synchro state %s",
+                 to_c_str(state_));
+  }
+}
 
 void CommImpl::finish()
 {
   XBT_DEBUG("CommImpl::finish() in state %s", to_c_str(state_));
-
   /* If the synchro is still in a rendez-vous point then remove from it */
   if (mbox_)
     mbox_->remove(this);
@@ -585,72 +664,7 @@ void CommImpl::finish()
     if (not simcall->issuer_->get_host()->is_on()) {
       simcall->issuer_->context_->set_wannadie();
     } else {
-      switch (state_) {
-        case State::FAILED:
-          simcall->issuer_->exception_ =
-              std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
-          break;
-        case State::SRC_TIMEOUT:
-          simcall->issuer_->exception_ = std::make_exception_ptr(
-              TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the sender"));
-          break;
-
-        case State::DST_TIMEOUT:
-          simcall->issuer_->exception_ = std::make_exception_ptr(
-              TimeoutException(XBT_THROW_POINT, "Communication timeouted because of the receiver"));
-          break;
-
-        case State::SRC_HOST_FAILURE:
-          if (simcall->issuer_ == src_actor_)
-            simcall->issuer_->context_->set_wannadie();
-          else {
-            state_ = kernel::activity::State::FAILED;
-            simcall->issuer_->exception_ =
-                std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
-          }
-          break;
-
-        case State::DST_HOST_FAILURE:
-          if (simcall->issuer_ == dst_actor_)
-            simcall->issuer_->context_->set_wannadie();
-          else {
-            state_ = kernel::activity::State::FAILED;
-            simcall->issuer_->exception_ =
-                std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Remote peer failed"));
-          }
-          break;
-
-        case State::LINK_FAILURE:
-          XBT_DEBUG("Link failure in synchro %p between '%s' and '%s': posting an exception to the issuer: %s (%p) "
-                    "detached:%d",
-                    this, src_actor_ ? src_actor_->get_host()->get_cname() : nullptr,
-                    dst_actor_ ? dst_actor_->get_host()->get_cname() : nullptr, simcall->issuer_->get_cname(),
-                    simcall->issuer_, detached_);
-          if (src_actor_ == simcall->issuer_) {
-            XBT_DEBUG("I'm source");
-          } else if (dst_actor_ == simcall->issuer_) {
-            XBT_DEBUG("I'm dest");
-          } else {
-            XBT_DEBUG("I'm neither source nor dest");
-          }
-          state_ = kernel::activity::State::FAILED;
-          simcall->issuer_->throw_exception(
-              std::make_exception_ptr(NetworkFailureException(XBT_THROW_POINT, "Link failure")));
-          break;
-
-        case State::CANCELED:
-          if (simcall->issuer_ == dst_actor_)
-            simcall->issuer_->exception_ =
-                std::make_exception_ptr(CancelException(XBT_THROW_POINT, "Communication canceled by the sender"));
-          else
-            simcall->issuer_->exception_ =
-                std::make_exception_ptr(CancelException(XBT_THROW_POINT, "Communication canceled by the receiver"));
-          break;
-
-        default:
-          xbt_assert(state_ == State::DONE, "Internal error in CommImpl::finish(): unexpected synchro state %s",
-                     to_c_str(state_));
-      }
+      set_exception(simcall->issuer_);
       simcall->issuer_->simcall_answer();
     }
     /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */