Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
5ec46126a84d219b1b8b2b2183dc7ca6ae876e42
[simgrid.git] / src / s4u / s4u_Actor.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/Exception.hpp"
7 #include "simgrid/actor.h"
8 #include "simgrid/modelchecker.h"
9 #include "simgrid/s4u/Actor.hpp"
10 #include "simgrid/s4u/Exec.hpp"
11 #include "simgrid/s4u/Host.hpp"
12 #include "simgrid/s4u/VirtualMachine.hpp"
13 #include "src/include/mc/mc.h"
14 #include "src/kernel/activity/ExecImpl.hpp"
15 #include "src/mc/mc_replay.hpp"
16 #include "src/simix/smx_private.hpp"
17 #include "src/surf/HostImpl.hpp"
18
19 #include <algorithm>
20 #include <sstream>
21
22 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors");
23
24 namespace simgrid {
25 namespace s4u {
26
27 xbt::signal<void(Actor&)> s4u::Actor::on_creation;
28 xbt::signal<void(Actor const&)> s4u::Actor::on_suspend;
29 xbt::signal<void(Actor const&)> s4u::Actor::on_resume;
30 xbt::signal<void(Actor const&)> s4u::Actor::on_sleep;
31 xbt::signal<void(Actor const&)> s4u::Actor::on_wake_up;
32 xbt::signal<void(Actor const&)> s4u::Actor::on_migration_start;
33 xbt::signal<void(Actor const&)> s4u::Actor::on_migration_end;
34 xbt::signal<void(Actor const&)> s4u::Actor::on_termination;
35 xbt::signal<void(Actor const&)> s4u::Actor::on_destruction;
36
37 // ***** Actor creation *****
38 Actor* Actor::self()
39 {
40   kernel::context::Context* self_context = kernel::context::Context::self();
41   if (self_context == nullptr)
42     return nullptr;
43
44   return self_context->get_actor()->ciface();
45 }
46 ActorPtr Actor::init(const std::string& name, s4u::Host* host)
47 {
48   kernel::actor::ActorImpl* self = SIMIX_process_self();
49   kernel::actor::ActorImpl* actor =
50       kernel::actor::simcall([self, &name, host] { return self->init(name, host).get(); });
51   return actor->iface();
52 }
53
54 ActorPtr Actor::start(const std::function<void()>& code)
55 {
56   simgrid::kernel::actor::simcall([this, &code] { pimpl_->start(code); });
57   return this;
58 }
59
60 ActorPtr Actor::create(const std::string& name, s4u::Host* host, const std::function<void()>& code)
61 {
62   kernel::actor::ActorImpl* self = SIMIX_process_self();
63   kernel::actor::ActorImpl* actor =
64       kernel::actor::simcall([self, &name, host, &code] { return self->init(name, host)->start(code); });
65
66   return actor->iface();
67 }
68
69 ActorPtr Actor::create(const std::string& name, s4u::Host* host, const std::string& function,
70                        std::vector<std::string> args)
71 {
72   simix::ActorCodeFactory& factory = SIMIX_get_actor_code_factory(function);
73   return create(name, host, factory(std::move(args)));
74 }
75
76 void intrusive_ptr_add_ref(Actor* actor)
77 {
78   intrusive_ptr_add_ref(actor->pimpl_);
79 }
80 void intrusive_ptr_release(Actor* actor)
81 {
82   intrusive_ptr_release(actor->pimpl_);
83 }
84 int Actor::get_refcount()
85 {
86   return pimpl_->get_refcount();
87 }
88
89 // ***** Actor methods *****
90
91 void Actor::join()
92 {
93   join(-1);
94 }
95
96 void Actor::join(double timeout)
97 {
98   kernel::actor::ActorImpl* issuer = SIMIX_process_self();
99   kernel::actor::ActorImpl* target = pimpl_;
100   kernel::actor::simcall_blocking<void>([issuer, target, timeout] {
101     if (target->finished_) {
102       // The joined process is already finished, just wake up the issuer right away
103       issuer->simcall_answer();
104     } else {
105       smx_activity_t sync = issuer->join(target, timeout);
106       sync->register_simcall(&issuer->simcall);
107     }
108   });
109 }
110
111 void Actor::set_auto_restart(bool autorestart)
112 {
113   kernel::actor::simcall([this, autorestart]() {
114     xbt_assert(autorestart && not pimpl_->has_to_auto_restart()); // FIXME: handle all cases
115     pimpl_->set_auto_restart(autorestart);
116
117     kernel::actor::ProcessArg* arg = new kernel::actor::ProcessArg(pimpl_->get_host(), pimpl_);
118     XBT_DEBUG("Adding %s to the actors_at_boot_ list of Host %s", arg->name.c_str(), arg->host->get_cname());
119     pimpl_->get_host()->pimpl_->add_actor_at_boot(arg);
120   });
121 }
122
123 void Actor::on_exit(const std::function<void(bool /*failed*/)>& fun) const
124 {
125   kernel::actor::simcall([this, &fun] { SIMIX_process_on_exit(pimpl_, fun); });
126 }
127
128 void Actor::migrate(Host* new_host)
129 {
130   s4u::Actor::on_migration_start(*this);
131
132   kernel::actor::simcall([this, new_host]() {
133     if (pimpl_->waiting_synchro != nullptr) {
134       // The actor is blocked on an activity. If it's an exec, migrate it too.
135       // FIXME: implement the migration of other kinds of activities
136       kernel::activity::ExecImplPtr exec =
137           boost::dynamic_pointer_cast<kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
138       xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions.");
139       exec->migrate(new_host);
140     }
141     this->pimpl_->set_host(new_host);
142   });
143
144   s4u::Actor::on_migration_end(*this);
145 }
146
147 s4u::Host* Actor::get_host() const
148 {
149   return this->pimpl_->get_host();
150 }
151
152 void Actor::daemonize()
153 {
154   kernel::actor::simcall([this]() { pimpl_->daemonize(); });
155 }
156
157 bool Actor::is_daemon() const
158 {
159   return this->pimpl_->is_daemon();
160 }
161
162 const simgrid::xbt::string& Actor::get_name() const
163 {
164   return this->pimpl_->get_name();
165 }
166
167 const char* Actor::get_cname() const
168 {
169   return this->pimpl_->get_cname();
170 }
171
172 aid_t Actor::get_pid() const
173 {
174   return this->pimpl_->get_pid();
175 }
176
177 aid_t Actor::get_ppid() const
178 {
179   return this->pimpl_->get_ppid();
180 }
181
182 void Actor::suspend()
183 {
184   kernel::actor::ActorImpl* issuer = SIMIX_process_self();
185   kernel::actor::ActorImpl* target = pimpl_;
186   s4u::Actor::on_suspend(*this);
187   kernel::actor::simcall_blocking<void>([issuer, target]() {
188     target->suspend();
189     if (target != issuer) {
190       /* If we are suspending ourselves, then just do not finish the simcall now */
191       issuer->simcall_answer();
192     }
193   });
194 }
195
196 void Actor::resume()
197 {
198   kernel::actor::simcall([this] { pimpl_->resume(); });
199   s4u::Actor::on_resume(*this);
200 }
201
202 bool Actor::is_suspended()
203 {
204   return pimpl_->is_suspended();
205 }
206
207 void Actor::set_kill_time(double kill_time)
208 {
209   kernel::actor::simcall([this, kill_time] { pimpl_->set_kill_time(kill_time); });
210 }
211
212 /** @brief Get the kill time of an actor(or 0 if unset). */
213 double Actor::get_kill_time()
214 {
215   return pimpl_->get_kill_time();
216 }
217
218 void Actor::kill()
219 {
220   kernel::actor::ActorImpl* self = SIMIX_process_self();
221   kernel::actor::simcall([this, self] {
222     xbt_assert(pimpl_ != simix_global->maestro_process, "Killing maestro is a rather bad idea");
223     self->kill(pimpl_);
224   });
225 }
226
227 // ***** Static functions *****
228
229 ActorPtr Actor::by_pid(aid_t pid)
230 {
231   kernel::actor::ActorImpl* actor = SIMIX_process_from_PID(pid);
232   if (actor != nullptr)
233     return actor->iface();
234   else
235     return ActorPtr();
236 }
237
238 void Actor::kill_all()
239 {
240   kernel::actor::ActorImpl* self = SIMIX_process_self();
241   kernel::actor::simcall([self] { self->kill_all(); });
242 }
243
244 const std::unordered_map<std::string, std::string>* Actor::get_properties() const
245 {
246   return pimpl_->get_properties();
247 }
248
249 /** Retrieve the property value (or nullptr if not set) */
250 const char* Actor::get_property(const std::string& key) const
251 {
252   return pimpl_->get_property(key);
253 }
254
255 void Actor::set_property(const std::string& key, const std::string& value)
256 {
257   kernel::actor::simcall([this, &key, &value] { pimpl_->set_property(key, value); });
258 }
259
260 Actor* Actor::restart()
261 {
262   return kernel::actor::simcall([this]() { return pimpl_->restart(); });
263 }
264
265 // ***** this_actor *****
266
267 namespace this_actor {
268
269 /** Returns true if run from the kernel mode, and false if run from a real actor
270  *
271  * Everything that is run out of any actor (simulation setup before the engine is run,
272  * computing the model evolutions as a result to the actors' action, etc) is run in
273  * kernel mode, just as in any operating systems.
274  *
275  * In SimGrid, the actor in charge of doing the stuff in kernel mode is called Maestro,
276  * because it is the one scheduling when the others should move or wait.
277  */
278 bool is_maestro()
279 {
280   kernel::actor::ActorImpl* self = SIMIX_process_self();
281   return self == nullptr || self == simix_global->maestro_process;
282 }
283
284 void sleep_for(double duration)
285 {
286   xbt_assert(std::isfinite(duration), "duration is not finite!");
287
288   if (duration > 0) {
289     kernel::actor::ActorImpl* issuer = SIMIX_process_self();
290     Actor::on_sleep(*issuer->ciface());
291
292     kernel::actor::simcall_blocking<void>([issuer, duration]() {
293       if (MC_is_active() || MC_record_replay_is_active()) {
294         MC_process_clock_add(issuer, duration);
295         issuer->simcall_answer();
296         return;
297       }
298       smx_activity_t sync = issuer->sleep(duration);
299       sync->register_simcall(&issuer->simcall);
300     });
301
302     Actor::on_wake_up(*issuer->ciface());
303   }
304 }
305
306 void yield()
307 {
308   kernel::actor::simcall([] { /* do nothing*/ });
309 }
310
311 XBT_PUBLIC void sleep_until(double wakeup_time)
312 {
313   double now = SIMIX_get_clock();
314   if (wakeup_time > now)
315     sleep_for(wakeup_time - now);
316 }
317
318 void execute(double flops)
319 {
320   execute(flops, 1.0 /* priority */);
321 }
322
323 void execute(double flops, double priority)
324 {
325   exec_init(flops)->set_priority(priority)->start()->wait();
326 }
327
328 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
329                       const std::vector<double>& bytes_amounts)
330 {
331   parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
332 }
333
334 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
335                       const std::vector<double>& bytes_amounts, double timeout)
336 {
337   xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
338   xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
339              "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
340   xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
341              "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
342              hosts.size(), hosts.size(), flops_amounts.size());
343   /* Check that we are not mixing VMs and PMs in the parallel task */
344   bool is_a_vm = (nullptr != dynamic_cast<VirtualMachine*>(hosts.front()));
345   xbt_assert(std::all_of(hosts.begin(), hosts.end(),
346                          [is_a_vm](s4u::Host* elm) {
347                            bool tmp_is_a_vm = (nullptr != dynamic_cast<VirtualMachine*>(elm));
348                            return is_a_vm == tmp_is_a_vm;
349                          }),
350              "parallel_execute: mixing VMs and PMs is not supported (yet).");
351   /* checking for infinite values */
352   xbt_assert(std::all_of(flops_amounts.begin(), flops_amounts.end(), [](double elm) { return std::isfinite(elm); }),
353              "flops_amounts comprises infinite values!");
354   xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
355              "flops_amounts comprises infinite values!");
356
357   exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
358 }
359
360 ExecPtr exec_init(double flops_amount)
361 {
362   return ExecPtr(new ExecSeq(get_host(), flops_amount));
363 }
364
365 ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
366                   const std::vector<double>& bytes_amounts)
367 {
368   return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts));
369 }
370
371 ExecPtr exec_async(double flops)
372 {
373   ExecPtr res = exec_init(flops);
374   res->start();
375   return res;
376 }
377
378 aid_t get_pid()
379 {
380   return SIMIX_process_self()->get_pid();
381 }
382
383 aid_t get_ppid()
384 {
385   return SIMIX_process_self()->get_ppid();
386 }
387
388 std::string get_name()
389 {
390   return SIMIX_process_self()->get_name();
391 }
392
393 const char* get_cname()
394 {
395   return SIMIX_process_self()->get_cname();
396 }
397
398 Host* get_host()
399 {
400   return SIMIX_process_self()->get_host();
401 }
402
403 void suspend()
404 {
405   kernel::actor::ActorImpl* self = SIMIX_process_self();
406   s4u::Actor::on_suspend(*self->ciface());
407   kernel::actor::simcall_blocking<void>([self] { self->suspend(); });
408 }
409
410 void resume()
411 {
412   kernel::actor::ActorImpl* self = SIMIX_process_self();
413   kernel::actor::simcall([self] { self->resume(); });
414   Actor::on_resume(*self->ciface());
415 }
416
417 void exit()
418 {
419   kernel::actor::ActorImpl* self = SIMIX_process_self();
420   simgrid::kernel::actor::simcall([self] { self->exit(); });
421 }
422
423 void on_exit(const std::function<void(bool)>& fun)
424 {
425   SIMIX_process_self()->iface()->on_exit(fun);
426 }
427
428 /** @brief Moves the current actor to another host
429  *
430  * @see simgrid::s4u::Actor::migrate() for more information
431  */
432 void migrate(Host* new_host)
433 {
434   SIMIX_process_self()->iface()->migrate(new_host);
435 }
436
437 } // namespace this_actor
438 } // namespace s4u
439 } // namespace simgrid
440
441 /* **************************** Public C interface *************************** */
442
443 /** @ingroup m_actor_management
444  * @brief Returns the process ID of @a actor.
445  *
446  * This function checks whether @a actor is a valid pointer and return its PID (or 0 in case of problem).
447  */
448 aid_t sg_actor_get_PID(sg_actor_t actor)
449 {
450   /* Do not raise an exception here: this function is called by the logs
451    * and the exceptions, so it would be called back again and again */
452   if (actor == nullptr || actor->get_impl() == nullptr)
453     return 0;
454   return actor->get_pid();
455 }
456
457 /** @ingroup m_actor_management
458  * @brief Returns the process ID of the parent of @a actor.
459  *
460  * This function checks whether @a actor is a valid pointer and return its parent's PID.
461  * Returns -1 if the actor has not been created by any other actor.
462  */
463 aid_t sg_actor_get_PPID(sg_actor_t actor)
464 {
465   return actor->get_ppid();
466 }
467
468 /** @ingroup m_actor_management
469  *
470  * @brief Return a #sg_actor_t given its PID.
471  *
472  * This function search in the list of all the created sg_actor_t for a sg_actor_t  whose PID is equal to @a PID.
473  * If none is found, @c nullptr is returned.
474    Note that the PID are unique in the whole simulation, not only on a given host.
475  */
476 sg_actor_t sg_actor_by_PID(aid_t pid)
477 {
478   return simgrid::s4u::Actor::by_pid(pid).get();
479 }
480
481 /** @ingroup m_actor_management
482  * @brief Return the name of an actor.
483  */
484 const char* sg_actor_get_name(sg_actor_t actor)
485 {
486   return actor->get_cname();
487 }
488
489 sg_host_t sg_actor_get_host(sg_actor_t actor)
490 {
491   return actor->get_host();
492 }
493
494 /** @ingroup m_actor_management
495  * @brief Returns the value of a given actor property
496  *
497  * @param actor an actor
498  * @param name a property name
499  * @return value of a property (or nullptr if the property is not set)
500  */
501 const char* sg_actor_get_property_value(sg_actor_t actor, const char* name)
502 {
503   return actor->get_property(name);
504 }
505
506 /** @ingroup m_actor_management
507  * @brief Return the list of properties
508  *
509  * This function returns all the parameters associated with an actor
510  */
511 xbt_dict_t sg_actor_get_properties(sg_actor_t actor)
512 {
513   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
514   xbt_dict_t as_dict                        = xbt_dict_new_homogeneous(xbt_free_f);
515   const std::unordered_map<std::string, std::string>* props = actor->get_properties();
516   if (props == nullptr)
517     return nullptr;
518   for (auto const& kv : *props) {
519     xbt_dict_set(as_dict, kv.first.c_str(), xbt_strdup(kv.second.c_str()), nullptr);
520   }
521   return as_dict;
522 }
523
524 /** @ingroup m_actor_management
525  * @brief Suspend the actor.
526  *
527  * This function suspends the actor by suspending the task on which it was waiting for the completion.
528  */
529 void sg_actor_suspend(sg_actor_t actor)
530 {
531   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
532   actor->suspend();
533 }
534
535 /** @ingroup m_actor_management
536  * @brief Resume a suspended actor.
537  *
538  * This function resumes a suspended actor by resuming the task on which it was waiting for the completion.
539  */
540 void sg_actor_resume(sg_actor_t actor)
541 {
542   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
543   actor->resume();
544 }
545
546 /** @ingroup m_actor_management
547  * @brief Returns true if the actor is suspended .
548  *
549  * This checks whether an actor is suspended or not by inspecting the task on which it was waiting for the completion.
550  */
551 int sg_actor_is_suspended(sg_actor_t actor)
552 {
553   return actor->is_suspended();
554 }
555
556 /**
557  * @ingroup m_actor_management
558  * @brief Restarts an actor from the beginning.
559  */
560 sg_actor_t sg_actor_restart(sg_actor_t actor)
561 {
562   return actor->restart();
563 }
564
565 /**
566  * @ingroup m_actor_management
567  * @brief Sets the "auto-restart" flag of the actor.
568  * If the flag is set to 1, the actor will be automatically restarted when its host comes back up.
569  */
570 void sg_actor_set_auto_restart(sg_actor_t actor, int auto_restart)
571 {
572   actor->set_auto_restart(auto_restart);
573 }
574
575 /** @ingroup m_actor_management
576  * @brief This actor will be terminated automatically when the last non-daemon actor finishes
577  */
578 void sg_actor_daemonize(sg_actor_t actor)
579 {
580   actor->daemonize();
581 }
582
583 /** @ingroup m_actor_management
584  * @brief Migrates an actor to another location.
585  *
586  * This function changes the value of the #sg_host_t on  which @a actor is running.
587  */
588 void sg_actor_migrate(sg_actor_t process, sg_host_t host)
589 {
590   process->migrate(host);
591 }
592
593 /** @ingroup m_actor_management
594  * @brief Wait for the completion of a #sg_actor_t.
595  *
596  * @param actor the actor to wait for
597  * @param timeout wait until the actor is over, or the timeout expires
598  */
599 void sg_actor_join(sg_actor_t actor, double timeout)
600 {
601   actor->join(timeout);
602 }
603
604 void sg_actor_kill(sg_actor_t actor)
605 {
606   actor->kill();
607 }
608
609 void sg_actor_kill_all()
610 {
611   simgrid::s4u::Actor::kill_all();
612 }
613
614 /** @ingroup m_actor_management
615  * @brief Set the kill time of an actor.
616  *
617  * @param actor an actor
618  * @param kill_time the time when the actor is killed.
619  */
620 void sg_actor_set_kill_time(sg_actor_t actor, double kill_time)
621 {
622   actor->set_kill_time(kill_time);
623 }
624
625 /** Yield the current actor; let the other actors execute first */
626 void sg_actor_yield()
627 {
628   simgrid::s4u::this_actor::yield();
629 }
630
631 void sg_actor_sleep_for(double duration)
632 {
633   simgrid::s4u::this_actor::sleep_for(duration);
634 }
635
636 sg_actor_t sg_actor_attach(const char* name, void* data, sg_host_t host, xbt_dict_t properties)
637 {
638   xbt_assert(host != nullptr, "Invalid parameters: host and code params must not be nullptr");
639   std::unordered_map<std::string, std::string> props;
640   xbt_dict_cursor_t cursor = nullptr;
641   char* key;
642   char* value;
643   xbt_dict_foreach (properties, cursor, key, value)
644     props[key] = value;
645   xbt_dict_free(&properties);
646
647   /* Let's create the process: SIMIX may decide to start it right now, even before returning the flow control to us */
648   smx_actor_t actor = nullptr;
649   try {
650     actor = simgrid::kernel::actor::ActorImpl::attach(name, data, host, &props).get();
651   } catch (simgrid::HostFailureException const&) {
652     xbt_die("Could not attach");
653   }
654
655   simgrid::s4u::this_actor::yield();
656   return actor->ciface();
657 }
658
659 void sg_actor_detach()
660 {
661   simgrid::kernel::actor::ActorImpl::detach();
662 }
663
664 aid_t sg_actor_self_get_pid()
665 {
666   return simgrid::s4u::this_actor::get_pid();
667 }
668
669 aid_t sg_actor_self_get_ppid()
670 {
671   return simgrid::s4u::this_actor::get_ppid();
672 }
673
674 const char* sg_actor_self_get_name()
675 {
676   return simgrid::s4u::this_actor::get_cname();
677 }
678
679 sg_actor_t sg_actor_self()
680 {
681   return simgrid::s4u::Actor::self();
682 }
683
684 void sg_actor_self_execute(double flops)
685 {
686   simgrid::s4u::this_actor::execute(flops);
687 }
688
689 /** @brief Take an extra reference on that actor to prevent it to be garbage-collected */
690 void sg_actor_ref(sg_actor_t actor)
691 {
692   intrusive_ptr_add_ref(actor);
693 }
694 /** @brief Release a reference on that actor so that it can get be garbage-collected */
695 void sg_actor_unref(sg_actor_t actor)
696 {
697   intrusive_ptr_release(actor);
698 }
699
700 /** @brief Return the user data of a #sg_actor_t */
701 void* sg_actor_data(sg_actor_t actor)
702 {
703   return actor->get_data();
704 }
705 /** @brief Set the user data of a #sg_actor_t */
706 void sg_actor_data_set(sg_actor_t actor, void* userdata)
707 {
708   actor->set_data(userdata);
709 }