Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
implement Exec::wait_for()
author Frederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 12 Dec 2019 17:09:03 +0000 (18:09 +0100)
committer Frederic Suter <frederic.suter@cc.in2p3.fr>
Thu, 12 Dec 2019 17:10:32 +0000 (18:10 +0100)
15 files changed:
.gitignore
ChangeLog
MANIFEST.in
examples/s4u/CMakeLists.txt
examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp [new file with mode: 0644]
examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh [new file with mode: 0644]
include/simgrid/simix.h
src/kernel/activity/ExecImpl.cpp
src/s4u/s4u_Actor.cpp
src/s4u/s4u_Exec.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.hpp
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/simcalls.in

index 5940061..49d8abb 100644 (file)
@@ -181,6 +181,7 @@ examples/s4u/exec-monitor/s4u-exec-monitor
 examples/s4u/exec-ptask/s4u-exec-ptask
 examples/s4u/exec-remote/s4u-exec-remote
 examples/s4u/exec-waitany/s4u-exec-waitany
+examples/s4u/exec-waitfor/s4u-exec-waitfor
 examples/s4u/io-async/s4u-io-async
 examples/s4u/io-file-remote/s4u-io-file-remote
 examples/s4u/io-file-system/s4u-io-file-system
index 242ee77..89cc700 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -6,6 +6,7 @@ S4U:
 - Actor: Merge signals on_migration_start/end into on_host_change
 - Actor: Rename migrate() into set_host()
 - Disk: Allow users to get the read and write nominal bandwidth values
+- Exec: Implement wait_for(timeout)
 
 XML:
 - Parse errors now raise a simgrid::ParseError that you may want to catch.
index e8e2913..f2baf4b 100644 (file)
@@ -376,6 +376,8 @@ include examples/s4u/exec-remote/s4u-exec-remote.cpp
 include examples/s4u/exec-remote/s4u-exec-remote.tesh
 include examples/s4u/exec-waitany/s4u-exec-waitany.cpp
 include examples/s4u/exec-waitany/s4u-exec-waitany.tesh
+include examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp
+include examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh
 include examples/s4u/io-async/s4u-io-async.cpp
 include examples/s4u/io-async/s4u-io-async.tesh
 include examples/s4u/io-disk-raw/s4u-io-disk-raw.cpp
index 7565b6c..1130d4d 100644 (file)
@@ -9,7 +9,7 @@ foreach (example actor-create actor-daemon actor-exiting actor-join actor-kill
                  cloud-capping cloud-migration cloud-simple
                  energy-exec energy-boot energy-link energy-vm energy-exec-ptask
                  engine-filtering
-                 exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany
+                 exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor
                  io-async io-file-system io-file-remote io-disk-raw
                  platform-failures platform-profile platform-properties
                  plugin-hostload
diff --git a/examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp b/examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp
new file mode 100644 (file)
index 0000000..6aeccfe
--- /dev/null
@@ -0,0 +1,76 @@
+/* Copyright (c) 2019. The SimGrid Team. All rights reserved.          */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+/* This example illustrates Exec::wait_for(timeout): the calling actor waits for an
+ * execution during at most `timeout` seconds. If the execution is not over by then,
+ * wait_for() throws a simgrid::TimeoutException, which the example catches and reports. */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitfor, "Messages specific for this s4u example");
+
+/* Run three executions, each waited for with a bounded timeout:
+ *  - a 5-second sequential execution waited for 3 seconds;
+ *  - a 5-second sequential execution waited for 6 seconds;
+ *  - a parallel execution spanning every host of the platform, waited for 2 seconds. */
+static void worker()
+{
+  /* Amount of flops that the current host needs 5 seconds to compute */
+  const double amount = 5 * simgrid::s4u::this_actor::get_host()->get_speed();
+  XBT_INFO("Create an activity that should run for 5 seconds");
+
+  simgrid::s4u::ExecPtr exec = simgrid::s4u::this_actor::exec_async(amount);
+
+  /* Now that execution is started, wait for 3 seconds. */
+  XBT_INFO("But let it end after 3 seconds");
+  try {
+    exec->wait_for(3);
+    XBT_INFO("Execution complete");
+  } catch (const simgrid::TimeoutException&) {
+    XBT_INFO("Execution Timeout!");
+  }
+
+  /* do it again, but this time with a timeout greater than the duration of the execution */
+  XBT_INFO("Create another activity that should run for 5 seconds and wait for it for 6 seconds");
+  exec = simgrid::s4u::this_actor::exec_async(amount);
+  try {
+    exec->wait_for(6);
+    XBT_INFO("Execution complete");
+  } catch (const simgrid::TimeoutException&) {
+    XBT_INFO("Execution Timeout!");
+  }
+
+  XBT_INFO("Finally test with a parallel execution");
+  auto hosts               = simgrid::s4u::Engine::get_instance()->get_all_hosts();
+  const size_t hosts_count = hosts.size();
+  /* Every host computes 1 Gflop */
+  std::vector<double> computation_amounts(hosts_count, 1e9);
+  /* hosts_count x hosts_count matrix, entry (i, j) stored at index i * hosts_count + j.
+   * Only the pairs with i < j exchange data (10 MB each); all other entries stay at 0. */
+  std::vector<double> communication_amounts(hosts_count * hosts_count, 0.0);
+  for (size_t i = 0; i < hosts_count; i++)
+    for (size_t j = i + 1; j < hosts_count; j++)
+      communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
+
+  exec = simgrid::s4u::this_actor::exec_init(hosts, computation_amounts, communication_amounts);
+  try {
+    exec->wait_for(2);
+    XBT_INFO("Parallel Execution complete");
+  } catch (const simgrid::TimeoutException&) {
+    XBT_INFO("Parallel Execution Timeout!");
+  }
+}
+
+int main(int argc, char* argv[])
+{
+  simgrid::s4u::Engine e(&argc, argv);
+  /* The platform file is mandatory: do not read argv[1] unless it was provided */
+  xbt_assert(argc > 1, "Usage: %s platform_file\n", argv[0]);
+  e.load_platform(argv[1]);
+  simgrid::s4u::Actor::create("worker", simgrid::s4u::Host::by_name("Tremblay"), worker);
+  e.run();
+
+  return 0;
+}
diff --git a/examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh b/examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh
new file mode 100644 (file)
index 0000000..c58d048
--- /dev/null
@@ -0,0 +1,10 @@
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-exec-waitfor ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
+> [  0.000000] [        worker] Create an activity that should run for 5 seconds
+> [  0.000000] [        worker] But let it end after 3 seconds
+> [  3.000000] [        worker] Execution Timeout!
+> [  3.000000] [        worker] Create another activity that should run for 5 seconds and wait for it for 6 seconds
+> [  8.000000] [        worker] Execution complete
+> [  8.000000] [        worker] Finally test with a parallel execution
+> [ 10.000000] [        worker] Parallel Execution Timeout!
index 9b6ff41..2e69f6a 100644 (file)
@@ -114,7 +114,7 @@ XBT_PUBLIC void SIMIX_comm_copy_buffer_callback(simgrid::kernel::activity::CommI
 
 /******************************* Host simcalls ********************************/
 #ifdef __cplusplus
-XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution);
+XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout);
 XBT_PUBLIC unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count,
                                                       double timeout);
 XBT_PUBLIC bool simcall_execution_test(const smx_activity_t& execution);
index eda2b58..76b136f 100644 (file)
 
 XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process);
 
-void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
+void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro, double timeout)
 {
   XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_);
+  xbt_assert(std::isfinite(timeout), "timeout is not finite!");
 
   /* Associate this simcall to the synchro */
   synchro->register_simcall(simcall);
 
   /* set surf's synchro */
   if (MC_is_active() || MC_record_replay_is_active()) {
-    synchro->state_ = simgrid::kernel::activity::State::DONE;
+    int idx = SIMCALL_GET_MC_VALUE(*simcall);
+    if (idx == 0) {
+      synchro->state_ = simgrid::kernel::activity::State::DONE;
+    } else {
+      /* If we reached this point, the wait simcall must have a timeout */
+      /* Otherwise it shouldn't be enabled and executed by the MC */
+      if (timeout < 0.0)
+        THROW_IMPOSSIBLE;
+      synchro->state_ = simgrid::kernel::activity::State::TIMEOUT;
+    }
     synchro->finish();
     return;
   }
 
   /* If the synchro is already finished then perform the error handling */
-  if (synchro->state_ != simgrid::kernel::activity::State::RUNNING)
+  if (synchro->state_ != simgrid::kernel::activity::State::RUNNING) {
     synchro->finish();
+  } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
+    synchro->set_timeout(timeout);
+  }
 }
 
 void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
index 0ac6454..e0768d8 100644 (file)
@@ -345,11 +345,22 @@ void execute(double flops, double priority)
 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
                       const std::vector<double>& bytes_amounts)
 {
-  parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
+  exec_init(hosts, flops_amounts, bytes_amounts)->wait();
 }
 
 void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
                       const std::vector<double>& bytes_amounts, double timeout)
+{
+  exec_init(hosts, flops_amounts, bytes_amounts)->wait_for(timeout);
+}
+
+ExecPtr exec_init(double flops_amount)
+{
+  return ExecPtr(new ExecSeq(get_host(), flops_amount));
+}
+
+ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+                  const std::vector<double>& bytes_amounts)
 {
   xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
   xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
@@ -371,17 +382,6 @@ void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<do
   xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
              "flops_amounts comprises infinite values!");
 
-  exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
-}
-
-ExecPtr exec_init(double flops_amount)
-{
-  return ExecPtr(new ExecSeq(get_host(), flops_amount));
-}
-
-ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
-                  const std::vector<double>& bytes_amounts)
-{
   return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts));
 }
 
index ffe6962..66016d3 100644 (file)
@@ -39,20 +39,20 @@ bool Exec::test()
 }
 
 Exec* Exec::wait()
+{
+  return this->wait_for(-1);
+}
+
+Exec* Exec::wait_for(double timeout)
 {
   if (state_ == State::INITED)
     start();
-  simcall_execution_wait(pimpl_);
+  simcall_execution_wait(pimpl_, timeout);
   state_ = State::FINISHED;
   on_completion(*Actor::self(), *this);
   return this;
 }
 
-Exec* Exec::wait_for(double)
-{
-  THROW_UNIMPLEMENTED;
-}
-
 int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
 {
   std::unique_ptr<kernel::activity::ExecImpl* []> rexecs(new kernel::activity::ExecImpl*[execs->size()]);
index 275c34d..3616ba0 100644 (file)
  *
  * @param execution The execution synchro
  */
-e_smx_state_t simcall_execution_wait(const smx_activity_t& execution)
+e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout)
 {
-  return (e_smx_state_t)simcall_BODY_execution_wait(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()));
+  return (e_smx_state_t)simcall_BODY_execution_wait(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()),
+                                                    timeout);
 }
 
 bool simcall_execution_test(const smx_activity_t& execution)
index 066b3ed..96328c2 100644 (file)
@@ -27,6 +27,18 @@ static inline void simcall_execution_wait__set__execution(smx_simcall_t simcall,
 {
   simgrid::simix::marshal<simgrid::kernel::activity::ExecImpl*>(simcall->args_[0], arg);
 }
+static inline double simcall_execution_wait__get__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<double>(simcall->args_[1]);
+}
+static inline double simcall_execution_wait__getraw__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal_raw<double>(simcall->args_[1]);
+}
+static inline void simcall_execution_wait__set__timeout(smx_simcall_t simcall, double arg)
+{
+  simgrid::simix::marshal<double>(simcall->args_[1], arg);
+}
 static inline int simcall_execution_wait__get__result(smx_simcall_t simcall)
 {
   return simgrid::simix::unmarshal<int>(simcall->result_);
@@ -999,7 +1011,7 @@ static inline void simcall_run_blocking__set__code(smx_simcall_t simcall, std::f
 
 /* The prototype of all simcall handlers, automatically generated for you */
 
-XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
+XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution, double timeout);
 XBT_PRIVATE void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout);
 XBT_PRIVATE void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
 XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout);
index ba60b6d..4462018 100644 (file)
@@ -39,11 +39,11 @@ inline static R simcall(e_smx_simcall_t call, T const&... t)
   return simgrid::simix::unmarshal<R>(self->simcall.result_);
 }
 
-inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution)
+inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout)
 {
   if (0) /* Go to that function to follow the code flow through the simcall barrier */
-    simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution);
-  return simcall<int, simgrid::kernel::activity::ExecImpl*>(SIMCALL_EXECUTION_WAIT, execution);
+    simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution, timeout);
+  return simcall<int, simgrid::kernel::activity::ExecImpl*, double>(SIMCALL_EXECUTION_WAIT, execution, timeout);
 }
 
 inline static int simcall_BODY_execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout)
index c014307..df1deab 100644 (file)
@@ -62,7 +62,7 @@ void simgrid::kernel::actor::ActorImpl::simcall_handle(int value) {
     return;
   switch (simcall.call_) {
     case SIMCALL_EXECUTION_WAIT:
-      simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall.args_[0]));
+      simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall.args_[0]), simgrid::simix::unmarshal<double>(simcall.args_[1]));
       break;
 
     case SIMCALL_EXECUTION_WAITANY_FOR:
index 05a4c06..bf2371e 100644 (file)
@@ -35,7 +35,7 @@
 # Last but not the least, you should declare the new simix call in
 # ./include/simgrid/simix.h (otherwise you will get a warning at compile time)
 
-int           execution_wait(simgrid::kernel::activity::ExecImpl* execution) [[block]];
+int           execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout) [[block]];
 int           execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) [[block]];
 bool          execution_test(simgrid::kernel::activity::ExecImpl* execution) [[block]];