ActivityImpl::wait_for(issuer, timeout);
}
+void MessImpl::cancel()
+{
+ /* if the synchro is a waiting state means that it is still in a mbox so remove from it and delete it */
+ if (get_state() == State::WAITING) {
+ queue_->remove(this);
+ set_state(State::CANCELED);
+ }
+}
+
void MessImpl::finish()
{
- XBT_DEBUG("MessImpl::finish() comm %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
+ XBT_DEBUG("MessImpl::finish() mess %p, state %s, src_proc %p, dst_proc %p", this, get_state_str(),
src_actor_.get(), dst_actor_.get());
if (get_iface()) {
void wait_for(actor::ActorImpl* issuer, double timeout) override;
MessImpl* start();
+ void suspend() override { /* no action to suspend for Mess */ }
+ void resume() override { /* no action to resume for Mess */ }
+ void cancel() override;
void set_exception(actor::ActorImpl* issuer) override {};
void finish() override;
unsigned MessageQueueImpl::next_id_ = 0;
+MessageQueueImpl::~MessageQueueImpl()
+{
+ try {
+ clear();
+ } catch (const std::bad_alloc& ba) {
+ XBT_ERROR("MessageQueueImpl::clear() failure: %s", ba.what());
+ }
+}
+
+/** @brief Removes all message activities from a message queue */
+void MessageQueueImpl::clear()
+{
+ while (not queue_.empty()) {
+ auto mess = queue_.back();
+ if (mess->get_state() == State::WAITING) {
+ mess->cancel();
+ mess->set_state(State::FAILED);
+ } else
+ queue_.pop_back();
+ }
+ xbt_assert(queue_.empty());
+}
+
void MessageQueueImpl::push(const MessImplPtr& mess)
{
mess->set_queue(this);
MessageQueueImpl& operator=(const MailboxImpl&) = delete;
public:
+ ~MessageQueueImpl();
+
/** @brief Public interface */
unsigned get_id() const { return id_; }
const char* get_cname() const { return name_.c_str(); }
void push(const MessImplPtr& mess);
void remove(const MessImplPtr& mess);
+ void clear();
bool empty() const { return queue_.empty(); }
size_t size() const { return queue_.size(); }
const MessImplPtr& front() const { return queue_.front(); }