Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
5732d13a6a1d2295ba91f2e66d5464a4e3659723
[simgrid.git] / src / s4u / s4u_Actor.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/Exception.hpp"
7 #include "simgrid/actor.h"
8 #include "simgrid/s4u/Actor.hpp"
9 #include "simgrid/s4u/Exec.hpp"
10 #include "simgrid/s4u/Host.hpp"
11 #include "simgrid/s4u/VirtualMachine.hpp"
12 #include "src/kernel/activity/ExecImpl.hpp"
13 #include "src/simix/smx_private.hpp"
14 #include "src/surf/HostImpl.hpp"
15
16 #include <algorithm>
17 #include <sstream>
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors");
20
21 namespace simgrid {
22 namespace s4u {
23
24 xbt::signal<void(Actor&)> s4u::Actor::on_creation;
25 xbt::signal<void(Actor const&)> s4u::Actor::on_suspend;
26 xbt::signal<void(Actor const&)> s4u::Actor::on_resume;
27 xbt::signal<void(Actor const&)> s4u::Actor::on_sleep;
28 xbt::signal<void(Actor const&)> s4u::Actor::on_wake_up;
29 xbt::signal<void(Actor const&)> s4u::Actor::on_migration_start;
30 xbt::signal<void(Actor const&)> s4u::Actor::on_migration_end;
31 xbt::signal<void(Actor const&)> s4u::Actor::on_termination;
32 xbt::signal<void(Actor const&)> s4u::Actor::on_destruction;
33
34 // ***** Actor creation *****
35 Actor* Actor::self()
36 {
37   kernel::context::Context* self_context = kernel::context::Context::self();
38   if (self_context == nullptr)
39     return nullptr;
40
41   return self_context->get_actor()->ciface();
42 }
43 ActorPtr Actor::init(const std::string& name, s4u::Host* host)
44 {
45   smx_actor_t self = SIMIX_process_self();
46   kernel::actor::ActorImpl* actor = simix::simcall([self, &name, host] { return self->init(name, host).get(); });
47   return actor->iface();
48 }
49
50 ActorPtr Actor::start(const std::function<void()>& code)
51 {
52   simgrid::simix::simcall([this, &code] { pimpl_->start(code); });
53   return this;
54 }
55
56 ActorPtr Actor::create(const std::string& name, s4u::Host* host, const std::function<void()>& code)
57 {
58   smx_actor_t self = SIMIX_process_self();
59   kernel::actor::ActorImpl* actor =
60       simix::simcall([self, &name, host, &code] { return self->init(name, host)->start(code); });
61
62   return actor->iface();
63 }
64
65 ActorPtr Actor::create(const std::string& name, s4u::Host* host, const std::string& function,
66                        std::vector<std::string> args)
67 {
68   simix::ActorCodeFactory& factory = SIMIX_get_actor_code_factory(function);
69   return create(name, host, factory(std::move(args)));
70 }
71
72 void intrusive_ptr_add_ref(Actor* actor)
73 {
74   intrusive_ptr_add_ref(actor->pimpl_);
75 }
76 void intrusive_ptr_release(Actor* actor)
77 {
78   intrusive_ptr_release(actor->pimpl_);
79 }
80 int Actor::get_refcount()
81 {
82   return pimpl_->get_refcount();
83 }
84
85 // ***** Actor methods *****
86
87 void Actor::join()
88 {
89   join(-1);
90 }
91
92 void Actor::join(double timeout)
93 {
94   auto issuer = SIMIX_process_self();
95   auto target = pimpl_;
96   simix::simcall_blocking([issuer, target, timeout] {
97     if (target->finished_) {
98       // The joined process is already finished, just wake up the issuer right away
99       issuer->simcall_answer();
100     } else {
101       smx_activity_t sync = issuer->join(target, timeout);
102       sync->simcalls_.push_back(&issuer->simcall);
103       issuer->waiting_synchro = sync;
104     }
105   });
106 }
107
108 void Actor::set_auto_restart(bool autorestart)
109 {
110   simix::simcall([this, autorestart]() {
111     xbt_assert(autorestart && not pimpl_->has_to_auto_restart()); // FIXME: handle all cases
112     pimpl_->set_auto_restart(autorestart);
113
114     kernel::actor::ProcessArg* arg = new kernel::actor::ProcessArg(pimpl_->get_host(), pimpl_);
115     XBT_DEBUG("Adding Process %s to the actors_at_boot_ list of Host %s", arg->name.c_str(), arg->host->get_cname());
116     pimpl_->get_host()->pimpl_->actors_at_boot_.emplace_back(arg);
117   });
118 }
119
120 void Actor::on_exit(const std::function<void(int, void*)>& fun, void* data) /* deprecated */
121 {
122   on_exit([fun, data](bool failed) { fun(failed ? SMX_EXIT_FAILURE : SMX_EXIT_SUCCESS, data); });
123 }
124
125 void Actor::on_exit(const std::function<void(bool /*failed*/)>& fun) const
126 {
127   simix::simcall([this, &fun] { SIMIX_process_on_exit(pimpl_, fun); });
128 }
129
130 void Actor::migrate(Host* new_host)
131 {
132   s4u::Actor::on_migration_start(*this);
133
134   simix::simcall([this, new_host]() {
135     if (pimpl_->waiting_synchro != nullptr) {
136       // The actor is blocked on an activity. If it's an exec, migrate it too.
137       // FIXME: implement the migration of other kind of activities
138       kernel::activity::ExecImplPtr exec =
139           boost::dynamic_pointer_cast<kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
140       xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions.");
141       exec->migrate(new_host);
142     }
143     this->pimpl_->set_host(new_host);
144   });
145
146   s4u::Actor::on_migration_end(*this);
147 }
148
149 s4u::Host* Actor::get_host() const
150 {
151   return this->pimpl_->get_host();
152 }
153
154 void Actor::daemonize()
155 {
156   simix::simcall([this]() { pimpl_->daemonize(); });
157 }
158
159 bool Actor::is_daemon() const
160 {
161   return this->pimpl_->is_daemon();
162 }
163
164 const simgrid::xbt::string& Actor::get_name() const
165 {
166   return this->pimpl_->get_name();
167 }
168
169 const char* Actor::get_cname() const
170 {
171   return this->pimpl_->get_cname();
172 }
173
174 aid_t Actor::get_pid() const
175 {
176   return this->pimpl_->get_pid();
177 }
178
179 aid_t Actor::get_ppid() const
180 {
181   return this->pimpl_->get_ppid();
182 }
183
184 void Actor::suspend()
185 {
186   s4u::Actor::on_suspend(*this);
187   simcall_process_suspend(pimpl_);
188 }
189
190 void Actor::resume()
191 {
192   simix::simcall([this] { pimpl_->resume(); });
193   s4u::Actor::on_resume(*this);
194 }
195
196 bool Actor::is_suspended()
197 {
198   return simix::simcall([this] { return pimpl_->is_suspended(); });
199 }
200
201 void Actor::set_kill_time(double kill_time)
202 {
203   simix::simcall([this, kill_time] { pimpl_->set_kill_time(kill_time); });
204 }
205
206 /** @brief Get the kill time of an actor(or 0 if unset). */
207 double Actor::get_kill_time()
208 {
209   return pimpl_->get_kill_time();
210 }
211
212 void Actor::kill(aid_t pid) // deprecated
213 {
214   kernel::actor::ActorImpl* killer = SIMIX_process_self();
215   kernel::actor::ActorImpl* victim = SIMIX_process_from_PID(pid);
216   if (victim != nullptr) {
217     simix::simcall([killer, victim] { killer->kill(victim); });
218   } else {
219     std::ostringstream oss;
220     oss << "kill: (" << pid << ") - No such actor" << std::endl;
221     throw std::runtime_error(oss.str());
222   }
223 }
224
225 void Actor::kill()
226 {
227   kernel::actor::ActorImpl* process = SIMIX_process_self();
228   simix::simcall([this, process] {
229     xbt_assert(pimpl_ != simix_global->maestro_process, "Killing maestro is a rather bad idea");
230     process->kill(pimpl_);
231   });
232 }
233
234 // ***** Static functions *****
235
236 ActorPtr Actor::by_pid(aid_t pid)
237 {
238   kernel::actor::ActorImpl* process = SIMIX_process_from_PID(pid);
239   if (process != nullptr)
240     return process->iface();
241   else
242     return ActorPtr();
243 }
244
245 void Actor::kill_all()
246 {
247   kernel::actor::ActorImpl* self = SIMIX_process_self();
248   simix::simcall([self] { self->kill_all(); });
249 }
250
251 const std::unordered_map<std::string, std::string>* Actor::get_properties() const
252 {
253   return pimpl_->get_properties();
254 }
255
256 /** Retrieve the property value (or nullptr if not set) */
257 const char* Actor::get_property(const std::string& key) const
258 {
259   return pimpl_->get_property(key);
260 }
261
262 void Actor::set_property(const std::string& key, const std::string& value)
263 {
264   simix::simcall([this, &key, &value] { pimpl_->set_property(key, value); });
265 }
266
267 Actor* Actor::restart()
268 {
269   return simix::simcall([this]() { return pimpl_->restart(); });
270 }
271
272 // ***** this_actor *****
273
274 namespace this_actor {
275
276 /** Returns true if run from the kernel mode, and false if run from a real actor
277  *
278  * Everything that is run out of any actor (simulation setup before the engine is run,
279  * computing the model evolutions as a result to the actors' action, etc) is run in
280  * kernel mode, just as in any operating systems.
281  *
282  * In SimGrid, the actor in charge of doing the stuff in kernel mode is called Maestro,
283  * because it is the one scheduling when the others should move or wait.
284  */
285 bool is_maestro()
286 {
287   kernel::actor::ActorImpl* process = SIMIX_process_self();
288   return process == nullptr || process == simix_global->maestro_process;
289 }
290
291 void sleep_for(double duration)
292 {
293   if (duration > 0) {
294     kernel::actor::ActorImpl* actor = SIMIX_process_self();
295     Actor::on_sleep(*actor->ciface());
296
297     simcall_process_sleep(duration);
298
299     Actor::on_wake_up(*actor->ciface());
300   }
301 }
302
303 void yield()
304 {
305   simix::simcall([] { /* do nothing*/ });
306 }
307
308 XBT_PUBLIC void sleep_until(double wakeup_time)
309 {
310   double now = SIMIX_get_clock();
311   if (wakeup_time > now)
312     sleep_for(wakeup_time - now);
313 }
314
315 void execute(double flops)
316 {
317   execute(flops, 1.0 /* priority */);
318 }
319
320 void execute(double flops, double priority)
321 {
322   exec_init(flops)->set_priority(priority)->start()->wait();
323 }
324
325 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
326                       const std::vector<double>& bytes_amounts)
327 {
328   parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
329 }
330
331 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
332                       const std::vector<double>& bytes_amounts, double timeout)
333 {
334   xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
335   xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
336              "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
337   xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
338              "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
339              hosts.size(), hosts.size(), flops_amounts.size());
340   /* Check that we are not mixing VMs and PMs in the parallel task */
341   bool is_a_vm = (nullptr != dynamic_cast<VirtualMachine*>(hosts.front()));
342   xbt_assert(std::all_of(hosts.begin(), hosts.end(),
343                          [is_a_vm](s4u::Host* elm) {
344                            bool tmp_is_a_vm = (nullptr != dynamic_cast<VirtualMachine*>(elm));
345                            return is_a_vm == tmp_is_a_vm;
346                          }),
347              "parallel_execute: mixing VMs and PMs is not supported (yet).");
348   /* checking for infinite values */
349   xbt_assert(std::all_of(flops_amounts.begin(), flops_amounts.end(), [](double elm) { return std::isfinite(elm); }),
350              "flops_amounts comprises infinite values!");
351   xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
352              "flops_amounts comprises infinite values!");
353
354   exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
355 }
356
357 // deprecated
358 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount,
359                       double timeout)
360 {
361   smx_activity_t s =
362       simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout);
363   simcall_execution_wait(s);
364   delete[] flops_amount;
365   delete[] bytes_amount;
366 }
367
368 // deprecated
369 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount)
370 {
371   smx_activity_t s = simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount,
372                                                       /* rate */ -1, /*timeout*/ -1);
373   simcall_execution_wait(s);
374   delete[] flops_amount;
375   delete[] bytes_amount;
376 }
377
378 ExecPtr exec_init(double flops_amount)
379 {
380   return ExecPtr(new ExecSeq(get_host(), flops_amount));
381 }
382
383 ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
384                   const std::vector<double>& bytes_amounts)
385 {
386   return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts));
387 }
388
389 ExecPtr exec_async(double flops)
390 {
391   ExecPtr res = exec_init(flops);
392   res->start();
393   return res;
394 }
395
396 aid_t get_pid()
397 {
398   return SIMIX_process_self()->get_pid();
399 }
400
401 aid_t get_ppid()
402 {
403   return SIMIX_process_self()->get_ppid();
404 }
405
406 std::string get_name()
407 {
408   return SIMIX_process_self()->get_name();
409 }
410
411 const char* get_cname()
412 {
413   return SIMIX_process_self()->get_cname();
414 }
415
416 Host* get_host()
417 {
418   return SIMIX_process_self()->get_host();
419 }
420
421 void suspend()
422 {
423   kernel::actor::ActorImpl* actor = SIMIX_process_self();
424   Actor::on_suspend(*actor->ciface());
425
426   simcall_process_suspend(actor);
427 }
428
429 void resume()
430 {
431   kernel::actor::ActorImpl* self = SIMIX_process_self();
432   simix::simcall([self] { self->resume(); });
433   Actor::on_resume(*self->ciface());
434 }
435
436 void exit()
437 {
438   kernel::actor::ActorImpl* self = SIMIX_process_self();
439   simgrid::simix::simcall([self] { self->exit(); });
440 }
441
442 void on_exit(const std::function<void(bool)>& fun)
443 {
444   SIMIX_process_self()->iface()->on_exit(fun);
445 }
446
447 void on_exit(const std::function<void(int, void*)>& fun, void* data) /* deprecated */
448 {
449   SIMIX_process_self()->iface()->on_exit([fun, data](bool exit) { fun(exit, data); });
450 }
451
452 /** @brief Moves the current actor to another host
453  *
454  * @see simgrid::s4u::Actor::migrate() for more information
455  */
456 void migrate(Host* new_host)
457 {
458   SIMIX_process_self()->iface()->migrate(new_host);
459 }
460
461 } // namespace this_actor
462 } // namespace s4u
463 } // namespace simgrid
464
465 /* **************************** Public C interface *************************** */
466
467 /** @ingroup m_actor_management
468  * @brief Returns the process ID of @a actor.
469  *
470  * This function checks whether @a actor is a valid pointer and return its PID (or 0 in case of problem).
471  */
472 aid_t sg_actor_get_PID(sg_actor_t actor)
473 {
474   /* Do not raise an exception here: this function is called by the logs
475    * and the exceptions, so it would be called back again and again */
476   if (actor == nullptr || actor->get_impl() == nullptr)
477     return 0;
478   return actor->get_pid();
479 }
480
481 /** @ingroup m_actor_management
482  * @brief Returns the process ID of the parent of @a actor.
483  *
484  * This function checks whether @a actor is a valid pointer and return its parent's PID.
485  * Returns -1 if the actor has not been created by any other actor.
486  */
487 aid_t sg_actor_get_PPID(sg_actor_t actor)
488 {
489   return actor->get_ppid();
490 }
491
492 /** @ingroup m_actor_management
493  *
494  * @brief Return a #sg_actor_t given its PID.
495  *
496  * This function search in the list of all the created sg_actor_t for a sg_actor_t  whose PID is equal to @a PID.
497  * If none is found, @c nullptr is returned.
498    Note that the PID are unique in the whole simulation, not only on a given host.
499  */
500 sg_actor_t sg_actor_by_PID(aid_t pid)
501 {
502   return simgrid::s4u::Actor::by_pid(pid).get();
503 }
504
505 /** @ingroup m_actor_management
506  * @brief Return the name of an actor.
507  */
508 const char* sg_actor_get_name(sg_actor_t actor)
509 {
510   return actor->get_cname();
511 }
512
513 sg_host_t sg_actor_get_host(sg_actor_t actor)
514 {
515   return actor->get_host();
516 }
517
518 /** @ingroup m_actor_management
519  * @brief Returns the value of a given actor property
520  *
521  * @param actor an actor
522  * @param name a property name
523  * @return value of a property (or nullptr if the property is not set)
524  */
525 const char* sg_actor_get_property_value(sg_actor_t actor, const char* name)
526 {
527   return actor->get_property(name);
528 }
529
530 /** @ingroup m_actor_management
531  * @brief Return the list of properties
532  *
533  * This function returns all the parameters associated with an actor
534  */
535 xbt_dict_t sg_actor_get_properties(sg_actor_t actor)
536 {
537   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
538   xbt_dict_t as_dict                        = xbt_dict_new_homogeneous(xbt_free_f);
539   const std::unordered_map<std::string, std::string>* props = actor->get_properties();
540   if (props == nullptr)
541     return nullptr;
542   for (auto const& kv : *props) {
543     xbt_dict_set(as_dict, kv.first.c_str(), xbt_strdup(kv.second.c_str()), nullptr);
544   }
545   return as_dict;
546 }
547
548 /** @ingroup m_actor_management
549  * @brief Suspend the actor.
550  *
551  * This function suspends the actor by suspending the task on which it was waiting for the completion.
552  */
553 void sg_actor_suspend(sg_actor_t actor)
554 {
555   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
556   actor->suspend();
557 }
558
559 /** @ingroup m_actor_management
560  * @brief Resume a suspended actor.
561  *
562  * This function resumes a suspended actor by resuming the task on which it was waiting for the completion.
563  */
564 void sg_actor_resume(sg_actor_t actor)
565 {
566   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
567   actor->resume();
568 }
569
570 /** @ingroup m_actor_management
571  * @brief Returns true if the actor is suspended .
572  *
573  * This checks whether an actor is suspended or not by inspecting the task on which it was waiting for the completion.
574  */
575 int sg_actor_is_suspended(sg_actor_t actor)
576 {
577   return actor->is_suspended();
578 }
579
580 /**
581  * @ingroup m_actor_management
582  * @brief Restarts an actor from the beginning.
583  */
584 sg_actor_t sg_actor_restart(sg_actor_t actor)
585 {
586   return actor->restart();
587 }
588
589 /**
590  * @ingroup m_actor_management
591  * @brief Sets the "auto-restart" flag of the actor.
592  * If the flag is set to 1, the actor will be automatically restarted when its host comes back up.
593  */
594 void sg_actor_set_auto_restart(sg_actor_t actor, int auto_restart)
595 {
596   actor->set_auto_restart(auto_restart);
597 }
598
599 /** @ingroup m_actor_management
600  * @brief This actor will be terminated automatically when the last non-daemon actor finishes
601  */
602 void sg_actor_daemonize(sg_actor_t actor)
603 {
604   actor->daemonize();
605 }
606
607 /** @ingroup m_actor_management
608  * @brief Migrates an actor to another location.
609  *
610  * This function changes the value of the #sg_host_t on  which @a actor is running.
611  */
612 void sg_actor_migrate(sg_actor_t process, sg_host_t host)
613 {
614   process->migrate(host);
615 }
616
617 /** @ingroup m_actor_management
618  * @brief Wait for the completion of a #sg_actor_t.
619  *
620  * @param actor the actor to wait for
621  * @param timeout wait until the actor is over, or the timeout expires
622  */
623 void sg_actor_join(sg_actor_t actor, double timeout)
624 {
625   actor->join(timeout);
626 }
627
628 void sg_actor_kill(sg_actor_t actor)
629 {
630   actor->kill();
631 }
632
633 void sg_actor_kill_all()
634 {
635   simgrid::s4u::Actor::kill_all();
636 }
637
638 /** @ingroup m_actor_management
639  * @brief Set the kill time of an actor.
640  *
641  * @param actor an actor
642  * @param kill_time the time when the actor is killed.
643  */
644 void sg_actor_set_kill_time(sg_actor_t actor, double kill_time)
645 {
646   actor->set_kill_time(kill_time);
647 }
648
649 /** Yield the current actor; let the other actors execute first */
650 void sg_actor_yield()
651 {
652   simgrid::s4u::this_actor::yield();
653 }
654
655 void sg_actor_sleep_for(double duration)
656 {
657   simgrid::s4u::this_actor::sleep_for(duration);
658 }
659
660 sg_actor_t sg_actor_attach(const char* name, void* data, sg_host_t host, xbt_dict_t properties)
661 {
662   xbt_assert(host != nullptr, "Invalid parameters: host and code params must not be nullptr");
663   std::unordered_map<std::string, std::string> props;
664   xbt_dict_cursor_t cursor = nullptr;
665   char* key;
666   char* value;
667   xbt_dict_foreach (properties, cursor, key, value)
668     props[key] = value;
669   xbt_dict_free(&properties);
670
671   /* Let's create the process: SIMIX may decide to start it right now, even before returning the flow control to us */
672   smx_actor_t actor = nullptr;
673   try {
674     actor = simgrid::kernel::actor::ActorImpl::attach(name, data, host, &props).get();
675   } catch (simgrid::HostFailureException const&) {
676     xbt_die("Could not attach");
677   }
678
679   simgrid::s4u::this_actor::yield();
680   return actor->ciface();
681 }
682
683 void sg_actor_detach()
684 {
685   simgrid::kernel::actor::ActorImpl::detach();
686 }
687
688 aid_t sg_actor_self_get_pid()
689 {
690   return simgrid::s4u::this_actor::get_pid();
691 }
692
693 aid_t sg_actor_self_get_ppid()
694 {
695   return simgrid::s4u::this_actor::get_ppid();
696 }
697
698 const char* sg_actor_self_get_name()
699 {
700   return simgrid::s4u::this_actor::get_cname();
701 }
702
703 sg_actor_t sg_actor_self()
704 {
705   return simgrid::s4u::Actor::self();
706 }
707
708 void sg_actor_self_execute(double flops)
709 {
710   simgrid::s4u::this_actor::execute(flops);
711 }
712
713 /** @brief Take an extra reference on that actor to prevent it to be garbage-collected */
714 void sg_actor_ref(sg_actor_t actor)
715 {
716   intrusive_ptr_add_ref(actor);
717 }
718 /** @brief Release a reference on that actor so that it can get be garbage-collected */
719 void sg_actor_unref(sg_actor_t actor)
720 {
721   intrusive_ptr_release(actor);
722 }