X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/f3b7e5f4b4d7c87ee3e8827313ec966ea8fc8387..5ed37babb2fa9097abe82df299c0aa259ed84d5a:/src/kernel/activity/ActivityImpl.cpp diff --git a/src/kernel/activity/ActivityImpl.cpp b/src/kernel/activity/ActivityImpl.cpp index 7e163b0e11..b16a24b7c9 100644 --- a/src/kernel/activity/ActivityImpl.cpp +++ b/src/kernel/activity/ActivityImpl.cpp @@ -1,19 +1,26 @@ -/* Copyright (c) 2007-2020. The SimGrid Team. All rights reserved. */ +/* Copyright (c) 2007-2023. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ +#include +#include +#include + #include "src/kernel/activity/ActivityImpl.hpp" -#include "simgrid/modelchecker.h" +#include "src/kernel/activity/CommImpl.hpp" +#include "src/kernel/activity/Synchro.hpp" +#include "src/kernel/actor/ActorImpl.hpp" +#include "src/kernel/actor/SimcallObserver.hpp" +#include "src/kernel/resource/CpuImpl.hpp" #include "src/mc/mc_replay.hpp" -#include "src/simix/smx_private.hpp" + +#include #include // isfinite() -XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process); +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(ker_activity, kernel, "Kernel activity-related synchronization"); -namespace simgrid { -namespace kernel { -namespace activity { +namespace simgrid::kernel::activity { ActivityImpl::~ActivityImpl() { @@ -21,12 +28,20 @@ ActivityImpl::~ActivityImpl() XBT_DEBUG("Destroy activity %p", this); } -void ActivityImpl::register_simcall(smx_simcall_t simcall) +void ActivityImpl::register_simcall(actor::Simcall* simcall) { simcalls_.push_back(simcall); simcall->issuer_->waiting_synchro_ = this; } +void ActivityImpl::unregister_simcall(actor::Simcall* simcall) +{ + // Remove the first occurrence of simcall: + auto j = boost::range::find(simcalls_, simcall); + if (j != simcalls_.end()) + simcalls_.erase(j); +} + void ActivityImpl::clean_action() { if (surf_action_) { @@ -40,45 +55,123 @@ double ActivityImpl::get_remaining() const return surf_action_ ? surf_action_->get_remains() : 0; } -bool ActivityImpl::test() +const char* ActivityImpl::get_state_str() const +{ + return to_c_str(state_); +} + +bool ActivityImpl::test(actor::ActorImpl* issuer) { if (state_ != State::WAITING && state_ != State::RUNNING) { finish(); + issuer->exception_ = nullptr; // Do not propagate exception in that case return true; } + + if (auto* observer = dynamic_cast(issuer->simcall_.observer_)) + observer->set_result(false); + return false; } +ssize_t ActivityImpl::test_any(actor::ActorImpl* issuer, const std::vector& activities) +{ + auto* observer = dynamic_cast(issuer->simcall_.observer_); + xbt_assert(observer != nullptr); + + if (MC_is_active() || MC_record_replay_is_active()) { + int idx = observer->get_value(); + xbt_assert(idx == -1 || activities[idx]->test(issuer)); + return idx; + } + + for (std::size_t i = 0; i < activities.size(); ++i) { + if (activities[i]->test(issuer)) { + observer->set_result(i); + return i; + } + } + return -1; +} + void ActivityImpl::wait_for(actor::ActorImpl* issuer, double timeout) { - XBT_DEBUG("Wait for execution of synchro %p, state %d", this, (int)state_); + XBT_DEBUG("Wait for execution of synchro %p, state %s", this, get_state_str()); xbt_assert(std::isfinite(timeout), "timeout is not finite!"); /* Associate this simcall to the synchro */ register_simcall(&issuer->simcall_); - if (MC_is_active() || MC_record_replay_is_active()) { - int idx = SIMCALL_GET_MC_VALUE(issuer->simcall_); - if (idx == 0) { - state_ = simgrid::kernel::activity::State::DONE; + xbt_assert(not MC_is_active() && not MC_record_replay_is_active(), "MC is currently not supported here."); + + /* If the synchro is already finished then perform the error handling */ + if (state_ != State::WAITING && state_ != State::RUNNING) { + finish(); + } else { + /* we need a sleep action (even when the timeout is infinite) to be notified of host failures */ + /* Comms handle that a bit differently of the other activities */ + if (auto* comm = dynamic_cast(this)) { + resource::Action* sleep_action = issuer->get_host()->get_cpu()->sleep(timeout); + sleep_action->set_activity(comm); + + if (issuer == comm->src_actor_) + comm->src_timeout_ = sleep_action; + else + comm->dst_timeout_ = sleep_action; } else { - /* If we reached this point, the wait simcall must have a timeout */ - /* Otherwise it shouldn't be enabled and executed by the MC */ - if (timeout < 0.0) - THROW_IMPOSSIBLE; - state_ = simgrid::kernel::activity::State::TIMEOUT; + SynchroImplPtr synchro(new SynchroImpl([this, issuer]() { + this->unregister_simcall(&issuer->simcall_); + issuer->waiting_synchro_ = nullptr; + issuer->exception_ = nullptr; + auto* observer = dynamic_cast(issuer->simcall_.observer_); + xbt_assert(observer != nullptr); + observer->set_result(true); // Returns that the wait_for timeouted + })); + synchro->set_host(issuer->get_host()).set_timeout(timeout).start(); + synchro->register_simcall(&issuer->simcall_); + } + } +} + +void ActivityImpl::wait_any_for(actor::ActorImpl* issuer, const std::vector& activities, double timeout) +{ + XBT_DEBUG("Wait for execution of any synchro"); + if (MC_is_active() || MC_record_replay_is_active()) { + auto* observer = dynamic_cast(issuer->simcall_.observer_); + xbt_assert(observer != nullptr); + xbt_assert(timeout <= 0.0, "Timeout not implemented for waitany in the model-checker"); + if (int idx = observer->get_value(); idx != -1) { + auto* act = activities.at(idx); + act->simcalls_.push_back(&issuer->simcall_); + observer->set_result(idx); + act->set_state(State::DONE); + act->finish(); } - finish(); return; } - /* If the synchro is already finished then perform the error handling */ - if (state_ != simgrid::kernel::activity::State::RUNNING) - finish(); - else { - /* we need a sleep action (even when there is no timeout) to be notified of host failures */ - set_timeout(timeout); + if (timeout < 0.0) { + issuer->simcall_.timeout_cb_ = nullptr; + } else { + issuer->simcall_.timeout_cb_ = timer::Timer::set(s4u::Engine::get_clock() + timeout, [issuer, &activities]() { + issuer->simcall_.timeout_cb_ = nullptr; + for (auto* act : activities) + act->unregister_simcall(&issuer->simcall_); + // default result (-1) is set in actor::ActivityWaitanySimcall + issuer->simcall_answer(); + }); } + + for (auto* act : activities) { + /* associate this simcall to the the synchro */ + act->simcalls_.push_back(&issuer->simcall_); + /* see if the synchro is already finished */ + if (act->get_state() != State::WAITING && act->get_state() != State::RUNNING) { + act->finish(); + break; + } + } + XBT_DEBUG("Exit from ActivityImlp::wait_any_for"); } void ActivityImpl::suspend() @@ -87,7 +180,7 @@ void ActivityImpl::suspend() return; XBT_VERB("This activity is suspended (remain: %f)", surf_action_->get_remains()); surf_action_->suspend(); - on_suspended(*this); + s4u::Activity::on_suspended(*get_iface()); } void ActivityImpl::resume() @@ -96,7 +189,7 @@ void ActivityImpl::resume() return; XBT_VERB("This activity is resumed (remain: %f)", surf_action_->get_remains()); surf_action_->resume(); - on_resumed(*this); + s4u::Activity::on_resumed(*get_iface()); } void ActivityImpl::cancel() @@ -107,21 +200,40 @@ void ActivityImpl::cancel() state_ = State::CANCELED; } +void ActivityImpl::handle_activity_waitany(actor::Simcall* simcall) +{ + /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany + * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the + * simcall */ + if (auto* observer = dynamic_cast(simcall->observer_)) { + if (simcall->timeout_cb_) { + simcall->timeout_cb_->remove(); + simcall->timeout_cb_ = nullptr; + } + + auto activities = observer->get_activities(); + for (auto* act : activities) + act->unregister_simcall(simcall); + + if (not MC_is_active() && not MC_record_replay_is_active()) { + auto element = std::find(activities.begin(), activities.end(), this); + int rank = element != activities.end() ? static_cast(std::distance(activities.begin(), element)) : -1; + observer->set_result(rank); + } + } +} + // boost::intrusive_ptr support: -void intrusive_ptr_add_ref(simgrid::kernel::activity::ActivityImpl* activity) +void intrusive_ptr_add_ref(ActivityImpl* activity) { activity->refcount_.fetch_add(1, std::memory_order_relaxed); } -void intrusive_ptr_release(simgrid::kernel::activity::ActivityImpl* activity) +void intrusive_ptr_release(ActivityImpl* activity) { if (activity->refcount_.fetch_sub(1, std::memory_order_release) == 1) { std::atomic_thread_fence(std::memory_order_acquire); delete activity; } } -xbt::signal ActivityImpl::on_resumed; -xbt::signal ActivityImpl::on_suspended; -} -} -} // namespace simgrid::kernel::activity:: +} // namespace simgrid::kernel::activity