Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
try to get rid of a warning (without causing a dead store in infer?)
[simgrid.git] / src / s4u / s4u_Comm.cpp
index b30f311..e834664 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2006-2021. The SimGrid Team. All rights reserved.          */
+/* Copyright (c) 2006-2022. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 #include "src/kernel/activity/CommImpl.hpp"
 #include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
 
 namespace simgrid {
 namespace s4u {
-xbt::signal<void(Comm const&, bool is_sender)> Comm::on_start;
+xbt::signal<void(Comm const&)> Comm::on_send;
+xbt::signal<void(Comm const&)> Comm::on_recv;
 xbt::signal<void(Comm const&)> Comm::on_completion;
 
-void Comm::complete(Activity::State state)
-{
-  Activity::complete(state);
-  on_completion(*this);
-}
-
 Comm::~Comm()
 {
   if (state_ == State::STARTED && not detached_ &&
-      (pimpl_ == nullptr || pimpl_->state_ == kernel::activity::State::RUNNING)) {
+      (pimpl_ == nullptr || pimpl_->get_state() == kernel::activity::State::RUNNING)) {
     XBT_INFO("Comm %p freed before its completion. Did you forget to detach it? (state: %s)", this, get_state_str());
     if (pimpl_ != nullptr)
       XBT_INFO("pimpl_->state: %s", pimpl_->get_state_str());
@@ -43,22 +39,21 @@ Comm::~Comm()
 
 ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
 {
-  std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
-  std::transform(begin(comms), end(comms), begin(rcomms),
-                 [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
+  std::vector<ActivityPtr> activities;
+  for (const auto& comm : comms)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
   ssize_t changed_pos;
   try {
-    changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+    changed_pos = Activity::wait_any_for(activities, timeout);
   } catch (const NetworkFailureException& e) {
+    changed_pos = -1;
     for (auto c : comms) {
-      if (c->pimpl_->state_ == kernel::activity::State::FAILED) {
+      if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
         c->complete(State::FAILED);
       }
     }
     e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
   }
-  if (changed_pos != -1)
-    comms.at(changed_pos)->complete(State::FINISHED);
   return changed_pos;
 }
 
@@ -90,6 +85,28 @@ size_t Comm::wait_all_for(const std::vector<CommPtr>& comms, double timeout)
   return comms.size();
 }
 
+CommPtr Comm::set_source(Host* from)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the source of a Comm once it's started (state: %s)", to_c_str(state_));
+  from_ = from;
+  // Setting 'from_' may allow to start the activity, let's try
+  vetoable_start();
+
+  return this;
+}
+
+CommPtr Comm::set_destination(Host* to)
+{
+  xbt_assert(state_ == State::INITED || state_ == State::STARTING,
+             "Cannot change the destination of a Comm once it's started (state: %s)", to_c_str(state_));
+  to_ = to;
+  // Setting 'to_' may allow to start the activity, let's try
+  vetoable_start();
+
+  return this;
+}
+
 CommPtr Comm::set_rate(double rate)
 {
   xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
@@ -159,9 +176,16 @@ CommPtr Comm::set_payload_size(uint64_t bytes)
   return this;
 }
 
-CommPtr Comm::sendto_init(Host* from, Host* to)
+CommPtr Comm::sendto_init()
 {
   CommPtr res(new Comm());
+  res->sender_ = kernel::actor::ActorImpl::self();
+  return res;
+}
+
+CommPtr Comm::sendto_init(Host* from, Host* to)
+{
+  auto res   = Comm::sendto_init();
   res->from_ = from;
   res->to_   = to;
 
@@ -195,15 +219,31 @@ Comm* Comm::start()
     });
 
   } else if (src_buff_ != nullptr) { // Sender side
-    on_start(*this, true /* is_sender*/);
-    pimpl_ = simcall_comm_isend(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
-                                clean_fun_, copy_data_function_, get_user_data(), detached_);
+    on_send(*this);
+    kernel::actor::CommIsendSimcall observer{sender_,
+                                             mailbox_->get_impl(),
+                                             remains_,
+                                             rate_,
+                                             static_cast<unsigned char*>(src_buff_),
+                                             src_buff_size_,
+                                             match_fun_,
+                                             clean_fun_,
+                                             copy_data_function_,
+                                             get_data<void>(),
+                                             detached_};
+    pimpl_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::isend(&observer); }, &observer);
   } else if (dst_buff_ != nullptr) { // Receiver side
     xbt_assert(not detached_, "Receive cannot be detached");
-    on_start(*this, false /*is_sender*/);
-    pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
-                                copy_data_function_, get_user_data(), rate_);
-
+    on_recv(*this);
+    kernel::actor::CommIrecvSimcall observer{receiver_,
+                                             mailbox_->get_impl(),
+                                             static_cast<unsigned char*>(dst_buff_),
+                                             &dst_buff_size_,
+                                             match_fun_,
+                                             copy_data_function_,
+                                             get_data<void>(),
+                                             rate_};
+    pimpl_ = kernel::actor::simcall([&observer] { return kernel::activity::CommImpl::irecv(&observer); }, &observer);
   } else {
     xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
   }
@@ -211,6 +251,11 @@ Comm* Comm::start()
   if (suspended_)
     pimpl_->suspend();
 
+  if (not detached_) {
+    pimpl_->set_iface(this);
+    pimpl_->set_actor(sender_);
+  }
+
   state_ = State::STARTED;
   return this;
 }
@@ -223,32 +268,39 @@ Comm* Comm::start()
  *                Negative values denote infinite wait times. 0 as a timeout returns immediately. */
 Comm* Comm::wait_for(double timeout)
 {
+  XBT_DEBUG("Calling Comm::wait_for with state %s", get_state_str());
+  kernel::actor::ActorImpl* issuer = nullptr;
   switch (state_) {
     case State::FINISHED:
       break;
     case State::FAILED:
       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
-
     case State::INITED:
     case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
       if (from_ != nullptr || to_ != nullptr) {
         return vetoable_start()->wait_for(timeout); // In the case of host2host comm, do it in two simcalls
       } else if (src_buff_ != nullptr) {
-        on_start(*this, true /*is_sender*/);
+        on_send(*this);
         simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
-                          copy_data_function_, get_user_data(), timeout);
+                          copy_data_function_, get_data<void>(), timeout);
 
       } else { // Receiver
-        on_start(*this, false /*is_sender*/);
+        on_recv(*this);
         simcall_comm_recv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_, copy_data_function_,
-                          get_user_data(), timeout, rate_);
+                          get_data<void>(), timeout, rate_);
       }
       break;
-
     case State::STARTED:
       try {
-        simcall_comm_wait(get_impl(), timeout);
+        issuer = kernel::actor::ActorImpl::self();
+        kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+        if (kernel::actor::simcall_blocking(
+                [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+                &observer)) {
+          throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+        }
       } catch (const NetworkFailureException& e) {
+        issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
         complete(State::FAILED);
         e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
       }
@@ -266,13 +318,10 @@ Comm* Comm::wait_for(double timeout)
 
 ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
 {
-  std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
-  std::transform(begin(comms), end(comms), begin(rcomms),
-                 [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
-  ssize_t changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
-  if (changed_pos != -1)
-    comms.at(changed_pos)->complete(State::FINISHED);
-  return changed_pos;
+  std::vector<ActivityPtr> activities;
+  for (const auto& comm : comms)
+    activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
+  return Activity::test_any(activities);
 }
 
 Comm* Comm::detach()
@@ -285,24 +334,6 @@ Comm* Comm::detach()
   return this;
 }
 
-bool Comm::test() // TODO: merge with Activity::test, once modernized
-{
-  xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::STARTING ||
-             state_ == State::CANCELED || state_ == State::FINISHED);
-
-  if (state_ == State::CANCELED || state_ == State::FINISHED)
-    return true;
-
-  if (state_ == State::INITED || state_ == State::STARTING)
-    this->vetoable_start();
-
-  if (simcall_comm_test(get_impl())) {
-    complete(State::FINISHED);
-    return true;
-  }
-  return false;
-}
-
 Mailbox* Comm::get_mailbox() const
 {
   return mailbox_;