/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
+/* This example demonstrates actor migration.
+ *
+ * The worker actor first moves by itself, and then starts an execution.
+ * During that execution, the monitor migrates the worker, which then wakes up on another host.
+ * The execution is sized with the right amount of flops to take exactly 5 seconds on the first host
+ * and 5 more seconds on the second one, so it stops after 10 seconds.
+ *
+ * Then another migration is done by the monitor while the worker is suspended.
+ *
+ * Note that worker() takes an uncommon set of parameters,
+ * and that this is perfectly accepted by createActor().
+ */
+
#include <simgrid/s4u.hpp>
#include <simgrid/s4u/Mutex.hpp>
XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor_migration, "Messages specific for this s4u example");
-simgrid::s4u::MutexPtr checkpoint = nullptr;
-simgrid::s4u::ConditionVariablePtr identification = nullptr;
-static simgrid::s4u::ActorPtr controlled_process = nullptr;
-
-/* The Emigrant process will be moved from host to host. */
-static void emigrant()
+static void worker(simgrid::s4u::Host* first, simgrid::s4u::Host* second) /* Actor body: moves itself to `first`, computes, then suspends until resumed */
{
- XBT_INFO("I'll look for a new job on another machine ('Boivin') where the grass is greener.");
- simgrid::s4u::this_actor::migrate(
- simgrid::s4u::Host::by_name("Boivin")); /* - First, move to another host by myself */
-
- XBT_INFO("Yeah, found something to do");
- simgrid::s4u::this_actor::execute(98095000);
- simgrid::s4u::this_actor::sleep_for(2);
+ double flopAmount = first->getSpeed() * 5 + second->getSpeed() * 5; /* work for 5s at first's speed plus 5s at second's speed (assumes getSpeed() is in flop/s) */
- XBT_INFO("Moving back home after work");
- simgrid::s4u::this_actor::migrate(simgrid::s4u::Host::by_name("Jacquelin")); /* - Move back to original location */
+ XBT_INFO("Let's move to %s to execute %.2f Mflops (5sec on %s and 5sec on %s)", first->getCname(), flopAmount / 1e6,
+ first->getCname(), second->getCname());
- simgrid::s4u::this_actor::migrate(simgrid::s4u::Host::by_name("Boivin")); /* - Go back to the other host to sleep*/
- simgrid::s4u::this_actor::sleep_for(4);
+ simgrid::s4u::this_actor::migrate(first); /* - First, move to another host by myself */
+ simgrid::s4u::this_actor::execute(flopAmount); /* - Compute; monitor() migrates this actor while the execution is running */
- checkpoint->lock(); /* - Get controlled at checkpoint */
- controlled_process = simgrid::s4u::Actor::self(); /* - and get moved back by the policeman process */
- identification->notify_all();
- checkpoint->unlock();
+ XBT_INFO("I wake up on %s. Let's suspend a bit", simgrid::s4u::this_actor::getHost()->getCname());
 simgrid::s4u::this_actor::suspend(); /* - Wait here until another actor resumes us (monitor() does so after a second migration) */
- XBT_INFO("I've been moved on this new host: %s", simgrid::s4u::this_actor::getHost()->getCname());
- XBT_INFO("Uh, nothing to do here. Stopping now");
+ XBT_INFO("I wake up on %s", simgrid::s4u::this_actor::getHost()->getCname());
+ XBT_INFO("Done");
}
-/* The policeman check for emigrants and move them back to 'Jacquelin' */
-static void policeman()
+static void monitor() /* Actor body: creates the worker, then migrates it twice from the outside */
{
- checkpoint->lock();
+ simgrid::s4u::Host* boivin = simgrid::s4u::Host::by_name("Boivin");
+ simgrid::s4u::Host* jacquelin = simgrid::s4u::Host::by_name("Jacquelin");
+ simgrid::s4u::Host* fafard = simgrid::s4u::Host::by_name("Fafard");
- XBT_INFO("Wait at the checkpoint."); /* - block on the mutex+condition */
- while (controlled_process == nullptr)
- identification->wait(checkpoint);
+ simgrid::s4u::ActorPtr actor = simgrid::s4u::Actor::createActor("worker", fafard, worker, boivin, jacquelin); /* - Start the worker on Fafard; boivin and jacquelin are passed as worker()'s two arguments */
- controlled_process->migrate(simgrid::s4u::Host::by_name("Jacquelin")); /* - Move an emigrant to Jacquelin */
- XBT_INFO("I moved the emigrant");
- controlled_process->resume();
+ simgrid::s4u::this_actor::sleep_for(5);
- checkpoint->unlock();
+ XBT_INFO("After 5 seconds, move the process to %s", jacquelin->getCname());
+ actor->migrate(jacquelin); /* - First migration: the worker is expected to be in the middle of its execute() at this point */
+
+ simgrid::s4u::this_actor::sleep_until(15);
+ XBT_INFO("At t=15, move the process to %s and resume it.", fafard->getCname());
+ actor->migrate(fafard); /* - Second migration: the worker is expected to be suspended by now */
+ actor->resume(); /* - Wake the worker up so that it can report its new host and terminate */
}
int main(int argc, char* argv[])
{
simgrid::s4u::Engine e(&argc, argv);
xbt_assert(argc == 2, "Usage: %s platform_file\n\tExample: %s msg_platform.xml\n", argv[0], argv[0]); /* exactly one argument is required: the platform file */
- e.loadPlatform(argv[1]); /* - Load the platform description */
-
- /* - Create and deploy the emigrant and policeman processes */
- simgrid::s4u::Actor::createActor("emigrant", simgrid::s4u::Host::by_name("Jacquelin"), emigrant);
- simgrid::s4u::Actor::createActor("policeman", simgrid::s4u::Host::by_name("Boivin"), policeman);
+ e.loadPlatform(argv[1]); /* - Load the platform description */
- checkpoint = simgrid::s4u::Mutex::createMutex(); /* - Initiate the mutex and conditions */
- identification = simgrid::s4u::ConditionVariable::createConditionVariable();
+ simgrid::s4u::Actor::createActor("monitor", simgrid::s4u::Host::by_name("Boivin"), monitor); /* - Only the monitor is created here; monitor() creates the worker itself */
e.run(); /* - Run the simulation */
- XBT_INFO("Simulation time %g", e.getClock());
-
return 0;
}
#! ./tesh
-p Testing the migration feature of MSG
+p Testing the actor migration feature
-! output sort 19
$ $SG_TEST_EXENV ${bindir:=.}/s4u-actor-migration ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%P@%h)%e%m%n"
-> [ 0.000000] (emigrant@Jacquelin) I'll look for a new job on another machine ('Boivin') where the grass is greener.
-> [ 0.000000] (emigrant@Boivin) Yeah, found something to do
-> [ 0.000000] (policeman@Boivin) Wait at the checkpoint.
-> [ 3.000000] (emigrant@Boivin) Moving back home after work
-> [ 7.000000] (maestro@) Simulation time 7
-> [ 7.000000] (emigrant@Jacquelin) I've been moved on this new host: Jacquelin
-> [ 7.000000] (emigrant@Jacquelin) Uh, nothing to do here. Stopping now
-> [ 7.000000] (policeman@Boivin) I moved the emigrant
+> [ 0.000000] (worker@Fafard) Let's move to Boivin to execute 1177.14 Mflops (5sec on Boivin and 5sec on Jacquelin)
+> [ 5.000000] (monitor@Boivin) After 5 seconds, move the process to Jacquelin
+> [ 10.000000] (worker@Jacquelin) I wake up on Jacquelin. Let's suspend a bit
+> [ 15.000000] (monitor@Boivin) At t=15, move the process to Fafard and resume it.
+> [ 15.000000] (worker@Fafard) I wake up on Fafard
+> [ 15.000000] (worker@Fafard) Done
simcall_process_on_exit(pimpl_, fun, data);
}
+/** @brief Moves the actor to another host
+ *
+ * If the actor is currently blocked on an execution activity, the activity is also
+ * migrated to the new host. If it's blocked on another kind of activity, an assertion
+ * fails, as the required code is not written yet. Please report that bug if you need it.
+ *
+ * Asynchronous activities started by the actor are not migrated automatically, so you have
+ * to take care of this yourself (only you know which ones should be migrated).
+ *
+ * @param new_host the host to which the actor is moved
+ */
void Actor::migrate(Host* new_host)
{
simgrid::simix::kernelImmediate([this, new_host]() {
- pimpl_->new_host = new_host;
+ if (pimpl_->waiting_synchro != nullptr) {
+ // The actor is blocked on an activity. If it's an exec, migrate it too.
+ // FIXME: implement the migration of other kinds of activities
+ simgrid::kernel::activity::ExecImplPtr exec =
+ boost::dynamic_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
+ xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions."); // a null cast result means the activity is not an ExecImpl
+ exec->migrate(new_host);
+ }
+ SIMIX_process_change_host(this->pimpl_, new_host); // the host change is now applied right away instead of being recorded in new_host for later
});
}
simcall_process_on_exit(SIMIX_process_self(), fun, data);
}
+/** @brief Moves the current actor to another host
+ *
+ * @see simgrid::s4u::Actor::migrate() for more information
+ */
void migrate(Host* new_host)
{
SIMIX_process_self()->iface()->migrate(new_host);
simix_process_maxpid = reset_pid;
}
-void SIMIX_process_change_host(smx_actor_t process, sg_host_t dest)
+void SIMIX_process_change_host(smx_actor_t actor, sg_host_t dest) /* Moves `actor` from its current host to `dest`, updating both hosts' process lists */
{
- xbt_assert((process != nullptr), "Invalid parameters");
- simgrid::xbt::intrusive_erase(process->host->extension<simgrid::simix::Host>()->process_list, *process);
- process->host = dest;
- dest->extension<simgrid::simix::Host>()->process_list.push_back(*process);
+ xbt_assert((actor != nullptr), "Invalid parameters"); /* only the actor is checked; dest is dereferenced below without a check */
+ simgrid::xbt::intrusive_erase(actor->host->extension<simgrid::simix::Host>()->process_list, *actor); /* unregister the actor from its current host */
+ actor->host = dest; /* record the new location */
+ dest->extension<simgrid::simix::Host>()->process_list.push_back(*actor); /* register the actor on the destination host */
}
void simcall_HANDLER_process_suspend(smx_simcall_t simcall, smx_actor_t process)
/* Ok, maestro returned control to us */
XBT_DEBUG("Control returned to me: '%s'", self->name.c_str());
- if (self->new_host) {
- SIMIX_process_change_host(self, self->new_host);
- self->new_host = nullptr;
- }
-
if (self->context->iwannadie){
XBT_DEBUG("I wanna die!");
self->finished = true;