Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Various sonar cleanups
[simgrid.git] / src / s4u / s4u_ActivitySet.cpp
1 /* Copyright (c) 2023-. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "src/kernel/activity/ActivityImpl.hpp"
7 #include "src/kernel/actor/ActorImpl.hpp"
8 #include "src/kernel/actor/CommObserver.hpp"
9 #include <simgrid/Exception.hpp>
10 #include <simgrid/activity_set.h>
11 #include <simgrid/s4u/ActivitySet.hpp>
12 #include <simgrid/s4u/Engine.hpp>
13
14 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activityset, s4u_activity, "S4U set of activities");
15
16 namespace simgrid {
17
18 template class xbt::Extendable<s4u::ActivitySet>;
19
20 namespace s4u {
21
22 void ActivitySet::erase(ActivityPtr a)
23 {
24   for (auto it = activities_.begin(); it != activities_.end(); it++)
25     if (*it == a) {
26       activities_.erase(it);
27       return;
28     }
29 }
30
31 void ActivitySet::wait_all_for(double timeout)
32 {
33   if (timeout < 0.0) {
34     for (const auto& act : activities_)
35       act->wait();
36
37   } else {
38
39     double deadline = Engine::get_clock() + timeout;
40     for (const auto& act : activities_)
41       act->wait_until(deadline);
42   }
43 }
44
45 ActivityPtr ActivitySet::test_any()
46 {
47   std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
48   std::transform(begin(activities_), end(activities_), begin(act_impls),
49                  [](const ActivityPtr& act) { return act->pimpl_.get(); });
50
51   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
52   kernel::actor::ActivityTestanySimcall observer{issuer, act_impls, "test_any"};
53   ssize_t changed_pos = kernel::actor::simcall_answered(
54       [&observer] {
55         return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
56       },
57       &observer);
58   if (changed_pos == -1)
59     return ActivityPtr(nullptr);
60
61   auto ret = activities_.at(changed_pos);
62   erase(ret);
63   ret->complete(Activity::State::FINISHED);
64   return ret;
65 }
66
67 void ActivitySet::handle_failed_activities()
68 {
69   for (size_t i = 0; i < activities_.size(); i++) {
70     auto act = activities_[i];
71     if (act->pimpl_->get_state() == kernel::activity::State::FAILED) {
72       act->complete(Activity::State::FAILED);
73
74       failed_activities_.push_back(act);
75       activities_[i] = activities_[activities_.size() - 1];
76       activities_.resize(activities_.size() - 1);
77       i--; // compensate the i++ occuring at the end of the loop
78     }
79   }
80 }
81
82 ActivityPtr ActivitySet::wait_any_for(double timeout)
83 {
84   std::vector<kernel::activity::ActivityImpl*> act_impls(activities_.size());
85   std::transform(begin(activities_), end(activities_), begin(act_impls),
86                  [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
87
88   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
89   kernel::actor::ActivityWaitanySimcall observer{issuer, act_impls, timeout, "wait_any_for"};
90   try {
91     ssize_t changed_pos = kernel::actor::simcall_blocking(
92         [&observer] {
93           kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
94                                                        observer.get_timeout());
95         },
96         &observer);
97     if (changed_pos == -1)
98       throw TimeoutException(XBT_THROW_POINT, "Timeouted");
99
100     auto ret = activities_.at(changed_pos);
101     erase(ret);
102     ret->complete(Activity::State::FINISHED);
103     return ret;
104   } catch (const HostFailureException& e) {
105     handle_failed_activities();
106     throw;
107   } catch (const NetworkFailureException& e) {
108     handle_failed_activities();
109     throw;
110   } catch (const StorageFailureException& e) {
111     handle_failed_activities();
112     throw;
113   }
114 }
115
116 ActivityPtr ActivitySet::get_failed_activity()
117 {
118   if (failed_activities_.empty())
119     return ActivityPtr(nullptr);
120   auto ret = failed_activities_.back();
121   failed_activities_.pop_back();
122   return ret;
123 }
124
125 } // namespace s4u
126 } // namespace simgrid
127
128 SG_BEGIN_DECL
129
130 sg_activity_set_t sg_activity_set_init()
131 {
132   return new simgrid::s4u::ActivitySet();
133 }
134 void sg_activity_set_push(sg_activity_set_t as, sg_activity_t acti)
135 {
136   as->push(acti);
137 }
138 void sg_activity_set_erase(sg_activity_set_t as, sg_activity_t acti)
139 {
140   as->erase(acti);
141 }
142 size_t sg_activity_set_size(sg_activity_set_t as)
143 {
144   return as->size();
145 }
146 int sg_activity_set_empty(sg_activity_set_t as)
147 {
148   return as->empty();
149 }
150
151 sg_activity_t sg_activity_set_test_any(sg_activity_set_t as)
152 {
153   return as->test_any().get();
154 }
155 void sg_activity_set_wait_all(sg_activity_set_t as)
156 {
157   as->wait_all();
158 }
159 int sg_activity_set_wait_all_for(sg_activity_set_t as, double timeout)
160 {
161   try {
162     as->wait_all_for(timeout);
163     return 1;
164   } catch (const simgrid::TimeoutException& e) {
165     return 0;
166   }
167 }
168 sg_activity_t sg_activity_set_wait_any(sg_activity_set_t as)
169 {
170   return as->wait_any().get();
171 }
172 sg_activity_t sg_activity_set_wait_any_for(sg_activity_set_t as, double timeout)
173 {
174   try {
175     return as->wait_any_for(timeout).get();
176   } catch (const simgrid::TimeoutException& e) {
177     return nullptr;
178   }
179 }
180
181 void sg_activity_set_delete(sg_activity_set_t as)
182 {
183   delete as;
184 }
185 void sg_activity_unref(sg_activity_t acti)
186 {
187   acti->unref();
188 }
189
190 SG_END_DECL