Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Add new entry in Release_Notes.
[simgrid.git] / src / s4u / s4u_Activity.cpp
1 /* Copyright (c) 2006-2023. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include <simgrid/Exception.hpp>
7 #include <simgrid/s4u/Activity.hpp>
8 #include <simgrid/s4u/Comm.hpp>
9 #include <simgrid/s4u/Engine.hpp>
10 #include <simgrid/s4u/Exec.hpp>
11 #include <simgrid/s4u/Io.hpp>
12 #include <simgrid/s4u/Mess.hpp>
13 #include <xbt/log.h>
14
15 #include "src/kernel/activity/ActivityImpl.hpp"
16 #include "src/kernel/actor/ActorImpl.hpp"
17 #include "src/kernel/actor/CommObserver.hpp"
18
19 XBT_LOG_EXTERNAL_CATEGORY(s4u);
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(s4u_activity, s4u, "S4U activities");
21
22 namespace simgrid {
23
24 template class xbt::Extendable<s4u::Activity>;
25
26 namespace s4u {
27
28 std::set<Activity*>* Activity::vetoed_activities_ = nullptr;
29
30 void Activity::destroy()
31 {
32   /* First Remove all dependencies */
33   while (not dependencies_.empty())
34     (*(dependencies_.begin()))->remove_successor(this);
35   while (not successors_.empty())
36     this->remove_successor(successors_.front());
37
38   cancel();
39 }
40
41 void Activity::wait_until(double time_limit)
42 {
43   double now = Engine::get_clock();
44   if (time_limit > now)
45     wait_for(time_limit - now);
46 }
47
48 Activity* Activity::wait_for(double timeout)
49 {
50   if (state_ == State::INITED)
51     start();
52
53   if (state_ == State::FAILED) {
54     if (dynamic_cast<Comm*>(this))
55       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed comm");
56     if (dynamic_cast<Mess*>(this))
57       throw NetworkFailureException(XBT_THROW_POINT, "Cannot wait for a failed mess");
58     if (dynamic_cast<Exec*>(this))
59       throw HostFailureException(XBT_THROW_POINT, "Cannot wait for a failed exec");
60     if (dynamic_cast<Io*>(this))
61       throw StorageFailureException(XBT_THROW_POINT, "Cannot wait for a failed I/O");
62     THROW_IMPOSSIBLE;
63   }
64
65   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
66   kernel::actor::ActivityWaitSimcall observer{issuer, pimpl_.get(), timeout, "wait_for"};
67   if (kernel::actor::simcall_blocking(
68           [&observer] { observer.get_activity()->wait_for(observer.get_issuer(), observer.get_timeout()); }, &observer))
69     throw TimeoutException(XBT_THROW_POINT, "Timeouted");
70   complete(State::FINISHED);
71   return this;
72 }
73
74 bool Activity::test()
75 {
76   xbt_assert(state_ == State::INITED || state_ == State::STARTED || state_ == State::STARTING ||
77              state_ == State::CANCELED || state_ == State::FINISHED);
78
79   if (state_ == State::CANCELED || state_ == State::FINISHED)
80     return true;
81
82   if (state_ == State::INITED || state_ == State::STARTING)
83     this->start();
84
85   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
86   kernel::actor::ActivityTestSimcall observer{issuer, pimpl_.get(), "test"};
87   if (kernel::actor::simcall_answered([&observer] { return observer.get_activity()->test(observer.get_issuer()); },
88                                       &observer)) {
89     complete(State::FINISHED);
90     return true;
91   }
92   return false;
93 }
94
95 ssize_t Activity::test_any(const std::vector<ActivityPtr>& activities) // XBT_ATTRIB_DEPRECATED_v339
96 {
97   std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
98   std::transform(begin(activities), end(activities), begin(ractivities),
99                  [](const ActivityPtr& act) { return act->pimpl_.get(); });
100
101   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
102   kernel::actor::ActivityTestanySimcall observer{issuer, ractivities, "test_any"};
103   ssize_t changed_pos = kernel::actor::simcall_answered(
104       [&observer] {
105         return kernel::activity::ActivityImpl::test_any(observer.get_issuer(), observer.get_activities());
106       },
107       &observer);
108   if (changed_pos != -1)
109     activities.at(changed_pos)->complete(State::FINISHED);
110   return changed_pos;
111 }
112
113 ssize_t Activity::deprecated_wait_any_for(const std::vector<ActivityPtr>& activities, double timeout) // XBT_ATTRIB_DEPRECATED_v339
114 {
115   std::vector<kernel::activity::ActivityImpl*> ractivities(activities.size());
116   std::transform(begin(activities), end(activities), begin(ractivities),
117                  [](const ActivityPtr& activity) { return activity->pimpl_.get(); });
118
119   kernel::actor::ActorImpl* issuer = kernel::actor::ActorImpl::self();
120   kernel::actor::ActivityWaitanySimcall observer{issuer, ractivities, timeout, "wait_any_for"};
121   ssize_t changed_pos = kernel::actor::simcall_blocking(
122       [&observer] {
123         kernel::activity::ActivityImpl::wait_any_for(observer.get_issuer(), observer.get_activities(),
124                                                      observer.get_timeout());
125       },
126       &observer);
127   if (changed_pos != -1)
128     activities.at(changed_pos)->complete(State::FINISHED);
129   return changed_pos;
130 }
131
132 Activity* Activity::cancel()
133 {
134   kernel::actor::simcall_answered([this] {
135     XBT_HERE();
136     if (pimpl_)
137       pimpl_->cancel();
138   });
139   complete(State::CANCELED);
140   return this;
141 }
142
143 Activity* Activity::suspend()
144 {
145   if (suspended_)
146     return this; // Already suspended
147   suspended_ = true;
148
149   if (state_ == State::STARTED)
150     pimpl_->suspend();
151
152   return this;
153 }
154
155 Activity* Activity::resume()
156 {
157   if (not suspended_)
158     return this; // nothing to restore when it's not suspended
159
160   if (state_ == State::STARTED)
161     pimpl_->resume();
162
163   return this;
164 }
165
166 const char* Activity::get_state_str() const
167 {
168   return to_c_str(state_);
169 }
170
171 double Activity::get_remaining() const
172 {
173   if (state_ == State::INITED || state_ == State::STARTING)
174     return remains_;
175   else
176     return pimpl_->get_remaining();
177 }
178 double Activity::get_start_time() const
179 {
180   return pimpl_->get_start_time();
181 }
182 double Activity::get_finish_time() const
183 {
184   return pimpl_->get_finish_time();
185 }
186
187 Activity* Activity::set_remaining(double remains)
188 {
189   xbt_assert(state_ == State::INITED || state_ == State::STARTING,
190              "Cannot change the remaining amount of work once the Activity is started");
191   remains_ = remains;
192   return this;
193 }
194
195 } // namespace s4u
196 } // namespace simgrid