Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
remove or deprecate now useless code
[simgrid.git] / src / s4u / s4u_Actor.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/Exception.hpp"
7 #include "simgrid/actor.h"
8 #include "simgrid/s4u/Actor.hpp"
9 #include "simgrid/s4u/Exec.hpp"
10 #include "simgrid/s4u/Host.hpp"
11 #include "simgrid/s4u/VirtualMachine.hpp"
12 #include "src/kernel/activity/ExecImpl.hpp"
13 #include "src/simix/smx_private.hpp"
14 #include "src/surf/HostImpl.hpp"
15
16 #include <algorithm>
17 #include <sstream>
18
19 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors");
20
21 namespace simgrid {
22 namespace s4u {
23
24 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_creation;
25 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_suspend;
26 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_resume;
27 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_sleep;
28 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_wake_up;
29 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_migration_start;
30 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_migration_end;
31 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_destruction;
32
33 // ***** Actor creation *****
34 ActorPtr Actor::self()
35 {
36   smx_context_t self_context = simgrid::kernel::context::Context::self();
37   if (self_context == nullptr)
38     return simgrid::s4u::ActorPtr();
39
40   return self_context->get_actor()->iface();
41 }
42
43 ActorPtr Actor::create(std::string name, s4u::Host* host, std::function<void()> code)
44 {
45   simgrid::kernel::actor::ActorImpl* actor =
46       simcall_process_create(std::move(name), std::move(code), nullptr, host, nullptr);
47   return actor->iface();
48 }
49
50 ActorPtr Actor::create(std::string name, s4u::Host* host, const std::string& function, std::vector<std::string> args)
51 {
52   simgrid::simix::ActorCodeFactory& factory = SIMIX_get_actor_code_factory(function);
53   return create(std::move(name), host, factory(std::move(args)));
54 }
55
56 void intrusive_ptr_add_ref(Actor* actor)
57 {
58   intrusive_ptr_add_ref(actor->pimpl_);
59 }
60 void intrusive_ptr_release(Actor* actor)
61 {
62   intrusive_ptr_release(actor->pimpl_);
63 }
64
65 // ***** Actor methods *****
66
67 void Actor::join()
68 {
69   simcall_process_join(this->pimpl_, -1);
70 }
71
72 void Actor::join(double timeout)
73 {
74   simcall_process_join(this->pimpl_, timeout);
75 }
76
77 void Actor::set_auto_restart(bool autorestart)
78 {
79   simgrid::simix::simcall([this, autorestart]() {
80     xbt_assert(autorestart && not pimpl_->has_to_auto_restart()); // FIXME: handle all cases
81     pimpl_->set_auto_restart(autorestart);
82
83     simgrid::kernel::actor::ProcessArg* arg = new simgrid::kernel::actor::ProcessArg(pimpl_->get_host(), pimpl_);
84     XBT_DEBUG("Adding Process %s to the actors_at_boot_ list of Host %s", arg->name.c_str(), arg->host->get_cname());
85     pimpl_->get_host()->pimpl_->actors_at_boot_.emplace_back(arg);
86   });
87 }
88
89 void Actor::on_exit(int_f_pvoid_pvoid_t fun,
90                     void* data) /* deprecated: cleanup SIMIX_process_on_exit: change prototype of second parameter and
91                                    remove the last one */
92 {
93   simgrid::simix::simcall([this, fun, data] { SIMIX_process_on_exit(pimpl_, fun, data); });
94 }
95
96 void Actor::on_exit(std::function<void(bool /*failed*/)> const fun)
97 {
98   simgrid::simix::simcall(
99       [this, fun] { SIMIX_process_on_exit(pimpl_, [fun](int a, void* /*data*/) { fun(a != 0); }, nullptr); });
100 }
101
102 void Actor::migrate(Host* new_host)
103 {
104   s4u::Actor::on_migration_start(this);
105
106   simgrid::simix::simcall([this, new_host]() {
107     if (pimpl_->waiting_synchro != nullptr) {
108       // The actor is blocked on an activity. If it's an exec, migrate it too.
109       // FIXME: implement the migration of other kind of activities
110       simgrid::kernel::activity::ExecImplPtr exec =
111           boost::dynamic_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
112       xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions.");
113       exec->migrate(new_host);
114     }
115     this->pimpl_->set_host(new_host);
116   });
117
118   s4u::Actor::on_migration_end(this);
119 }
120
121 s4u::Host* Actor::get_host()
122 {
123   return this->pimpl_->get_host();
124 }
125
126 void Actor::daemonize()
127 {
128   simgrid::simix::simcall([this]() { pimpl_->daemonize(); });
129 }
130
131 bool Actor::is_daemon() const
132 {
133   return this->pimpl_->is_daemon();
134 }
135
136 const simgrid::xbt::string& Actor::get_name() const
137 {
138   return this->pimpl_->get_name();
139 }
140
141 const char* Actor::get_cname() const
142 {
143   return this->pimpl_->get_cname();
144 }
145
146 aid_t Actor::get_pid() const
147 {
148   return this->pimpl_->get_pid();
149 }
150
151 aid_t Actor::get_ppid() const
152 {
153   return this->pimpl_->get_ppid();
154 }
155
156 void Actor::suspend()
157 {
158   s4u::Actor::on_suspend(this);
159   simcall_process_suspend(pimpl_);
160 }
161
162 void Actor::resume()
163 {
164   simgrid::simix::simcall([this] { pimpl_->resume(); });
165   s4u::Actor::on_resume(this);
166 }
167
168 bool Actor::is_suspended()
169 {
170   return simgrid::simix::simcall([this] { return pimpl_->is_suspended(); });
171 }
172
173 void Actor::set_kill_time(double kill_time)
174 {
175   simgrid::simix::simcall([this, kill_time] { pimpl_->set_kill_time(kill_time); });
176 }
177
178 /** @brief Get the kill time of an actor(or 0 if unset). */
179 double Actor::get_kill_time()
180 {
181   return pimpl_->get_kill_time();
182 }
183
184 void Actor::kill(aid_t pid) // deprecated
185 {
186   smx_actor_t killer  = SIMIX_process_self();
187   smx_actor_t victim  = SIMIX_process_from_PID(pid);
188   if (victim != nullptr) {
189     simgrid::simix::simcall([killer, victim] { killer->kill(victim); });
190   } else {
191     std::ostringstream oss;
192     oss << "kill: (" << pid << ") - No such actor" << std::endl;
193     throw std::runtime_error(oss.str());
194   }
195 }
196
197 void Actor::kill()
198 {
199   smx_actor_t process = SIMIX_process_self();
200   simgrid::simix::simcall([this, process] {
201     if (pimpl_ == simix_global->maestro_process)
202       pimpl_->exit();
203     else
204       process->kill(pimpl_);
205   });
206 }
207
208 smx_actor_t Actor::get_impl()
209 {
210   return pimpl_;
211 }
212
213 // ***** Static functions *****
214
215 ActorPtr Actor::by_pid(aid_t pid)
216 {
217   smx_actor_t process = SIMIX_process_from_PID(pid);
218   if (process != nullptr)
219     return process->iface();
220   else
221     return ActorPtr();
222 }
223
224 void Actor::kill_all()
225 {
226   smx_actor_t self = SIMIX_process_self();
227   simgrid::simix::simcall([self] { self->kill_all(); });
228 }
229
230 std::unordered_map<std::string, std::string>* Actor::get_properties()
231 {
232   return simgrid::simix::simcall([this] { return this->pimpl_->get_properties(); });
233 }
234
235 /** Retrieve the property value (or nullptr if not set) */
236 const char* Actor::get_property(const std::string& key)
237 {
238   return simgrid::simix::simcall([this, key] { return pimpl_->get_property(key); });
239 }
240
241 void Actor::set_property(const std::string& key, std::string value)
242 {
243   simgrid::simix::simcall([this, key, value] { pimpl_->set_property(key, std::move(value)); });
244 }
245
246 Actor* Actor::restart()
247 {
248   return simgrid::simix::simcall([this]() { return pimpl_->restart(); });
249 }
250
251 // ***** this_actor *****
252
253 namespace this_actor {
254
255 /** Returns true if run from the kernel mode, and false if run from a real actor
256  *
257  * Everything that is run out of any actor (simulation setup before the engine is run,
258  * computing the model evolutions as a result to the actors' action, etc) is run in
259  * kernel mode, just as in any operating systems.
260  *
261  * In SimGrid, the actor in charge of doing the stuff in kernel mode is called Maestro,
262  * because it is the one scheduling when the others should move or wait.
263  */
264 bool is_maestro()
265 {
266   smx_actor_t process = SIMIX_process_self();
267   return process == nullptr || process == simix_global->maestro_process;
268 }
269
270 void sleep_for(double duration)
271 {
272   if (duration > 0) {
273     smx_actor_t actor = SIMIX_process_self();
274     simgrid::s4u::Actor::on_sleep(actor->iface());
275
276     simcall_process_sleep(duration);
277
278     simgrid::s4u::Actor::on_wake_up(actor->iface());
279   }
280 }
281
282 void yield()
283 {
284   simgrid::simix::simcall([] { /* do nothing*/ });
285 }
286
287 XBT_PUBLIC void sleep_until(double timeout)
288 {
289   double now = SIMIX_get_clock();
290   if (timeout > now)
291     sleep_for(timeout - now);
292 }
293
294 void execute(double flops)
295 {
296   execute(flops, 1.0 /* priority */);
297 }
298
299 void execute(double flops, double priority)
300 {
301   exec_init(flops)->set_priority(priority)->start()->wait();
302 }
303
304 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
305                       const std::vector<double>& bytes_amounts)
306 {
307   parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
308 }
309
310 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
311                       const std::vector<double>& bytes_amounts, double timeout)
312 {
313   xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
314   xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
315              "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
316   xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
317              "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
318              hosts.size(), hosts.size(), flops_amounts.size());
319   /* Check that we are not mixing VMs and PMs in the parallel task */
320   bool is_a_vm = (nullptr != dynamic_cast<simgrid::s4u::VirtualMachine*>(hosts.front()));
321   xbt_assert(std::all_of(hosts.begin(), hosts.end(),
322                          [is_a_vm](s4u::Host* elm) {
323                            bool tmp_is_a_vm = (nullptr != dynamic_cast<simgrid::s4u::VirtualMachine*>(elm));
324                            return is_a_vm == tmp_is_a_vm;
325                          }),
326              "parallel_execute: mixing VMs and PMs is not supported (yet).");
327   /* checking for infinite values */
328   xbt_assert(std::all_of(flops_amounts.begin(), flops_amounts.end(), [](double elm) { return std::isfinite(elm); }),
329              "flops_amounts comprises infinite values!");
330   xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
331              "flops_amounts comprises infinite values!");
332
333   exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
334 }
335
336 // deprecated
337 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount,
338                       double timeout)
339 {
340   smx_activity_t s =
341       simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout);
342   simcall_execution_wait(s);
343   delete[] flops_amount;
344   delete[] bytes_amount;
345 }
346
347 // deprecated
348 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount)
349 {
350   smx_activity_t s = simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount,
351                                                       /* rate */ -1, /*timeout*/ -1);
352   simcall_execution_wait(s);
353   delete[] flops_amount;
354   delete[] bytes_amount;
355 }
356
357 ExecPtr exec_init(double flops_amount)
358 {
359   return ExecPtr(new ExecSeq(get_host(), flops_amount));
360 }
361
362 ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
363                   const std::vector<double>& bytes_amounts)
364 {
365   return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts));
366 }
367
368 ExecPtr exec_async(double flops)
369 {
370   ExecPtr res = exec_init(flops);
371   res->start();
372   return res;
373 }
374
375 aid_t get_pid()
376 {
377   return SIMIX_process_self()->get_pid();
378 }
379
380 aid_t get_ppid()
381 {
382   return SIMIX_process_self()->get_ppid();
383 }
384
385 std::string get_name()
386 {
387   return SIMIX_process_self()->get_name();
388 }
389
390 const char* get_cname()
391 {
392   return SIMIX_process_self()->get_cname();
393 }
394
395 Host* get_host()
396 {
397   return SIMIX_process_self()->get_host();
398 }
399
400 void suspend()
401 {
402   smx_actor_t actor = SIMIX_process_self();
403   simgrid::s4u::Actor::on_suspend(actor->iface());
404
405   simcall_process_suspend(actor);
406 }
407
408 void resume()
409 {
410   smx_actor_t process = SIMIX_process_self();
411   simgrid::simix::simcall([process] { process->resume(); });
412   simgrid::s4u::Actor::on_resume(process->iface());
413 }
414
415 void exit()
416 {
417   smx_actor_t actor = SIMIX_process_self();
418   simgrid::simix::simcall([actor] { actor->exit(); });
419 }
420
421 void on_exit(std::function<void(bool)> const fun)
422 {
423   SIMIX_process_self()->iface()->on_exit(fun);
424 }
425
426 void on_exit(std::function<void(int, void*)> const fun, void* data) /* deprecated */
427 {
428   SIMIX_process_self()->iface()->on_exit([fun, data](bool exit) { fun(exit, data); });
429 }
430
431 /** @brief Moves the current actor to another host
432  *
433  * @see simgrid::s4u::Actor::migrate() for more information
434  */
435 void migrate(Host* new_host)
436 {
437   SIMIX_process_self()->iface()->migrate(new_host);
438 }
439
440 std::string getName() /* deprecated */
441 {
442   return get_name();
443 }
444 const char* getCname() /* deprecated */
445 {
446   return get_cname();
447 }
448 bool isMaestro() /* deprecated */
449 {
450   return is_maestro();
451 }
452 aid_t getPid() /* deprecated */
453 {
454   return get_pid();
455 }
456 aid_t getPpid() /* deprecated */
457 {
458   return get_ppid();
459 }
460 Host* getHost() /* deprecated */
461 {
462   return get_host();
463 }
464 void on_exit(int_f_pvoid_pvoid_t fun, void* data) /* deprecated */
465 {
466   SIMIX_process_self()->iface()->on_exit([fun, data](int a) { fun((void*)(intptr_t)a, data); });
467 }
468 void onExit(int_f_pvoid_pvoid_t fun, void* data) /* deprecated */
469 {
470   on_exit([fun, data](int a) { fun((void*)(intptr_t)a, data); });
471 }
472 void kill() /* deprecated */
473 {
474   exit();
475 }
476
477 } // namespace this_actor
478 } // namespace s4u
479 } // namespace simgrid
480
481 /* **************************** Public C interface *************************** */
482
483 /** @ingroup m_actor_management
484  * @brief Returns the process ID of @a actor.
485  *
486  * This function checks whether @a actor is a valid pointer and return its PID (or 0 in case of problem).
487  */
488 aid_t sg_actor_get_PID(sg_actor_t actor)
489 {
490   /* Do not raise an exception here: this function is called by the logs
491    * and the exceptions, so it would be called back again and again */
492   if (actor == nullptr || actor->get_impl() == nullptr)
493     return 0;
494   return actor->get_pid();
495 }
496
497 /** @ingroup m_actor_management
498  * @brief Returns the process ID of the parent of @a actor.
499  *
500  * This function checks whether @a actor is a valid pointer and return its parent's PID.
501  * Returns -1 if the actor has not been created by any other actor.
502  */
503 aid_t sg_actor_get_PPID(sg_actor_t actor)
504 {
505   return actor->get_ppid();
506 }
507
508 /** @ingroup m_actor_management
509  *
510  * @brief Return a #sg_actor_t given its PID.
511  *
512  * This function search in the list of all the created sg_actor_t for a sg_actor_t  whose PID is equal to @a PID.
513  * If none is found, @c nullptr is returned.
514    Note that the PID are unique in the whole simulation, not only on a given host.
515  */
516 sg_actor_t sg_actor_by_PID(aid_t pid)
517 {
518   return simgrid::s4u::Actor::by_pid(pid).get();
519 }
520
521 /** @ingroup m_actor_management
522  * @brief Return the name of an actor.
523  */
524 const char* sg_actor_get_name(sg_actor_t actor)
525 {
526   return actor->get_cname();
527 }
528
529 sg_host_t sg_actor_get_host(sg_actor_t actor)
530 {
531   return actor->get_host();
532 }
533
534 /** @ingroup m_actor_management
535  * @brief Returns the value of a given actor property
536  *
537  * @param actor an actor
538  * @param name a property name
539  * @return value of a property (or nullptr if the property is not set)
540  */
541 const char* sg_actor_get_property_value(sg_actor_t actor, const char* name)
542 {
543   return actor->get_property(name);
544 }
545
546 /** @ingroup m_actor_management
547  * @brief Return the list of properties
548  *
549  * This function returns all the parameters associated with an actor
550  */
551 xbt_dict_t sg_actor_get_properties(sg_actor_t actor)
552 {
553   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
554   xbt_dict_t as_dict                        = xbt_dict_new_homogeneous(xbt_free_f);
555   std::unordered_map<std::string, std::string>* props = actor->get_properties();
556   if (props == nullptr)
557     return nullptr;
558   for (auto const& kv : *props) {
559     xbt_dict_set(as_dict, kv.first.c_str(), xbt_strdup(kv.second.c_str()), nullptr);
560   }
561   return as_dict;
562 }
563
564 /** @ingroup m_actor_management
565  * @brief Suspend the actor.
566  *
567  * This function suspends the actor by suspending the task on which it was waiting for the completion.
568  */
569 void sg_actor_suspend(sg_actor_t actor)
570 {
571   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
572   actor->suspend();
573 }
574
575 /** @ingroup m_actor_management
576  * @brief Resume a suspended actor.
577  *
578  * This function resumes a suspended actor by resuming the task on which it was waiting for the completion.
579  */
580 void sg_actor_resume(sg_actor_t actor)
581 {
582   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
583   actor->resume();
584 }
585
586 /** @ingroup m_actor_management
587  * @brief Returns true if the actor is suspended .
588  *
589  * This checks whether an actor is suspended or not by inspecting the task on which it was waiting for the completion.
590  */
591 int sg_actor_is_suspended(sg_actor_t actor)
592 {
593   return actor->is_suspended();
594 }
595
596 /**
597  * @ingroup m_actor_management
598  * @brief Restarts an actor from the beginning.
599  */
600 sg_actor_t sg_actor_restart(sg_actor_t actor)
601 {
602   return actor->restart();
603 }
604
605 /**
606  * @ingroup m_actor_management
607  * @brief Sets the "auto-restart" flag of the actor.
608  * If the flag is set to 1, the actor will be automatically restarted when its host comes back up.
609  */
610 void sg_actor_set_auto_restart(sg_actor_t actor, int auto_restart)
611 {
612   actor->set_auto_restart(auto_restart);
613 }
614
615 /** @ingroup m_actor_management
616  * @brief This actor will be terminated automatically when the last non-daemon actor finishes
617  */
618 void sg_actor_daemonize(sg_actor_t actor)
619 {
620   actor->daemonize();
621 }
622
623 /** @ingroup m_actor_management
624  * @brief Migrates an actor to another location.
625  *
626  * This function changes the value of the #sg_host_t on  which @a actor is running.
627  */
628 void sg_actor_migrate(sg_actor_t process, sg_host_t host)
629 {
630   process->migrate(host);
631 }
632
633 /** @ingroup m_actor_management
634  * @brief Wait for the completion of a #sg_actor_t.
635  *
636  * @param actor the actor to wait for
637  * @param timeout wait until the actor is over, or the timeout expires
638  */
639 void sg_actor_join(sg_actor_t actor, double timeout)
640 {
641   actor->join(timeout);
642 }
643
644 void sg_actor_kill(sg_actor_t actor)
645 {
646   actor->kill();
647 }
648
649 void sg_actor_kill_all()
650 {
651   simgrid::s4u::Actor::kill_all();
652 }
653
654 /** @ingroup m_actor_management
655  * @brief Set the kill time of an actor.
656  *
657  * @param actor an actor
658  * @param kill_time the time when the actor is killed.
659  */
660 void sg_actor_set_kill_time(sg_actor_t actor, double kill_time)
661 {
662   actor->set_kill_time(kill_time);
663 }
664
665 /** Yield the current actor; let the other actors execute first */
666 void sg_actor_yield()
667 {
668   simgrid::s4u::this_actor::yield();
669 }
670
671 void sg_actor_sleep_for(double duration)
672 {
673   simgrid::s4u::this_actor::sleep_for(duration);
674 }
675
676 sg_actor_t sg_actor_attach(const char* name, void* data, sg_host_t host, xbt_dict_t properties)
677 {
678   xbt_assert(host != nullptr, "Invalid parameters: host and code params must not be nullptr");
679   std::unordered_map<std::string, std::string> props;
680   xbt_dict_cursor_t cursor = nullptr;
681   char* key;
682   char* value;
683   xbt_dict_foreach (properties, cursor, key, value)
684     props[key] = value;
685   xbt_dict_free(&properties);
686
687   /* Let's create the process: SIMIX may decide to start it right now, even before returning the flow control to us */
688   smx_actor_t actor = nullptr;
689   try {
690     actor = simgrid::kernel::actor::ActorImpl::attach(name, data, host, &props).get();
691   } catch (simgrid::HostFailureException const&) {
692     xbt_die("Could not attach");
693   }
694
695   simgrid::s4u::this_actor::yield();
696   return actor->ciface();
697 }
698
699 void sg_actor_detach()
700 {
701   simgrid::kernel::actor::ActorImpl::detach();
702 }