#include "src/kernel/activity/CommImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
+#include "src/kernel/actor/SimcallObserver.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_comm, s4u_activity, "S4U asynchronous communications");
ssize_t Comm::wait_any_for(const std::vector<CommPtr>& comms, double timeout)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
- std::transform(begin(comms), end(comms), begin(rcomms),
- [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
+ std::vector<ActivityPtr> activities;
+ for (const auto& comm : comms)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
ssize_t changed_pos;
try {
- changed_pos = simcall_comm_waitany(rcomms.data(), rcomms.size(), timeout);
+ changed_pos = Activity::wait_any_for(activities, timeout);
} catch (const NetworkFailureException& e) {
for (auto c : comms) {
if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
}
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
- if (changed_pos != -1)
- comms.at(changed_pos)->complete(State::FINISHED);
return changed_pos;
}
on_recv(*this);
pimpl_ = simcall_comm_irecv(receiver_, mailbox_->get_impl(), dst_buff_, &dst_buff_size_, match_fun_,
copy_data_function_, get_data<void>(), rate_);
-
} else {
xbt_die("Cannot start a communication before specifying whether we are the sender or the receiver");
}
* Negative values denote infinite wait times. 0 as a timeout returns immediately. */
Comm* Comm::wait_for(double timeout)
{
+ XBT_DEBUG("Calling Comm::wait_for with state %s", get_state_str());
+ kernel::actor::ActorImpl* issuer = nullptr;
switch (state_) {
case State::FINISHED:
break;
case State::FAILED:
throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed communication");
-
case State::INITED:
case State::STARTING: // It's not started yet. Do it in one simcall if it's a regular communication
if (from_ != nullptr || to_ != nullptr) {
get_data<void>(), timeout, rate_);
}
break;
-
case State::STARTED:
try {
- simcall_comm_wait(get_impl(), timeout);
+ issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout};
+ if (kernel::actor::simcall_blocking(
+ [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); },
+ &observer)) {
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
+ }
} catch (const NetworkFailureException& e) {
+ issuer->simcall_.observer_ = nullptr; // Comm failed on network failure, reset the observer to nullptr
complete(State::FAILED);
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
ssize_t Comm::test_any(const std::vector<CommPtr>& comms)
{
- std::vector<kernel::activity::CommImpl*> rcomms(comms.size());
- std::transform(begin(comms), end(comms), begin(rcomms),
- [](const CommPtr& comm) { return static_cast<kernel::activity::CommImpl*>(comm->pimpl_.get()); });
- ssize_t changed_pos = simcall_comm_testany(rcomms.data(), rcomms.size());
- if (changed_pos != -1)
- comms.at(changed_pos)->complete(State::FINISHED);
- return changed_pos;
+ std::vector<ActivityPtr> activities;
+ for (const auto& comm : comms)
+ activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
+ return Activity::test_any(activities);
}
Comm* Comm::detach()
return this;
}
-bool Comm::test() // TODO: merge with Activity::test, once modernized
-{
- xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::STARTING ||
- state_ == State::CANCELED || state_ == State::FINISHED);
-
- if (state_ == State::CANCELED || state_ == State::FINISHED)
- return true;
-
- if (state_ == State::INITED || state_ == State::STARTING)
- this->vetoable_start();
-
- if (simcall_comm_test(get_impl())) {
- complete(State::FINISHED);
- return true;
- }
- return false;
-}
-
Mailbox* Comm::get_mailbox() const
{
return mailbox_;