comm->wait_for(simcall->issuer_, timeout);
}
/* Simcall handler for testing a single communication: returns whatever comm->test() returns.
 * In this change ('+' lines) the simcall parameter gets a name so that its issuer_ field can be
 * forwarded to test(); the previous version ('-' lines) ignored the simcall and called test()
 * without arguments. */
-bool simcall_HANDLER_comm_test(smx_simcall_t, simgrid::kernel::activity::CommImpl* comm)
+bool simcall_HANDLER_comm_test(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comm)
{
- return comm->test();
+ return comm->test(simcall->issuer_);
}
/* Simcall handler for testing a set of communications: copies the `count` pointers found in
 * `comms` into a vector and returns the result of test_any() invoked with the simcall's issuer.
 * In this change the vector element type and the static test_any() being called move from
 * CommImpl ('-' lines) to ActivityImpl ('+' lines); the handler's signature is unchanged. */
ssize_t simcall_HANDLER_comm_testany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count)
{
- std::vector<simgrid::kernel::activity::CommImpl*> comms_vec(comms, comms + count);
- return simgrid::kernel::activity::CommImpl::test_any(simcall->issuer_, comms_vec);
+ std::vector<simgrid::kernel::activity::ActivityImpl*> comms_vec(comms, comms + count);
+ return simgrid::kernel::activity::ActivityImpl::test_any(simcall->issuer_, comms_vec);
}
void simcall_HANDLER_comm_waitany(smx_simcall_t simcall, simgrid::kernel::activity::CommImpl* comms[], size_t count,
copied_ = true;
}
/* Tests this communication and returns the result of ActivityImpl::test().
 * When the model checker or record/replay is active and both src_actor_ and dst_actor_ are set,
 * the state is first set to State::DONE before delegating.
 * In this change ('+' lines) the method takes the issuing actor and forwards it to
 * ActivityImpl::test(); the previous version ('-' lines) took no argument. */
-bool CommImpl::test()
+bool CommImpl::test(actor::ActorImpl* issuer)
{
if ((MC_is_active() || MC_record_replay_is_active()) && src_actor_ && dst_actor_)
set_state(State::DONE);
- return ActivityImpl::test();
+ return ActivityImpl::test(issuer);
}
void CommImpl::wait_for(actor::ActorImpl* issuer, double timeout)
/* Associate this simcall to the wait synchro */
register_simcall(&issuer->simcall_);
-
if (MC_is_active() || MC_record_replay_is_active()) {
int idx = issuer->simcall_.mc_value_;
if (idx == 0) {
finish();
return;
}
-
- /* If the synchro has already finish perform the error handling, */
- /* otherwise set up a waiting timeout on the right side */
- if (get_state() != State::WAITING && get_state() != State::RUNNING) {
- finish();
- } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
- resource::Action* sleep = issuer->get_host()->get_cpu()->sleep(timeout);
- sleep->set_activity(this);
-
- if (issuer == src_actor_)
- src_timeout_ = sleep;
- else
- dst_timeout_ = sleep;
- }
-}
-
-ssize_t CommImpl::test_any(const actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms)
-{
- if (MC_is_active() || MC_record_replay_is_active()) {
- int idx = issuer->simcall_.mc_value_;
- xbt_assert(idx == -1 || comms[idx]->test());
- return idx;
- }
-
- for (std::size_t i = 0; i < comms.size(); ++i) {
- if (comms[i]->test())
- return i;
- }
- return -1;
+ ActivityImpl::wait_for(issuer, timeout);
}
void CommImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector<CommImpl*>& comms, double timeout)
comm->finish();
return;
}
-
- if (timeout < 0.0) {
- issuer->simcall_.timeout_cb_ = nullptr;
- } else {
- issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, comms]() {
- // FIXME: Vector `comms' is copied here. Use a reference once its lifetime is extended (i.e. when the simcall is
- // modernized).
- issuer->simcall_.timeout_cb_ = nullptr;
- for (auto* comm : comms)
- comm->unregister_simcall(&issuer->simcall_);
- simcall_comm_waitany__set__result(&issuer->simcall_, -1);
- issuer->simcall_answer();
- });
- }
-
- for (auto* comm : comms) {
- /* associate this simcall to the the synchro */
- comm->simcalls_.push_back(&issuer->simcall_);
-
- /* see if the synchro is already finished */
- if (comm->get_state() != State::WAITING && comm->get_state() != State::RUNNING) {
- comm->finish();
- break;
- }
- }
+ std::vector<ActivityImpl*> activities(comms.begin(), comms.end());
+ ActivityImpl::wait_any_for(issuer, activities, timeout);
}
void CommImpl::suspend()
if (simcall->call_ == simix::Simcall::NONE) // FIXME: maybe a better way to handle this case
continue; // if actor handling comm is killed
- if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
- CommImpl** comms = simcall_comm_waitany__get__comms(simcall);
- size_t count = simcall_comm_waitany__get__count(simcall);
- for (size_t i = 0; i < count; i++)
- comms[i]->unregister_simcall(simcall);
- if (simcall->timeout_cb_) {
- simcall->timeout_cb_->remove();
- simcall->timeout_cb_ = nullptr;
- }
- if (not MC_is_active() && not MC_record_replay_is_active()) {
- auto element = std::find(comms, comms + count, this);
- ssize_t rank = (element != comms + count) ? element - comms : -1;
- simcall_comm_waitany__set__result(simcall, rank);
- }
- }
+
+ handle_activity_waitany(simcall);
/* Check out for errors */
set_exception(simcall->issuer_);
simcall->issuer_->simcall_answer();
}
- /* if there is an exception during a waitany or a testany, indicate the position of the failed communication */
- if (simcall->issuer_->exception_ &&
- (simcall->call_ == simix::Simcall::COMM_WAITANY || simcall->call_ == simix::Simcall::COMM_TESTANY)) {
- // First retrieve the rank of our failing synchro
- CommImpl** comms;
- size_t count;
- if (simcall->call_ == simix::Simcall::COMM_WAITANY) {
- comms = simcall_comm_waitany__get__comms(simcall);
- count = simcall_comm_waitany__get__count(simcall);
- } else {
- /* simcall->call_ == simix::Simcall::COMM_TESTANY */
- comms = simcall_comm_testany__get__comms(simcall);
- count = simcall_comm_testany__get__count(simcall);
- }
- auto element = std::find(comms, comms + count, this);
- ssize_t rank = (element != comms + count) ? element - comms : -1;
- // In order to modify the exception we have to rethrow it:
- try {
- std::rethrow_exception(simcall->issuer_->exception_);
- } catch (Exception& e) {
- e.set_value(rank);
- }
- }
simcall->issuer_->waiting_synchro_ = nullptr;
simcall->issuer_->activities_.remove(this);