std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
std::transform(begin(*comms), end(*comms), rcomms.get(),
[](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
- return simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
+ int changed_pos = simcall_comm_waitany(rcomms.get(), comms->size(), timeout);
+ if (changed_pos != -1)
+ comms->at(changed_pos)->release_dependencies();
+ return changed_pos;
}
void Comm::wait_all(const std::vector<CommPtr>* comms)
src_buff_size_ = size;
return this;
}
+
CommPtr Comm::set_dst_data(void** buff)
{
xbt_assert(state_ == State::INITED, "You cannot use %s() once your communication started (not implemented)",
Comm* Comm::start()
{
- xbt_assert(get_state() == State::INITED, "You cannot use %s() once your communication started (not implemented)",
- __FUNCTION__);
+ xbt_assert(get_state() == State::INITED || get_state() == State::STARTING,
+ "You cannot use %s() once your communication started (not implemented)", __FUNCTION__);
if (src_buff_ != nullptr) { // Sender side
on_sender_start(*Actor::self());
case State::FINISHED:
break;
- case State::INITED: // It's not started yet. Do it in one simcall
+ case State::INITED:
+ case State::STARTING: // It's not started yet. Do it in one simcall
if (src_buff_ != nullptr) {
on_sender_start(*Actor::self());
simcall_comm_send(sender_, mailbox_->get_impl(), remains_, rate_, src_buff_, src_buff_size_, match_fun_,
get_user_data(), timeout, rate_);
}
state_ = State::FINISHED;
+ this->release_dependencies();
break;
case State::STARTED:
simcall_comm_wait(pimpl_, timeout);
on_completion(*Actor::self());
state_ = State::FINISHED;
+ this->release_dependencies();
break;
case State::CANCELED:
}
return this;
}
+
int Comm::test_any(const std::vector<CommPtr>* comms)
{
std::unique_ptr<kernel::activity::CommImpl* []> rcomms(new kernel::activity::CommImpl*[comms->size()]);
std::transform(begin(*comms), end(*comms), rcomms.get(),
[](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
- return simcall_comm_testany(rcomms.get(), comms->size());
+ int changed_pos = simcall_comm_testany(rcomms.get(), comms->size());
+ if (changed_pos != -1)
+ comms->at(changed_pos)->release_dependencies();
+ return changed_pos;
}
Comm* Comm::detach()
__FUNCTION__);
xbt_assert(src_buff_ != nullptr && src_buff_size_ != 0, "You can only detach sends, not recvs");
detached_ = true;
- return start();
+ vetoable_start();
+ return this;
}
Comm* Comm::cancel()
if (state_ == State::FINISHED)
return true;
- if (state_ == State::INITED)
- this->start();
+ if (state_ == State::INITED || state_ == State::STARTING)
+ this->vetoable_start();
if (simcall_comm_test(pimpl_)) {
state_ = State::FINISHED;
+ this->release_dependencies();
return true;
}
return false;
xbt_assert(payload != nullptr, "You cannot send nullptr");
s4u::CommPtr res = put_init(payload, simulated_size_in_bytes);
- res->start();
+ res->vetoable_start();
return res;
}
void Mailbox::put(void* payload, uint64_t simulated_size_in_bytes)
CommPtr c = put_init();
c->set_remaining(simulated_size_in_bytes);
c->set_src_data(payload);
+ c->vetoable_start();
c->wait();
}
/** Blocking send with timeout */
CommPtr c = put_init();
c->set_remaining(simulated_size_in_bytes);
c->set_src_data(payload);
- // c->start() is optional.
+ c->vetoable_start();
c->wait_for(timeout);
}
{
s4u::CommPtr res = get_init();
res->set_dst_data(data, sizeof(*data));
- res->start();
+ res->vetoable_start();
return res;
}
void* res = nullptr;
CommPtr c = get_init();
c->set_dst_data(&res, sizeof(res));
+ c->vetoable_start();
c->wait();
return res;
}
void* res = nullptr;
CommPtr c = get_init();
c->set_dst_data(&res, sizeof(res));
+ c->vetoable_start();
c->wait_for(timeout);
return res;
}