Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Sanitize the prototype of Actor::on_exit() callbacks
[simgrid.git] / src / s4u / s4u_Actor.cpp
1 /* Copyright (c) 2006-2019. The SimGrid Team. All rights reserved.          */
2
3 /* This program is free software; you can redistribute it and/or modify it
4  * under the terms of the license (GNU LGPL) which comes with this package. */
5
6 #include "simgrid/Exception.hpp"
7 #include "simgrid/actor.h"
8 #include "simgrid/s4u/Actor.hpp"
9 #include "simgrid/s4u/Exec.hpp"
10 #include "simgrid/s4u/Host.hpp"
11 #include "src/kernel/activity/ExecImpl.hpp"
12 #include "src/simix/smx_host_private.hpp"
13 #include "src/simix/smx_private.hpp"
14 #include "src/surf/HostImpl.hpp"
15
16 #include <sstream>
17
18 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor, "S4U actors");
19
20 namespace simgrid {
21 namespace s4u {
22
23 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_creation;
24 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_suspend;
25 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_resume;
26 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_sleep;
27 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_wake_up;
28 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_migration_start;
29 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_migration_end;
30 simgrid::xbt::signal<void(simgrid::s4u::ActorPtr)> s4u::Actor::on_destruction;
31
32 // ***** Actor creation *****
33 ActorPtr Actor::self()
34 {
35   smx_context_t self_context = simgrid::kernel::context::Context::self();
36   if (self_context == nullptr)
37     return simgrid::s4u::ActorPtr();
38
39   return self_context->get_actor()->iface();
40 }
41
42 ActorPtr Actor::create(std::string name, s4u::Host* host, std::function<void()> code)
43 {
44   simgrid::kernel::actor::ActorImpl* actor =
45       simcall_process_create(std::move(name), std::move(code), nullptr, host, nullptr);
46   return actor->iface();
47 }
48
49 ActorPtr Actor::create(std::string name, s4u::Host* host, const std::string& function, std::vector<std::string> args)
50 {
51   simgrid::simix::ActorCodeFactory& factory = SIMIX_get_actor_code_factory(function);
52   return create(std::move(name), host, factory(std::move(args)));
53 }
54
55 void intrusive_ptr_add_ref(Actor* actor)
56 {
57   intrusive_ptr_add_ref(actor->pimpl_);
58 }
59 void intrusive_ptr_release(Actor* actor)
60 {
61   intrusive_ptr_release(actor->pimpl_);
62 }
63
64 // ***** Actor methods *****
65
66 void Actor::join()
67 {
68   simcall_process_join(this->pimpl_, -1);
69 }
70
71 void Actor::join(double timeout)
72 {
73   simcall_process_join(this->pimpl_, timeout);
74 }
75
76 void Actor::set_auto_restart(bool autorestart)
77 {
78   simgrid::simix::simcall([this, autorestart]() {
79     xbt_assert(autorestart && not pimpl_->has_to_auto_restart()); // FIXME: handle all cases
80     pimpl_->set_auto_restart(autorestart);
81
82     simgrid::kernel::actor::ProcessArg* arg = new simgrid::kernel::actor::ProcessArg(pimpl_->get_host(), pimpl_);
83     XBT_DEBUG("Adding Process %s to the actors_at_boot_ list of Host %s", arg->name.c_str(), arg->host->get_cname());
84     pimpl_->get_host()->pimpl_->actors_at_boot_.emplace_back(arg);
85   });
86 }
87
88 void Actor::on_exit(int_f_pvoid_pvoid_t fun,
89                     void* data) /* deprecated: cleanup SIMIX_process_on_exit: change prototype of second parameter and
90                                    remove the last one */
91 {
92   simgrid::simix::simcall([this, fun, data] { SIMIX_process_on_exit(pimpl_, fun, data); });
93 }
94
95 void Actor::on_exit(std::function<void(bool /*failed*/)> fun)
96 {
97   simgrid::simix::simcall(
98       [this, fun] { SIMIX_process_on_exit(pimpl_, [fun](int a, void* data) { fun(a != 0); }, nullptr); });
99 }
100
101 void Actor::migrate(Host* new_host)
102 {
103   s4u::Actor::on_migration_start(this);
104
105   simgrid::simix::simcall([this, new_host]() {
106     if (pimpl_->waiting_synchro != nullptr) {
107       // The actor is blocked on an activity. If it's an exec, migrate it too.
108       // FIXME: implement the migration of other kind of activities
109       simgrid::kernel::activity::ExecImplPtr exec =
110           boost::dynamic_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
111       xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions.");
112       exec->migrate(new_host);
113     }
114     this->pimpl_->set_host(new_host);
115   });
116
117   s4u::Actor::on_migration_end(this);
118 }
119
120 s4u::Host* Actor::get_host()
121 {
122   return this->pimpl_->get_host();
123 }
124
125 void Actor::daemonize()
126 {
127   simgrid::simix::simcall([this]() { pimpl_->daemonize(); });
128 }
129
130 bool Actor::is_daemon() const
131 {
132   return this->pimpl_->is_daemon();
133 }
134
135 const simgrid::xbt::string& Actor::get_name() const
136 {
137   return this->pimpl_->get_name();
138 }
139
140 const char* Actor::get_cname() const
141 {
142   return this->pimpl_->get_cname();
143 }
144
145 aid_t Actor::get_pid() const
146 {
147   return this->pimpl_->get_pid();
148 }
149
150 aid_t Actor::get_ppid() const
151 {
152   return this->pimpl_->get_ppid();
153 }
154
155 void Actor::suspend()
156 {
157   s4u::Actor::on_suspend(this);
158   simcall_process_suspend(pimpl_);
159 }
160
161 void Actor::resume()
162 {
163   simgrid::simix::simcall([this] { pimpl_->resume(); });
164   s4u::Actor::on_resume(this);
165 }
166
167 bool Actor::is_suspended()
168 {
169   return simgrid::simix::simcall([this] { return pimpl_->is_suspended(); });
170 }
171
172 void Actor::set_kill_time(double kill_time)
173 {
174   simgrid::simix::simcall([this, kill_time] { pimpl_->set_kill_time(kill_time); });
175 }
176
177 /** @brief Get the kill time of an actor(or 0 if unset). */
178 double Actor::get_kill_time()
179 {
180   return pimpl_->get_kill_time();
181 }
182
183 void Actor::kill(aid_t pid) // deprecated
184 {
185   smx_actor_t killer  = SIMIX_process_self();
186   smx_actor_t victim  = SIMIX_process_from_PID(pid);
187   if (victim != nullptr) {
188     simgrid::simix::simcall([killer, victim] { killer->kill(victim); });
189   } else {
190     std::ostringstream oss;
191     oss << "kill: (" << pid << ") - No such actor" << std::endl;
192     throw std::runtime_error(oss.str());
193   }
194 }
195
196 void Actor::kill()
197 {
198   smx_actor_t process = SIMIX_process_self();
199   simgrid::simix::simcall([this, process] {
200     if (pimpl_ == simix_global->maestro_process)
201       pimpl_->exit();
202     else
203       process->kill(pimpl_);
204   });
205 }
206
207 smx_actor_t Actor::get_impl()
208 {
209   return pimpl_;
210 }
211
212 // ***** Static functions *****
213
214 ActorPtr Actor::by_pid(aid_t pid)
215 {
216   smx_actor_t process = SIMIX_process_from_PID(pid);
217   if (process != nullptr)
218     return process->iface();
219   else
220     return ActorPtr();
221 }
222
223 void Actor::kill_all()
224 {
225   smx_actor_t self = SIMIX_process_self();
226   simgrid::simix::simcall([self] { self->kill_all(); });
227 }
228
229 std::unordered_map<std::string, std::string>* Actor::get_properties()
230 {
231   return simgrid::simix::simcall([this] { return this->pimpl_->get_properties(); });
232 }
233
234 /** Retrieve the property value (or nullptr if not set) */
235 const char* Actor::get_property(const std::string& key)
236 {
237   return simgrid::simix::simcall([this, key] { return pimpl_->get_property(key); });
238 }
239
240 void Actor::set_property(const std::string& key, std::string value)
241 {
242   simgrid::simix::simcall([this, key, value] { pimpl_->set_property(key, std::move(value)); });
243 }
244
245 Actor* Actor::restart()
246 {
247   return simgrid::simix::simcall([this]() { return pimpl_->restart(); });
248 }
249
250 // ***** this_actor *****
251
252 namespace this_actor {
253
254 /** Returns true if run from the kernel mode, and false if run from a real actor
255  *
256  * Everything that is run out of any actor (simulation setup before the engine is run,
257  * computing the model evolutions as a result to the actors' action, etc) is run in
258  * kernel mode, just as in any operating systems.
259  *
260  * In SimGrid, the actor in charge of doing the stuff in kernel mode is called Maestro,
261  * because it is the one scheduling when the others should move or wait.
262  */
263 bool is_maestro()
264 {
265   smx_actor_t process = SIMIX_process_self();
266   return process == nullptr || process == simix_global->maestro_process;
267 }
268
269 void sleep_for(double duration)
270 {
271   if (duration > 0) {
272     smx_actor_t actor = SIMIX_process_self();
273     simgrid::s4u::Actor::on_sleep(actor->iface());
274
275     simcall_process_sleep(duration);
276
277     simgrid::s4u::Actor::on_wake_up(actor->iface());
278   }
279 }
280
281 void yield()
282 {
283   simgrid::simix::simcall([] { /* do nothing*/ });
284 }
285
286 XBT_PUBLIC void sleep_until(double timeout)
287 {
288   double now = SIMIX_get_clock();
289   if (timeout > now)
290     sleep_for(timeout - now);
291 }
292
293 void execute(double flops)
294 {
295   execute(flops, 1.0 /* priority */);
296 }
297
298 void execute(double flops, double priority)
299 {
300   exec_init(flops)->set_priority(priority)->start()->wait();
301 }
302
303 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
304                       const std::vector<double>& bytes_amounts)
305 {
306   parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
307 }
308
309 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
310                       const std::vector<double>& bytes_amounts, double timeout)
311 {
312   xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
313   xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
314              "Host count (%zu) does not match flops_amount count (%zu).", hosts.size(), flops_amounts.size());
315   xbt_assert(hosts.size() * hosts.size() == bytes_amounts.size() || bytes_amounts.empty(),
316              "bytes_amounts must be a matrix of size host_count * host_count (%zu*%zu), but it's of size %zu.",
317              hosts.size(), hosts.size(), flops_amounts.size());
318
319   /* The vectors live as parameter of parallel_execute. No copy is created for simcall_execution_parallel_start(),
320    * but that's OK because simcall_execution_wait() is called from here too.
321    */
322   smx_activity_t s = simcall_execution_parallel_start("", hosts.size(), hosts.data(),
323                                                       (flops_amounts.empty() ? nullptr : flops_amounts.data()),
324                                                       (bytes_amounts.empty() ? nullptr : bytes_amounts.data()),
325                                                       /* rate */ -1, timeout);
326   simcall_execution_wait(s);
327 }
328
329 // deprecated
330 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount,
331                       double timeout)
332 {
333   smx_activity_t s =
334       simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount, /* rate */ -1, timeout);
335   simcall_execution_wait(s);
336   delete[] flops_amount;
337   delete[] bytes_amount;
338 }
339
340 // deprecated
341 void parallel_execute(int host_nb, s4u::Host* const* host_list, const double* flops_amount, const double* bytes_amount)
342 {
343   smx_activity_t s = simcall_execution_parallel_start("", host_nb, host_list, flops_amount, bytes_amount,
344                                                       /* rate */ -1, /*timeout*/ -1);
345   simcall_execution_wait(s);
346   delete[] flops_amount;
347   delete[] bytes_amount;
348 }
349
350 ExecPtr exec_init(double flops_amount)
351 {
352   return ExecPtr(new Exec(get_host(), flops_amount));
353 }
354
355 ExecPtr exec_async(double flops)
356 {
357   ExecPtr res = exec_init(flops);
358   res->start();
359   return res;
360 }
361
362 aid_t get_pid()
363 {
364   return SIMIX_process_self()->get_pid();
365 }
366
367 aid_t get_ppid()
368 {
369   return SIMIX_process_self()->get_ppid();
370 }
371
372 std::string get_name()
373 {
374   return SIMIX_process_self()->get_name();
375 }
376
377 const char* get_cname()
378 {
379   return SIMIX_process_self()->get_cname();
380 }
381
382 Host* get_host()
383 {
384   return SIMIX_process_self()->get_host();
385 }
386
387 void suspend()
388 {
389   smx_actor_t actor = SIMIX_process_self();
390   simgrid::s4u::Actor::on_suspend(actor->iface());
391
392   simcall_process_suspend(actor);
393 }
394
395 void resume()
396 {
397   smx_actor_t process = SIMIX_process_self();
398   simgrid::simix::simcall([process] { process->resume(); });
399   simgrid::s4u::Actor::on_resume(process->iface());
400 }
401
402 void exit()
403 {
404   smx_actor_t actor = SIMIX_process_self();
405   simgrid::simix::simcall([actor] { actor->exit(); });
406 }
407
408 void on_exit(std::function<void(bool)> fun)
409 {
410   SIMIX_process_self()->iface()->on_exit(fun);
411 }
412
413 void on_exit(std::function<void(int, void*)> fun, void* data) /* deprecated */
414 {
415   SIMIX_process_self()->iface()->on_exit([fun, data](bool exit) { fun(exit, data); });
416 }
417
418 /** @brief Moves the current actor to another host
419  *
420  * @see simgrid::s4u::Actor::migrate() for more information
421  */
422 void migrate(Host* new_host)
423 {
424   SIMIX_process_self()->iface()->migrate(new_host);
425 }
426
427 std::string getName() /* deprecated */
428 {
429   return get_name();
430 }
431 const char* getCname() /* deprecated */
432 {
433   return get_cname();
434 }
435 bool isMaestro() /* deprecated */
436 {
437   return is_maestro();
438 }
439 aid_t getPid() /* deprecated */
440 {
441   return get_pid();
442 }
443 aid_t getPpid() /* deprecated */
444 {
445   return get_ppid();
446 }
447 Host* getHost() /* deprecated */
448 {
449   return get_host();
450 }
451 void on_exit(int_f_pvoid_pvoid_t fun, void* data) /* deprecated */
452 {
453   SIMIX_process_self()->iface()->on_exit([fun, data](int a) { fun((void*)(intptr_t)a, data); });
454 }
455 void onExit(int_f_pvoid_pvoid_t fun, void* data) /* deprecated */
456 {
457   on_exit([fun, data](int a) { fun((void*)(intptr_t)a, data); });
458 }
459 void kill() /* deprecated */
460 {
461   exit();
462 }
463
464 } // namespace this_actor
465 } // namespace s4u
466 } // namespace simgrid
467
468 /* **************************** Public C interface *************************** */
469
470 /** @ingroup m_actor_management
471  * @brief Returns the process ID of @a actor.
472  *
473  * This function checks whether @a actor is a valid pointer and return its PID (or 0 in case of problem).
474  */
475 aid_t sg_actor_get_PID(sg_actor_t actor)
476 {
477   /* Do not raise an exception here: this function is called by the logs
478    * and the exceptions, so it would be called back again and again */
479   if (actor == nullptr || actor->get_impl() == nullptr)
480     return 0;
481   return actor->get_pid();
482 }
483
484 /** @ingroup m_actor_management
485  * @brief Returns the process ID of the parent of @a actor.
486  *
487  * This function checks whether @a actor is a valid pointer and return its parent's PID.
488  * Returns -1 if the actor has not been created by any other actor.
489  */
490 aid_t sg_actor_get_PPID(sg_actor_t actor)
491 {
492   return actor->get_ppid();
493 }
494
495 /** @ingroup m_actor_management
496  *
497  * @brief Return a #sg_actor_t given its PID.
498  *
499  * This function search in the list of all the created sg_actor_t for a sg_actor_t  whose PID is equal to @a PID.
500  * If none is found, @c nullptr is returned.
501    Note that the PID are unique in the whole simulation, not only on a given host.
502  */
503 sg_actor_t sg_actor_by_PID(aid_t pid)
504 {
505   return simgrid::s4u::Actor::by_pid(pid).get();
506 }
507
508 /** @ingroup m_actor_management
509  * @brief Return the name of an actor.
510  */
511 const char* sg_actor_get_name(sg_actor_t actor)
512 {
513   return actor->get_cname();
514 }
515
516 sg_host_t sg_actor_get_host(sg_actor_t actor)
517 {
518   return actor->get_host();
519 }
520
521 /** @ingroup m_actor_management
522  * @brief Returns the value of a given actor property
523  *
524  * @param actor an actor
525  * @param name a property name
526  * @return value of a property (or nullptr if the property is not set)
527  */
528 const char* sg_actor_get_property_value(sg_actor_t actor, const char* name)
529 {
530   return actor->get_property(name);
531 }
532
533 /** @ingroup m_actor_management
534  * @brief Return the list of properties
535  *
536  * This function returns all the parameters associated with an actor
537  */
538 xbt_dict_t sg_actor_get_properties(sg_actor_t actor)
539 {
540   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
541   xbt_dict_t as_dict                        = xbt_dict_new_homogeneous(xbt_free_f);
542   std::unordered_map<std::string, std::string>* props = actor->get_properties();
543   if (props == nullptr)
544     return nullptr;
545   for (auto const& kv : *props) {
546     xbt_dict_set(as_dict, kv.first.c_str(), xbt_strdup(kv.second.c_str()), nullptr);
547   }
548   return as_dict;
549 }
550
551 /** @ingroup m_actor_management
552  * @brief Suspend the actor.
553  *
554  * This function suspends the actor by suspending the task on which it was waiting for the completion.
555  */
556 void sg_actor_suspend(sg_actor_t actor)
557 {
558   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
559   actor->suspend();
560 }
561
562 /** @ingroup m_actor_management
563  * @brief Resume a suspended actor.
564  *
565  * This function resumes a suspended actor by resuming the task on which it was waiting for the completion.
566  */
567 void sg_actor_resume(sg_actor_t actor)
568 {
569   xbt_assert(actor != nullptr, "Invalid parameter: First argument must not be nullptr");
570   actor->resume();
571 }
572
573 /** @ingroup m_actor_management
574  * @brief Returns true if the actor is suspended .
575  *
576  * This checks whether an actor is suspended or not by inspecting the task on which it was waiting for the completion.
577  */
578 int sg_actor_is_suspended(sg_actor_t actor)
579 {
580   return actor->is_suspended();
581 }
582
583 /**
584  * @ingroup m_actor_management
585  * @brief Restarts an actor from the beginning.
586  */
587 sg_actor_t sg_actor_restart(sg_actor_t actor)
588 {
589   return actor->restart();
590 }
591
592 /**
593  * @ingroup m_actor_management
594  * @brief Sets the "auto-restart" flag of the actor.
595  * If the flag is set to 1, the actor will be automatically restarted when its host comes back up.
596  */
597 void sg_actor_set_auto_restart(sg_actor_t actor, int auto_restart)
598 {
599   actor->set_auto_restart(auto_restart);
600 }
601
602 /** @ingroup m_actor_management
603  * @brief This actor will be terminated automatically when the last non-daemon actor finishes
604  */
605 void sg_actor_daemonize(sg_actor_t actor)
606 {
607   actor->daemonize();
608 }
609
610 /** @ingroup m_actor_management
611  * @brief Migrates an actor to another location.
612  *
613  * This function changes the value of the #sg_host_t on  which @a actor is running.
614  */
615 void sg_actor_migrate(sg_actor_t process, sg_host_t host)
616 {
617   process->migrate(host);
618 }
619
620 /** @ingroup m_actor_management
621  * @brief Wait for the completion of a #sg_actor_t.
622  *
623  * @param actor the actor to wait for
624  * @param timeout wait until the actor is over, or the timeout expires
625  */
626 void sg_actor_join(sg_actor_t actor, double timeout)
627 {
628   actor->join(timeout);
629 }
630
631 void sg_actor_kill(sg_actor_t actor)
632 {
633   actor->kill();
634 }
635
636 void sg_actor_kill_all()
637 {
638   simgrid::s4u::Actor::kill_all();
639 }
640
641 /** @ingroup m_actor_management
642  * @brief Set the kill time of an actor.
643  *
644  * @param actor an actor
645  * @param kill_time the time when the actor is killed.
646  */
647 void sg_actor_set_kill_time(sg_actor_t actor, double kill_time)
648 {
649   actor->set_kill_time(kill_time);
650 }
651
652 /** Yield the current actor; let the other actors execute first */
653 void sg_actor_yield()
654 {
655   simgrid::s4u::this_actor::yield();
656 }
657
658 void sg_actor_sleep_for(double duration)
659 {
660   simgrid::s4u::this_actor::sleep_for(duration);
661 }
662
663 sg_actor_t sg_actor_attach(const char* name, void* data, sg_host_t host, xbt_dict_t properties)
664 {
665   xbt_assert(host != nullptr, "Invalid parameters: host and code params must not be nullptr");
666   std::unordered_map<std::string, std::string> props;
667   xbt_dict_cursor_t cursor = nullptr;
668   char* key;
669   char* value;
670   xbt_dict_foreach (properties, cursor, key, value)
671     props[key] = value;
672   xbt_dict_free(&properties);
673
674   /* Let's create the process: SIMIX may decide to start it right now, even before returning the flow control to us */
675   smx_actor_t actor = nullptr;
676   try {
677     actor = simgrid::kernel::actor::ActorImpl::attach(name, data, host, &props).get();
678   } catch (simgrid::HostFailureException const&) {
679     xbt_die("Could not attach");
680   }
681
682   simgrid::s4u::this_actor::yield();
683   return actor->ciface();
684 }
685
686 void sg_actor_detach()
687 {
688   simgrid::kernel::actor::ActorImpl::detach();
689 }