Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Start to modernize the remaining old simcalls related to comms
[simgrid.git] / src / kernel / activity / CommImpl.cpp
index 39fe618..44a6e80 100644 (file)
@@ -185,15 +185,15 @@ void simcall_HANDLER_comm_wait(smx_simcall_t simcall, simgrid::kernel::activity:
   comm->wait_for(simcall->issuer_, timeout);
 }
 
-bool simcall_HANDLER_comm_test(smx_simcall_t, simgrid::kernel::activity::CommImpl* comm)
+bool simcall_HANDLER_comm_test(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm)
 {
-  return comm->test();
+  return comm->test(simcall->issuer_);
 }
 
 ssize_t simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count)
 {
-  std::vector<simgrid::kernel::activity::CommImpl*> comms_vec(comms, comms + count);
-  return simgrid::kernel::activity::CommImpl::test_any(simcall->issuer_, comms_vec);
+  std::vector<simgrid::kernel::activity::ActivityImpl*> comms_vec(comms, comms + count);
+  return simgrid::kernel::activity::ActivityImpl::test_any(simcall->issuer_, comms_vec);
 }
 
 void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count,
@@ -387,11 +387,11 @@ void CommImpl::copy_data()
   copied_ = true;
 }
 
-bool CommImpl::test()
+bool CommImpl::test(actor::ActorImpl* issuer)
 {
   if ((MC_is_active() || MC_record_replay_is_active()) && src_actor_ && dst_actor_)
     set_state(State::DONE);
-  return ActivityImpl::test();
+  return ActivityImpl::test(issuer);
 }
 
 void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
@@ -400,7 +400,6 @@ void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
 
   /* Associate this simcall to the wait synchro */
   register_simcall(&issuer->simcall_);
-
   if (MC_is_active() || MC_record_replay_is_active()) {
     int idx = issuer->simcall_.mc_value_;
     if (idx == 0) {
@@ -415,35 +414,7 @@ void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
     finish();
     return;
   }
-
-  /* If the synchro has already finish perform the error handling, */
-  /* otherwise set up a waiting timeout on the right side          */
-  if (get_state() != State::WAITING && get_state() != State::RUNNING) {
-    finish();
-  } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
-    resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
-    sleep->set_activity(this);
-
-    if (issuer == src_actor_)
-      src_timeout_ = sleep;
-    else
-      dst_timeout_ = sleep;
-  }
-}
-
-ssize_t CommImpl::test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms)
-{
-  if (MC_is_active() || MC_record_replay_is_active()) {
-    int idx = issuer->simcall_.mc_value_;
-    xbt_assert(idx == -1 || comms[idx]->test());
-    return idx;
-  }
-
-  for (std::size_t i = 0; i < comms.size(); ++i) {
-    if (comms[i]->test())
-      return i;
-  }
-  return -1;
+  ActivityImpl::wait_for(issuer, timeout);
 }
 
 void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout)
@@ -458,31 +429,8 @@ void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl
     comm->finish();
     return;
   }
-
-  if (timeout < 0.0) {
-    issuer->simcall_.timeout_cb_ = nullptr;
-  } else {
-    issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, comms]() {
-      // FIXME: Vector `comms' is copied here. Use a reference once its lifetime is extended (i.e. when the simcall is
-      // modernized).
-      issuer->simcall_.timeout_cb_ = nullptr;
-      for (auto* comm : comms)
-        comm->unregister_simcall(&issuer->simcall_);
-      simcall_comm_waitany__set__result(&issuer->simcall_, -1);
-      issuer->simcall_answer();
-    });
-  }
-
-  for (auto* comm : comms) {
-    /* associate this simcall to the the synchro */
-    comm->simcalls_.push_back(&issuer->simcall_);
-
-    /* see if the synchro is already finished */
-    if (comm->get_state() != State::WAITING && comm->get_state() != State::RUNNING) {
-      comm->finish();
-      break;
-    }
-  }
+  std::vector<ActivityImpl*> activities(comms.begin(), comms.end());
+  ActivityImpl::wait_any_for(issuer, activities, timeout);
 }
 
 void CommImpl::suspend()
@@ -643,21 +591,8 @@ void CommImpl::finish()
 
     if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
       continue;                                 // if actor handling comm is killed
-    if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
-      CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
-      size_t count     = simcall_comm_waitany__get__count(simcall);
-      for (size_t i = 0; i < count; i++)
-        comms[i]->unregister_simcall(simcall);
-      if (simcall->timeout_cb_) {
-        simcall->timeout_cb_->remove();
-        simcall->timeout_cb_ = nullptr;
-      }
-      if (not MC_is_active() && not MC_record_replay_is_active()) {
-        auto element = std::find(comms, comms + count, this);
-        ssize_t rank = (element != comms + count) ? element - comms : -1;
-        simcall_comm_waitany__set__result(simcall, rank);
-      }
-    }
+
+    handle_activity_waitany(simcall);
 
     /* Check out for errors */
 
@@ -667,29 +602,6 @@ void CommImpl::finish()
       set_exception(simcall->issuer_);
       simcall->issuer_->simcall_answer();
     }
-    /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
-    if (simcall->issuer_->exception_ &&
-        (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) {
-      // First retrieve the rank of our failing synchro
-      CommImpl** comms;
-      size_t count;
-      if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
-        comms = simcall_comm_waitany__get__comms(simcall);
-        count = simcall_comm_waitany__get__count(simcall);
-      } else {
-        /* simcall->call_ == simix::Simcall::COMM_TESTANY */
-        comms = simcall_comm_testany__get__comms(simcall);
-        count = simcall_comm_testany__get__count(simcall);
-      }
-      auto element = std::find(comms, comms + count, this);
-      ssize_t rank = (element != comms + count) ? element - comms : -1;
-      // In order to modify the exception we have to rethrow it:
-      try {
-        std::rethrow_exception(simcall->issuer_->exception_);
-      } catch (Exception& e) {
-        e.set_value(rank);
-      }
-    }
 
     simcall->issuer_->waiting_synchro_ = nullptr;
     simcall->issuer_->activities_.remove(this);