if (action->get_activity()->get_actor() == maestro_)
action->get_activity()->get_iface()->complete(s4u::Activity::State::FAILED);
- activity::ActivityImplPtr(action->get_activity())->post();
+ activity::ActivityImplPtr(action->get_activity())->finish();
}
}
XBT_DEBUG("Handling the terminated actions (if any)");
if (action->get_activity()->get_actor() == maestro_)
action->get_activity()->get_iface()->complete(s4u::Activity::State::FINISHED);
- activity::ActivityImplPtr(action->get_activity())->post();
+ activity::ActivityImplPtr(action->get_activity())->finish();
}
}
}
bool ActivityImpl::test(actor::ActorImpl* issuer)
{
if (state_ != State::WAITING && state_ != State::RUNNING) {
- post();
+ finish();
issuer->exception_ = nullptr; // Do not propagate exception in that case
return true;
}
/* If the synchro is already finished then perform the error handling */
if (state_ != State::WAITING && state_ != State::RUNNING) {
- post();
+ finish();
} else {
/* we need a sleep action (even when the timeout is infinite) to be notified of host failures */
/* Comms handle that a bit differently of the other activities */
act->simcalls_.push_back(&issuer->simcall_);
observer->set_result(idx);
act->set_state(State::DONE);
- act->post();
+ act->finish();
}
return;
}
act->simcalls_.push_back(&issuer->simcall_);
/* see if the synchro is already finished */
if (act->get_state() != State::WAITING && act->get_state() != State::RUNNING) {
- act->post();
+ act->finish();
break;
}
}
virtual void resume();
virtual void cancel();
- virtual void post() = 0; // Called by the main loop when the activity is marked as terminated or failed by its model.
- // Setups the status, clean things up, and call finish()
virtual void set_exception(actor::ActorImpl* issuer) = 0; // Raising exceptions and stuff
- virtual void finish() = 0; // Unlock all simcalls blocked on that activity, either because it was marked as done by
- // the model or because it terminated without waiting for the model
+ virtual void finish() = 0; // Sets up the final status, cleans things up, and unlocks all simcalls blocked on that activity.
s4u::Host* get_host() const { return hosts_.front(); }
const std::vector<s4u::Host*>& get_hosts() const { return hosts_; };
// Already in the queue
}
}
-void BarrierAcquisitionImpl::post()
-{
- finish();
-}
void BarrierAcquisitionImpl::finish()
{
for (auto const& acqui : ongoing_acquisitions_) {
acqui->granted_ = true;
if (acqui == acqui->get_issuer()->waiting_synchro_)
- acqui->post();
+ acqui->finish();
// else, the issuer is not blocked on this acquisition so no need to release it
}
ongoing_acquisitions_.clear(); // Rearm the barier for subsequent uses
bool test(actor::ActorImpl* issuer = nullptr) override;
void wait_for(actor::ActorImpl* issuer, double timeout) override;
- void post() override;
void finish() override;
void set_exception(actor::ActorImpl* issuer) override
{ /* nothing to do */
XBT_DEBUG("Communication from '%s' to '%s' failed to start because of a link failure", from_->get_cname(),
to_->get_cname());
set_state(State::LINK_FAILURE);
- post();
+ finish();
} else if ((src_actor_ != nullptr && src_actor_->is_suspended()) ||
(dst_actor_ != nullptr && dst_actor_->is_suspended())) {
if (MC_is_active() || MC_record_replay_is_active()) {
// FIXME: what about timeouts?
set_state(State::DONE);
- post();
+ finish();
return;
}
ActivityImpl::wait_for(issuer, timeout);
}
}
-void CommImpl::post()
-{
- on_completion(*this);
-
- /* Update synchro state */
- if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
- set_state(State::SRC_TIMEOUT);
- else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
- set_state(State::DST_TIMEOUT);
- else if ((from_ && not from_->is_on()) || (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
- set_state(State::SRC_HOST_FAILURE);
- else if ((to_ && not to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
- set_state(State::DST_HOST_FAILURE);
- else if (model_action_ && model_action_->get_state() == resource::Action::State::FAILED) {
- set_state(State::LINK_FAILURE);
- } else if (get_state() == State::RUNNING) {
- xbt_assert(from_ && from_->is_on());
- xbt_assert(to_ && to_->is_on());
- set_state(State::DONE);
- }
-
- XBT_DEBUG("CommImpl::post(): comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
- src_actor_.get(), dst_actor_.get(), detached_);
-
- /* destroy the model actions associated with the communication activity */
- clean_action();
-
- /* Answer all simcalls associated with the synchro */
- finish();
-}
void CommImpl::set_exception(actor::ActorImpl* issuer)
{
switch (get_state()) {
void CommImpl::finish()
{
- XBT_DEBUG("CommImpl::finish() in state %s", get_state_str());
+ XBT_DEBUG("CommImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p, detached: %d", this, get_state_str(),
+ src_actor_.get(), dst_actor_.get(), detached_);
+
+ on_completion(*this);
+
+ /* Update synchro state */
+ if (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FINISHED)
+ set_state(State::SRC_TIMEOUT);
+ else if (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FINISHED)
+ set_state(State::DST_TIMEOUT);
+ else if ((from_ && not from_->is_on()) ||
+ (src_timeout_ && src_timeout_->get_state() == resource::Action::State::FAILED))
+ set_state(State::SRC_HOST_FAILURE);
+ else if ((to_ && not to_->is_on()) || (dst_timeout_ && dst_timeout_->get_state() == resource::Action::State::FAILED))
+ set_state(State::DST_HOST_FAILURE);
+ else if (model_action_ && model_action_->get_state() == resource::Action::State::FAILED) {
+ set_state(State::LINK_FAILURE);
+ } else if (get_state() == State::RUNNING) {
+ xbt_assert(from_ && from_->is_on());
+ xbt_assert(to_ && to_->is_on());
+ set_state(State::DONE);
+ }
+
+ /* destroy the model actions associated with the communication activity */
+ clean_action();
+
/* If the synchro is still in a rendez-vous point then remove from it */
if (mbox_)
mbox_->remove(this);
void suspend() override;
void resume() override;
void cancel() override;
- void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
return *this;
}
-void ExecImpl::post()
-{
- if (model_action_ != nullptr) {
- if (auto const& hosts = get_hosts();
- std::any_of(hosts.begin(), hosts.end(), [](const s4u::Host* host) { return not host->is_on(); })) {
- /* If one of the hosts running the synchro failed, notice it. This way, the asking
- * process can be killed if it runs on that host itself */
- set_state(State::FAILED);
- } else if (model_action_->get_state() == resource::Action::State::FAILED) {
- /* If all the hosts are running the synchro didn't fail, then the synchro was canceled */
- set_state(State::CANCELED);
- } else {
- set_state(State::DONE);
- }
-
- clean_action();
- }
-
- if (get_actor() != nullptr)
- get_actor()->activities_.erase(this);
-
- if (get_state() != State::FAILED && cb_id_ >= 0)
- s4u::Host::on_state_change.disconnect(cb_id_);
- /* Answer all simcalls associated with the synchro */
- finish();
-}
-
void ExecImpl::set_exception(actor::ActorImpl* issuer)
{
switch (get_state()) {
void ExecImpl::finish()
{
XBT_DEBUG("ExecImpl::finish() in state %s", get_state_str());
+ if (model_action_ != nullptr) {
+ if (auto const& hosts = get_hosts();
+ std::any_of(hosts.begin(), hosts.end(), [](const s4u::Host* host) { return not host->is_on(); })) {
+ /* If one of the hosts running the synchro failed, notice it. This way, the asking
+ * process can be killed if it runs on that host itself */
+ set_state(State::FAILED);
+ } else if (model_action_->get_state() == resource::Action::State::FAILED) {
+ /* If all the hosts are running the synchro didn't fail, then the synchro was canceled */
+ set_state(State::CANCELED);
+ } else {
+ set_state(State::DONE);
+ }
+
+ clean_action();
+ }
+
+ if (get_actor() != nullptr)
+ get_actor()->activities_.erase(this);
+
+ if (get_state() != State::FAILED && cb_id_ >= 0)
+ s4u::Host::on_state_change.disconnect(cb_id_);
+
while (not simcalls_.empty()) {
actor::Simcall* simcall = simcalls_.front();
simcalls_.pop_front();
virtual ActivityImpl* migrate(s4u::Host* to);
ExecImpl* start();
- void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
return this;
}
-void IoImpl::post()
-{
- if (model_action_ != nullptr) {
- performed_ioops_ = model_action_->get_cost();
- if (model_action_->get_state() == resource::Action::State::FAILED) {
- if (host_ && dst_host_) { // this is an I/O stream
- if (not host_->is_on())
- set_state(State::SRC_HOST_FAILURE);
- else if (not dst_host_->is_on())
- set_state(State::DST_HOST_FAILURE);
- } else if ((disk_ && not disk_->is_on()) || (dst_disk_ && not dst_disk_->is_on()))
- set_state(State::FAILED);
- else
- set_state(State::CANCELED);
- } else {
- set_state(State::DONE);
- }
-
- clean_action();
- }
-
- /* Answer all simcalls associated with the synchro */
- finish();
-}
void IoImpl::set_exception(actor::ActorImpl* issuer)
{
switch (get_state()) {
void IoImpl::finish()
{
XBT_DEBUG("IoImpl::finish() in state %s", get_state_str());
+ if (model_action_ != nullptr) {
+ performed_ioops_ = model_action_->get_cost();
+ if (model_action_->get_state() == resource::Action::State::FAILED) {
+ if (host_ && dst_host_) { // this is an I/O stream
+ if (not host_->is_on())
+ set_state(State::SRC_HOST_FAILURE);
+ else if (not dst_host_->is_on())
+ set_state(State::DST_HOST_FAILURE);
+ } else if ((disk_ && not disk_->is_on()) || (dst_disk_ && not dst_disk_->is_on()))
+ set_state(State::FAILED);
+ else
+ set_state(State::CANCELED);
+ } else {
+ set_state(State::DONE);
+ }
+
+ clean_action();
+ }
+
while (not simcalls_.empty()) {
actor::Simcall* simcall = simcalls_.front();
simcalls_.pop_front();
resource::DiskImpl* get_disk() const { return disk_; }
IoImpl* start();
- void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
};
/** @brief Removes all communication activities from a mailbox
*/
-void MailboxImpl::clear( bool do_post )
+void MailboxImpl::clear(bool do_finish)
{
// CommImpl::cancel() will remove the comm from the mailbox..
for (auto comm : done_comm_queue_) {
comm->cancel();
comm->set_state(State::FAILED);
- if(do_post)
- comm->post();
+ if (do_finish)
+ comm->finish();
}
done_comm_queue_.clear();
if (comm->get_state() == State::WAITING && not comm->is_detached()) {
comm->cancel();
comm->set_state(State::FAILED);
- if(do_post)
- comm->post();
+ if (do_finish)
+ comm->finish();
} else
comm_queue_.pop_back();
}
void push(CommImplPtr comm);
void push_done(CommImplPtr done_comm) { done_comm_queue_.push_back(done_comm); }
void remove(const CommImplPtr& comm);
- void clear(bool do_post );
+ void clear(bool do_finish); // Cancels the queued comms and marks them FAILED; when do_finish is true, finish() is also called on each of them
CommImplPtr iprobe(int type, const std::function<bool(void*, void*, CommImpl*)>& match_fun, void* data);
CommImplPtr find_matching_comm(CommImplType type, const std::function<bool(void*, void*, CommImpl*)>& match_fun,
void* this_user_data, const CommImplPtr& my_synchro, bool done, bool remove_matching);
this->register_simcall(&issuer_->simcall_); // Block on that acquisition
if (mutex_->get_owner() == issuer_) { // I'm the owner
- post();
+ finish();
} else {
// Already in the queue
}
}
-void MutexAcquisitionImpl::post()
-{
- finish();
-}
void MutexAcquisitionImpl::finish()
{
owner_ = acq->get_issuer();
if (acq == owner_->waiting_synchro_)
- acq->post();
+ acq->finish();
// else, the issuer is not blocked on this acquisition so no need to release it
} else {
bool test(actor::ActorImpl* issuer = nullptr) override;
void wait_for(actor::ActorImpl* issuer, double timeout) override;
- void post() override;
void finish() override;
void set_exception(actor::ActorImpl* issuer) override
{ /* nothing to do */
this->register_simcall(&issuer_->simcall_); // Block on that acquisition
if (granted_) {
- post();
+ finish();
} else if (timeout > 0) {
model_action_ = get_issuer()->get_host()->get_cpu()->sleep(timeout);
model_action_->set_activity(this);
// Already in the queue
}
}
-void SemAcquisitionImpl::post()
-{
- finish();
-}
void SemAcquisitionImpl::finish()
{
xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size());
acqui->granted_ = true;
if (acqui == acqui->get_issuer()->waiting_synchro_)
- acqui->post();
+ acqui->finish();
// else, the issuer is not blocked on this acquisition so no need to release it
} else {
bool test(actor::ActorImpl* issuer = nullptr) override { return granted_; }
void wait_for(actor::ActorImpl* issuer, double timeout) override;
- void post() override;
void finish() override;
void cancel() override;
void set_exception(actor::ActorImpl* issuer) override
return this;
}
-void SleepImpl::post()
+void SleepImpl::set_exception(actor::ActorImpl* issuer)
+{
+ /* Empty body: nothing is done to the issuer here. FIXME: Really, nothing bad can happen while we sleep? */
+}
+void SleepImpl::finish()
{
if (model_action_->get_state() == resource::Action::State::FAILED) {
if (host_ && not host_->is_on())
}
clean_action();
- /* Answer all simcalls associated with the synchro */
- finish();
-}
-void SleepImpl::set_exception(actor::ActorImpl* issuer)
-{
- /* FIXME: Really, nothing bad can happen while we sleep? */
-}
-void SleepImpl::finish()
-{
XBT_DEBUG("SleepImpl::finish() in state %s", get_state_str());
while (not simcalls_.empty()) {
const actor::Simcall* simcall = simcalls_.front();
public:
SleepImpl& set_host(s4u::Host* host);
SleepImpl& set_duration(double duration);
- void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
SleepImpl* start();
/* I cannot cancel raw synchros directly. */
}
-void SynchroImpl::post()
-{
- if (model_action_->get_state() == resource::Action::State::FAILED)
- set_state(State::FAILED);
- else if (model_action_->get_state() == resource::Action::State::FINISHED)
- set_state(State::SRC_TIMEOUT);
-
- clean_action();
- /* Answer all simcalls associated with the synchro */
- finish();
-}
void SynchroImpl::set_exception(actor::ActorImpl* issuer)
{
if (get_state() == State::FAILED) {
void SynchroImpl::finish()
{
XBT_DEBUG("SynchroImpl::finish() in state %s", get_state_str());
+ if (model_action_->get_state() == resource::Action::State::FAILED)
+ set_state(State::FAILED);
+ else if (model_action_->get_state() == resource::Action::State::FINISHED)
+ set_state(State::SRC_TIMEOUT);
+
+ clean_action();
+
xbt_assert(simcalls_.size() == 1, "Unexpected number of simcalls waiting: %zu", simcalls_.size());
actor::Simcall* simcall = simcalls_.front();
simcalls_.pop_front();
void suspend() override;
void resume() override;
void cancel() override;
- void post() override;
void set_exception(actor::ActorImpl* issuer) override;
void finish() override;
};
activity::ActivityImplPtr activity = waiting_synchro_;
activity->cancel();
activity->set_state(activity::State::FAILED);
- activity->post();
+ activity->finish();
activities_.erase(waiting_synchro_);
waiting_synchro_ = nullptr;
if (not h.is_on() && pimpl->get_state() == kernel::activity::State::RUNNING &&
std::find(pimpl->get_hosts().begin(), pimpl->get_hosts().end(), &h) != pimpl->get_hosts().end()) {
pimpl->set_state(kernel::activity::State::FAILED);
- pimpl->post();
+ pimpl->finish();
}
});
pimpl->set_cb_id(cb_id);