virtual Activity* do_start() = 0;
/** Tests whether the given activity is terminated yet. */
virtual bool test();
- /*! take a vector s4u::ActivityPtr and return the rank of the first finished one (or -1 if none is done). */
- static ssize_t test_any(const std::vector<ActivityPtr>& activities);
/** Blocks the current actor until the activity is terminated */
Activity* wait() { return wait_for(-1.0); }
/** Blocks the current actor until the activity is terminated, or until the time limit is reached\n
* Raises: timeout exception. */
void wait_until(double time_limit);
- /*! take a vector of s4u::ActivityPtr and return when one of them is finished.
- * The return value is the rank of the first finished ActivityPtr. */
- static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return wait_any_for(activities, -1); }
- /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
- static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout);
/** Cancel that activity */
Activity* cancel();
kernel::activity::ActivityImpl* get_impl() const { return pimpl_.get(); }
#ifndef DOXYGEN
+ static ssize_t deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout); // XBT_ATTRIB_DEPRECATED_v339
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t test_any(const std::vector<ActivityPtr>& activities);
+
friend void intrusive_ptr_release(Activity* a)
{
if (a->refcount_.fetch_sub(1, std::memory_order_release) == 1) {
* dependency or no resource assigned) */
void on_this_veto_cb(const std::function<void(AnyActivity&)>& cb) { on_this_veto.connect(cb); }
- XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(
- const std::function<void(Activity const&)>& cb)
- {
- on_suspend.connect(cb);
- }
- XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(
- const std::function<void(Activity const&)>& cb)
- {
- on_resume.connect(cb);
- }
+#ifndef DOXYGEN
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_suspend_cb() instead") static void on_suspended_cb(const std::function<void(Activity const&)>& cb) { on_suspend.connect(cb); }
+ XBT_ATTRIB_DEPRECATED_v338("Please use on_resume_cb() instead") static void on_resumed_cb(const std::function<void(Activity const&)>& cb) { on_resume.connect(cb); }
+ // Deprecated vector-based waits: both forward to deprecated_wait_any_for(); wait_any() passes -1 as the timeout (presumably "no timeout" -- TODO confirm).
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any(const std::vector<ActivityPtr>& activities) { return deprecated_wait_any_for(activities, -1); }
+ XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) { return deprecated_wait_any_for(activities, timeout); }
+#endif
AnyActivity* add_successor(ActivityPtr a)
{
bool is_assigned() const override;
#ifndef DOXYGEN
- static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs,
- double timeout); // XBT_ATTRIB_DEPRECATED_v339
+ static ssize_t deprecated_wait_any_for(const std::vector<ExecPtr>& execs, double timeout); // XBT_ATTRIB_DEPRECATED_v339
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
- wait_any(const std::vector<ExecPtr>& execs)
- {
- return deprecated_wait_any_for(execs, -1);
- }
+ wait_any(const std::vector<ExecPtr>& execs) { return deprecated_wait_any_for(execs, -1); } // forwards with a timeout of -1 (presumably "no timeout" -- TODO confirm)
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead") static ssize_t
- wait_any_for(const std::vector<ExecPtr>& execs, double timeout)
- {
- return deprecated_wait_any_for(execs, timeout);
- }
+ wait_any_for(const std::vector<ExecPtr>& execs, double timeout) { return deprecated_wait_any_for(execs, timeout); } // forwards the caller's timeout unchanged
#endif
};
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead")
static ssize_t wait_any(const std::vector<IoPtr>& ios) { return deprecated_wait_any_for(ios, -1); }
XBT_ATTRIB_DEPRECATED_v339("Please use ActivitySet instead")
- static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) {
- return deprecated_wait_any_for(ios, timeout);
- }
+ static ssize_t wait_any_for(const std::vector<IoPtr>& ios, double timeout) { return deprecated_wait_any_for(ios, timeout); } // thin wrapper: forwards ios and the caller's timeout unchanged
#endif
double get_remaining() const override;
return changed_pos;
}
-ssize_t Activity::wait_any_for(const std::vector<ActivityPtr>& activities, double timeout)
+ssize_t Activity::deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
std::transform(begin(activities), end(activities), begin(ractivities),
ssize_t Comm::test_any(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339
{
- std::vector<ActivityPtr> activities;
- for (const auto& comm : comms)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
- return Activity::test_any(activities);
+ std::vector<kernel::activity::ActivityImpl*> ractivities(comms.size()); // pre-sized: one slot per comm, filled by the transform below
+ std::transform(begin(comms), end(comms), begin(ractivities), [](const CommPtr& act) { return act->pimpl_.get(); }); // same order as comms
+ // Wrap the test in a simcall on behalf of ActorImpl::self(); the observer carries the issuer and the activity list
+ kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
+ kernel::actor::ActivityTestanySimcall observer{issuer, ractivities, "test_any"};
+ ssize_t changed_pos = kernel::actor::simcall_answered(
+ [&observer] {
+ return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities()); // reads its inputs back from the observer
+ },
+ &observer);
+ if (changed_pos != -1) // -1 is treated as "nothing to report"; any other value is used as an index into comms
+ comms.at(changed_pos)->complete(State::FINISHED); // .at() is bounds-checked; flags that comm as FINISHED
+ return changed_pos;
}
/** @brief Block the calling actor until the communication is finished, or until timeout
ssize_t Comm::deprecated_wait_any_for(const std::vector<CommPtr>& comms, double timeout) // XBT_ATTRIB_DEPRECATED_v339
{
+ // Pushes every comm into an ActivitySet, calls wait_any_for(timeout) on it, and returns the index in comms of
+ // the activity it handed back.
+ // Returns -1 when comms is empty, when a TimeoutException is raised, or when the returned activity is not found
+ // in comms. A NetworkFailureException is rethrown (nested) after every comm whose implementation is in the
+ // FAILED state has been completed as FAILED.
- std::vector<ActivityPtr> activities;
+ if (comms.empty())
+ return -1;
+ ActivitySet set;
for (const auto& comm : comms)
- activities.push_back(boost::dynamic_pointer_cast<Activity>(comm));
- ssize_t changed_pos;
+ set.push(comm);
try {
- changed_pos = Activity::wait_any_for(activities, timeout);
+ auto* ret = set.wait_any_for(timeout).get(); // raw pointer is only compared below, never dereferenced
+ for (size_t i = 0; i < comms.size(); i++)
+ if (comms[i].get() == ret)
+ return static_cast<ssize_t>(i); // explicit size_t -> ssize_t conversion
+
+ } catch (const TimeoutException&) { // const reference like the handler below; the exception object is not used
+ return -1;
} catch (const NetworkFailureException& e) {
- changed_pos = -1;
- for (auto c : comms) {
- if (c->pimpl_->get_state() == kernel::activity::State::FAILED) {
+ for (const auto& c : comms) // by reference: no CommPtr copy per iteration
+ if (c->pimpl_->get_state() == kernel::activity::State::FAILED)
c->complete(State::FAILED);
- }
- }
+
e.rethrow_nested(XBT_THROW_POINT, boost::core::demangle(typeid(e).name()) + " raised in kernel mode.");
}
- return changed_pos;
+ return -1;
}
void Comm::wait_all(const std::vector<CommPtr>& comms) // XBT_ATTRIB_DEPRECATED_v339