#include "src/kernel/activity/ActivityImpl.hpp"
#include "src/kernel/actor/ActorImpl.hpp"
#include "src/kernel/actor/CommObserver.hpp"
+#include <simgrid/Exception.hpp>
+#include <simgrid/activity_set.h>
#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Engine.hpp>
-namespace simgrid::s4u {
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
+
+namespace simgrid {
+
+template class xbt::Extendable<s4u::ActivitySet>;
+
+namespace s4u {
void ActivitySet::erase(ActivityPtr a)
{
return ret;
}
+/**
+ * Move every activity whose kernel-side state is FAILED out of activities_ and into failed_activities_.
+ *
+ * Each such activity is first completed with Activity::State::FAILED. Removal overwrites the slot with the
+ * last element and then drops the last element, so the relative order of the remaining activities is not
+ * preserved.
+ */
+void ActivitySet::handle_failed_activities()
+{
+  size_t pos = 0;
+  while (pos < activities_.size()) {
+    if (activities_[pos]->pimpl_->get_state() != kernel::activity::State::FAILED) {
+      pos++;
+      continue;
+    }
+
+    // Take a copy of the pointer: the slot it comes from is overwritten just below
+    auto failed = activities_[pos];
+    failed->complete(Activity::State::FAILED);
+    failed_activities_.push_back(failed);
+
+    // Swap-and-pop removal. pos is deliberately not advanced, so that the element that was just moved
+    // into this slot gets examined too (and the index never has to be decremented below zero).
+    activities_[pos] = activities_.back();
+    activities_.pop_back();
+  }
+}
+/**
+ * Issue a blocking simcall on behalf of the calling actor, waiting for one activity of this set.
+ *
+ * When the simcall reports a valid position, the activity found there is erased from the set, completed
+ * with Activity::State::FINISHED and returned.
+ *
+ * @param timeout passed unchanged, through the simcall observer, to
+ *                kernel::activity::ActivityImpl::wait_any_for()
+ * @return the activity stored at the position reported by the simcall
+ * @throws TimeoutException when the simcall reports position -1
+ * @throws HostFailureException, NetworkFailureException, StorageFailureException rethrown unchanged once
+ *         handle_failed_activities() has been run on this set
+ *
+ * NOTE(review): in this view act_impls is only sized after activities_ and never filled before being
+ * handed to the observer -- confirm against the complete file that the pimpl pointers are copied into it.
+ */
ActivityPtr ActivitySet::wait_any_for(double timeout)
{
std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
- ssize_t changed_pos = kernel::actor::simcall_blocking(
- [&observer] {
- kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
- observer.get_timeout());
- },
- &observer);
- xbt_assert(changed_pos != -1,
- "ActivityImpl::wait_any_for is not supposed to return -1 but instead to raise exceptions");
+ try { // failure exceptions are intercepted below so that the set can be updated before they propagate
+ ssize_t changed_pos = kernel::actor::simcall_blocking(
+ [&observer] {
+ kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
+ observer.get_timeout());
+ },
+ &observer);
+ if (changed_pos == -1) // no position reported: surfaced to the caller as a timeout
+ throw TimeoutException(XBT_THROW_POINT, "Timeouted");
- auto ret = activities_.at(changed_pos);
- erase(ret);
- ret->complete(Activity::State::FINISHED);
- return ret;
+ auto ret = activities_.at(changed_pos); // at() is bounds-checked, unlike operator[]
+ erase(ret);
+ ret->complete(Activity::State::FINISHED);
+ return ret;
+ } catch (const HostFailureException& e) { // the three handlers are identical: sort out failed activities, rethrow
+ handle_failed_activities();
+ throw;
+ } catch (const NetworkFailureException& e) {
+ handle_failed_activities();
+ throw;
+ } catch (const StorageFailureException& e) {
+ handle_failed_activities();
+ throw;
+ }
}
ActivityPtr ActivitySet::get_failed_activity()
return ret;
}
-}; // namespace simgrid::s4u
\ No newline at end of file
+} // namespace s4u
+} // namespace simgrid
+
+SG_BEGIN_DECL
+
+/** Allocate a new activity set with `new`; sg_activity_set_delete() is the matching release function. */
+sg_activity_set_t sg_activity_set_init()
+{
+  auto* fresh_set = new simgrid::s4u::ActivitySet();
+  return fresh_set;
+}
+
+/** Forward to ActivitySet::push(): add @p acti to the set @p as. */
+void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
+{
+  as->push(acti);
+}
+
+/** Forward to ActivitySet::erase(): remove @p acti from the set @p as. */
+void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
+{
+  as->erase(acti);
+}
+
+/** Forward to ActivitySet::size() and return its result. */
+size_t sg_activity_set_size(sg_activity_set_t as)
+{
+  const size_t count = as->size();
+  return count;
+}
+
+/** Forward to ActivitySet::empty(); the result is converted to int for C callers. */
+int sg_activity_set_empty(sg_activity_set_t as)
+{
+  const int is_empty = as->empty();
+  return is_empty;
+}
+
+/** Forward to ActivitySet::test_any() and hand back the raw pointer held by the returned smart pointer. */
+sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
+{
+  const auto tested = as->test_any();
+  return tested.get();
+}
+
+/** Forward to ActivitySet::wait_all(). */
+void sg_activity_set_wait_all(sg_activity_set_t as)
+{
+  as->wait_all();
+}
+
+/**
+ * Forward to ActivitySet::wait_all_for().
+ *
+ * @return 1 when the call returns normally, 0 when it throws simgrid::TimeoutException. Any other
+ *         exception propagates to the caller.
+ */
+int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
+{
+  int all_done = 1;
+  try {
+    as->wait_all_for(timeout);
+  } catch (const simgrid::TimeoutException&) {
+    all_done = 0;
+  }
+  return all_done;
+}
+
+/** Forward to ActivitySet::wait_any() and hand back the raw pointer held by the returned smart pointer. */
+sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
+{
+  const auto finished = as->wait_any();
+  return finished.get();
+}
+
+/**
+ * Forward to ActivitySet::wait_any_for().
+ *
+ * @return the raw pointer held by the returned smart pointer, or nullptr when the call throws
+ *         simgrid::TimeoutException. Any other exception propagates to the caller.
+ */
+sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
+{
+  try {
+    const auto finished = as->wait_any_for(timeout);
+    return finished.get();
+  } catch (const simgrid::TimeoutException&) {
+    return nullptr;
+  }
+}
+
+/** Destroy the set @p as with `delete`. */
+void sg_activity_set_delete(sg_activity_set_t as)
+{
+  delete as;
+}
+
+/** Call unref() on @p acti (presumably dropping one reference held by the C caller -- confirm). */
+void sg_activity_unref(sg_activity_t acti)
+{
+  acti->unref();
+}
+
+SG_END_DECL