Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
fix #245: migrating an actor does not migrate its execution
authorMartin Quinson <martin.quinson@loria.fr>
Tue, 26 Dec 2017 21:26:15 +0000 (22:26 +0100)
committerMartin Quinson <martin.quinson@loria.fr>
Tue, 26 Dec 2017 21:37:19 +0000 (22:37 +0100)
Before, the migration was only taking place when the actor got awaken
after an activity. Now, it takes place right away, and if it's
blocking on an execution, the activity is also migrated. If it's another
kind of activity, then an error is raised as this is not implemented
yet.

Also rewrite the s4u-actor-migrate example to be less funny but more
informative, and to test that feature.

(fix #245)

ChangeLog
examples/s4u/actor-migration/s4u-actor-migration.cpp
examples/s4u/actor-migration/s4u-actor-migration.tesh
src/s4u/s4u_actor.cpp
src/simix/ActorImpl.cpp
src/simix/ActorImpl.hpp
src/simix/smx_network.cpp

index dd9f694..e4c7963 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -3,6 +3,9 @@ SimGrid (3.19) NOT RELEASED YET (target: March 20 2018, 16:15:27 UTC)
  S4U
  - Execution->setHost() can be called after start() to migrate it.
 
  S4U
  - Execution->setHost() can be called after start() to migrate it.
 
+ Fixed bugs:
+ - #245: migrating an actor does not migrate its execution
+
 SimGrid (3.18) Released December 24 2017
 
  The "Ho Ho Ho! SimGrid 4 beta is coming to town" release.
 SimGrid (3.18) Released December 24 2017
 
  The "Ho Ho Ho! SimGrid 4 beta is coming to town" release.
index 5db7924..b9a3636 100644 (file)
@@ -3,74 +3,69 @@
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 
+/* This example demonstrate the actor migrations.
+ *
+ * The worker actor first move by itself, and then start an execution.
+ * During that execution, the monitor migrates the worker, that wakes up on another host.
+ * The execution was of the right amount of flops to take exactly 5 seconds on the first host
+ * and 5 other seconds on the second one, so it stops after 10 seconds.
+ *
+ * Then another migration is done by the monitor while the worker is suspended.
+ *
+ * Note that worker() takes an uncommon set of parameters,
+ * and that this is perfectly accepted by createActor().
+ */
+
 #include <simgrid/s4u.hpp>
 #include <simgrid/s4u/Mutex.hpp>
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor_migration, "Messages specific for this s4u example");
 
 #include <simgrid/s4u.hpp>
 #include <simgrid/s4u/Mutex.hpp>
 
 XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_actor_migration, "Messages specific for this s4u example");
 
-simgrid::s4u::MutexPtr checkpoint                 = nullptr;
-simgrid::s4u::ConditionVariablePtr identification = nullptr;
-static simgrid::s4u::ActorPtr controlled_process  = nullptr;
-
-/* The Emigrant process will be moved from host to host. */
-static void emigrant()
+static void worker(simgrid::s4u::Host* first, simgrid::s4u::Host* second)
 {
 {
-  XBT_INFO("I'll look for a new job on another machine ('Boivin') where the grass is greener.");
-  simgrid::s4u::this_actor::migrate(
-      simgrid::s4u::Host::by_name("Boivin")); /* - First, move to another host by myself */
-
-  XBT_INFO("Yeah, found something to do");
-  simgrid::s4u::this_actor::execute(98095000);
-  simgrid::s4u::this_actor::sleep_for(2);
+  double flopAmount = first->getSpeed() * 5 + second->getSpeed() * 5;
 
 
-  XBT_INFO("Moving back home after work");
-  simgrid::s4u::this_actor::migrate(simgrid::s4u::Host::by_name("Jacquelin")); /* - Move back to original location */
+  XBT_INFO("Let's move to %s to execute %.2f Mflops (5sec on %s and 5sec on %s)", first->getCname(), flopAmount / 1e6,
+           first->getCname(), second->getCname());
 
 
-  simgrid::s4u::this_actor::migrate(simgrid::s4u::Host::by_name("Boivin")); /* - Go back to the other host to sleep*/
-  simgrid::s4u::this_actor::sleep_for(4);
+  simgrid::s4u::this_actor::migrate(first);
+  simgrid::s4u::this_actor::execute(flopAmount);
 
 
-  checkpoint->lock();                               /* - Get controlled at checkpoint */
-  controlled_process = simgrid::s4u::Actor::self(); /* - and get moved back by the policeman process */
-  identification->notify_all();
-  checkpoint->unlock();
+  XBT_INFO("I wake up on %s. Let's suspend a bit", simgrid::s4u::this_actor::getHost()->getCname());
 
   simgrid::s4u::this_actor::suspend();
 
 
   simgrid::s4u::this_actor::suspend();
 
-  XBT_INFO("I've been moved on this new host: %s", simgrid::s4u::this_actor::getHost()->getCname());
-  XBT_INFO("Uh, nothing to do here. Stopping now");
+  XBT_INFO("I wake up on %s", simgrid::s4u::this_actor::getHost()->getCname());
+  XBT_INFO("Done");
 }
 
 }
 
-/* The policeman check for emigrants and move them back to 'Jacquelin' */
-static void policeman()
+static void monitor()
 {
 {
-  checkpoint->lock();
+  simgrid::s4u::Host* boivin    = simgrid::s4u::Host::by_name("Boivin");
+  simgrid::s4u::Host* jacquelin = simgrid::s4u::Host::by_name("Jacquelin");
+  simgrid::s4u::Host* fafard    = simgrid::s4u::Host::by_name("Fafard");
 
 
-  XBT_INFO("Wait at the checkpoint."); /* - block on the mutex+condition */
-  while (controlled_process == nullptr)
-    identification->wait(checkpoint);
+  simgrid::s4u::ActorPtr actor = simgrid::s4u::Actor::createActor("worker", fafard, worker, boivin, jacquelin);
 
 
-  controlled_process->migrate(simgrid::s4u::Host::by_name("Jacquelin")); /* - Move an emigrant to Jacquelin */
-  XBT_INFO("I moved the emigrant");
-  controlled_process->resume();
+  simgrid::s4u::this_actor::sleep_for(5);
 
 
-  checkpoint->unlock();
+  XBT_INFO("After 5 seconds, move the process to %s", jacquelin->getCname());
+  actor->migrate(jacquelin);
+
+  simgrid::s4u::this_actor::sleep_until(15);
+  XBT_INFO("At t=15, move the process to %s and resume it.", fafard->getCname());
+  actor->migrate(fafard);
+  actor->resume();
 }
 
 int main(int argc, char* argv[])
 {
   simgrid::s4u::Engine e(&argc, argv);
   xbt_assert(argc == 2, "Usage: %s platform_file\n\tExample: %s msg_platform.xml\n", argv[0], argv[0]);
 }
 
 int main(int argc, char* argv[])
 {
   simgrid::s4u::Engine e(&argc, argv);
   xbt_assert(argc == 2, "Usage: %s platform_file\n\tExample: %s msg_platform.xml\n", argv[0], argv[0]);
-  e.loadPlatform(argv[1]); /* - Load the platform description */
-
-  /* - Create and deploy the emigrant and policeman processes */
-  simgrid::s4u::Actor::createActor("emigrant", simgrid::s4u::Host::by_name("Jacquelin"), emigrant);
-  simgrid::s4u::Actor::createActor("policeman", simgrid::s4u::Host::by_name("Boivin"), policeman);
+  e.loadPlatform(argv[1]);
 
 
-  checkpoint     = simgrid::s4u::Mutex::createMutex(); /* - Initiate the mutex and conditions */
-  identification = simgrid::s4u::ConditionVariable::createConditionVariable();
+  simgrid::s4u::Actor::createActor("monitor", simgrid::s4u::Host::by_name("Boivin"), monitor);
   e.run();
 
   e.run();
 
-  XBT_INFO("Simulation time %g", e.getClock());
-
   return 0;
 }
   return 0;
 }
index 3d54d91..6c82feb 100644 (file)
@@ -1,14 +1,11 @@
 #! ./tesh
 
 #! ./tesh
 
-p Testing the migration feature of MSG
+p Testing the actor migration feature 
 
 
-! output sort 19
 $ $SG_TEST_EXENV ${bindir:=.}/s4u-actor-migration ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%P@%h)%e%m%n"
 $ $SG_TEST_EXENV ${bindir:=.}/s4u-actor-migration ${platfdir}/small_platform.xml "--log=root.fmt:[%10.6r]%e(%P@%h)%e%m%n"
-> [  0.000000] (emigrant@Jacquelin) I'll look for a new job on another machine ('Boivin') where the grass is greener.
-> [  0.000000] (emigrant@Boivin) Yeah, found something to do
-> [  0.000000] (policeman@Boivin) Wait at the checkpoint.
-> [  3.000000] (emigrant@Boivin) Moving back home after work
-> [  7.000000] (maestro@) Simulation time 7
-> [  7.000000] (emigrant@Jacquelin) I've been moved on this new host: Jacquelin
-> [  7.000000] (emigrant@Jacquelin) Uh, nothing to do here. Stopping now
-> [  7.000000] (policeman@Boivin) I moved the emigrant
+> [  0.000000] (worker@Fafard) Let's move to Boivin to execute 1177.14 Mflops (5sec on Boivin and 5sec on Jacquelin)
+> [  5.000000] (monitor@Boivin) After 5 seconds, move the process to Jacquelin
+> [ 10.000000] (worker@Jacquelin) I wake up on Jacquelin. Let's suspend a bit
+> [ 15.000000] (monitor@Boivin) At t=15, move the process to Fafard and resume it.
+> [ 15.000000] (worker@Fafard) I wake up on Fafard
+> [ 15.000000] (worker@Fafard) Done
index 850069b..8474559 100644 (file)
@@ -74,10 +74,27 @@ void Actor::onExit(int_f_pvoid_pvoid_t fun, void* data)
   simcall_process_on_exit(pimpl_, fun, data);
 }
 
   simcall_process_on_exit(pimpl_, fun, data);
 }
 
+/** @brief Moves the actor to another host
+ *
+ * If the actor is currently blocked on an execution activity, the activity is also
+ * migrated to the new host. If it's blocked on another kind of activity, an error is
+ * raised as the mandated code is not written yet. Please report that bug if you need it.
+ *
+ * Asynchronous activities started by the actor are not migrated automatically, so you have
+ * to take care of this yourself (only you knows which ones should be migrated).
+ */
 void Actor::migrate(Host* new_host)
 {
   simgrid::simix::kernelImmediate([this, new_host]() {
 void Actor::migrate(Host* new_host)
 {
   simgrid::simix::kernelImmediate([this, new_host]() {
-    pimpl_->new_host = new_host;
+    if (pimpl_->waiting_synchro != nullptr) {
+      // The actor is blocked on an activity. If it's an exec, migrate it too.
+      // FIXME: implement the migration of other kind of activities
+      simgrid::kernel::activity::ExecImplPtr exec =
+          boost::dynamic_pointer_cast<simgrid::kernel::activity::ExecImpl>(pimpl_->waiting_synchro);
+      xbt_assert(exec.get() != nullptr, "We can only migrate blocked actors when they are blocked on executions.");
+      exec->migrate(new_host);
+    }
+    SIMIX_process_change_host(this->pimpl_, new_host);
   });
 }
 
   });
 }
 
@@ -361,6 +378,10 @@ void onExit(int_f_pvoid_pvoid_t fun, void* data)
   simcall_process_on_exit(SIMIX_process_self(), fun, data);
 }
 
   simcall_process_on_exit(SIMIX_process_self(), fun, data);
 }
 
+/** @brief Moves the current actor to another host
+ *
+ * @see simgrid::s4u::Actor::migrate() for more information
+ */
 void migrate(Host* new_host)
 {
   SIMIX_process_self()->iface()->migrate(new_host);
 void migrate(Host* new_host)
 {
   SIMIX_process_self()->iface()->migrate(new_host);
index 2db4aeb..7866fb0 100644 (file)
@@ -607,12 +607,12 @@ void SIMIX_process_killall(smx_actor_t issuer, int reset_pid)
     simix_process_maxpid = reset_pid;
 }
 
     simix_process_maxpid = reset_pid;
 }
 
-void SIMIX_process_change_host(smx_actor_t process, sg_host_t dest)
+void SIMIX_process_change_host(smx_actor_t actor, sg_host_t dest)
 {
 {
-  xbt_assert((process != nullptr), "Invalid parameters");
-  simgrid::xbt::intrusive_erase(process->host->extension<simgrid::simix::Host>()->process_list, *process);
-  process->host = dest;
-  dest->extension<simgrid::simix::Host>()->process_list.push_back(*process);
+  xbt_assert((actor != nullptr), "Invalid parameters");
+  simgrid::xbt::intrusive_erase(actor->host->extension<simgrid::simix::Host>()->process_list, *actor);
+  actor->host = dest;
+  dest->extension<simgrid::simix::Host>()->process_list.push_back(*actor);
 }
 
 void simcall_HANDLER_process_suspend(smx_simcall_t simcall, smx_actor_t process)
 }
 
 void simcall_HANDLER_process_suspend(smx_simcall_t simcall, smx_actor_t process)
@@ -744,11 +744,6 @@ void SIMIX_process_yield(smx_actor_t self)
   /* Ok, maestro returned control to us */
   XBT_DEBUG("Control returned to me: '%s'", self->name.c_str());
 
   /* Ok, maestro returned control to us */
   XBT_DEBUG("Control returned to me: '%s'", self->name.c_str());
 
-  if (self->new_host) {
-    SIMIX_process_change_host(self, self->new_host);
-    self->new_host = nullptr;
-  }
-
   if (self->context->iwannadie){
     XBT_DEBUG("I wanna die!");
     self->finished = true;
   if (self->context->iwannadie){
     XBT_DEBUG("I wanna die!");
     self->finished = true;
index 0fd300a..ba966d1 100644 (file)
@@ -56,7 +56,6 @@ public:
   bool suspended    = false;
   bool auto_restart = false;
 
   bool suspended    = false;
   bool auto_restart = false;
 
-  sg_host_t new_host             = nullptr; /* if not null, the host on which the process must migrate to */
   smx_activity_t waiting_synchro = nullptr; /* the current blocking synchro if any */
   std::list<smx_activity_t> comms;          /* the current non-blocking communication synchros */
   s_smx_simcall_t simcall;
   smx_activity_t waiting_synchro = nullptr; /* the current blocking synchro if any */
   std::list<smx_activity_t> comms;          /* the current non-blocking communication synchros */
   s_smx_simcall_t simcall;
index ea92bfd..68798b9 100644 (file)
@@ -475,7 +475,7 @@ static inline void SIMIX_comm_start(simgrid::kernel::activity::CommImplPtr comm)
       comm->cleanupSurf();
     }
 
       comm->cleanupSurf();
     }
 
-    /* If any of the process is suspend, create the synchro but stop its execution,
+    /* If any of the process is suspended, create the synchro but stop its execution,
        it will be restarted when the sender process resume */
     if (comm->src_proc->isSuspended() || comm->dst_proc->isSuspended()) {
       if (comm->src_proc->isSuspended())
        it will be restarted when the sender process resume */
     if (comm->src_proc->isSuspended() || comm->dst_proc->isSuspended()) {
       if (comm->src_proc->isSuspended())