Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
New function: MSG_parallel_task_execute_with_timeout
author Martin Quinson <martin.quinson@loria.fr>
Fri, 11 Nov 2016 23:15:16 +0000 (00:15 +0100)
committer Martin Quinson <martin.quinson@loria.fr>
Fri, 11 Nov 2016 23:15:28 +0000 (00:15 +0100)
This is the first time that executions can finish with a timeout, and
I'm not really proud of the resulting code. Sorry, that's just a quick
fix (#115) made while I was fixing the platform generation.

At some point, the whole activity infrastructure should be reworked, too.

14 files changed:
examples/msg/energy-ptask/energy-ptask.c
examples/msg/energy-ptask/energy-ptask.tesh
include/simgrid/msg.h
include/simgrid/simix.h
src/kernel/activity/SynchroExec.cpp
src/kernel/activity/SynchroExec.hpp
src/msg/msg_gos.cpp
src/simix/libsmx.cpp
src/simix/popping_accessors.h
src/simix/popping_bodies.cpp
src/simix/popping_generated.cpp
src/simix/simcalls.in
src/simix/smx_host.cpp
src/simix/smx_host_private.h

index 01d8cce..3cf07cc 100644 (file)
@@ -1,5 +1,4 @@
-/* Copyright (c) 2007-2015. The SimGrid Team.
- * All rights reserved.                                                     */
+/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved.          */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
@@ -10,8 +9,8 @@
 XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
 
 /** @addtogroup MSG_examples
- * 
- * - <b>parallel_task/parallel_task.c</b>: Demonstrates the use of @ref MSG_parallel_task_create, to create special
+ *
+ * - <b>energy-ptask/energy-ptask.c</b>: Demonstrates the use of @ref MSG_parallel_task_create, to create special
  *   tasks that run on several hosts at the same time. The resulting simulations are very close to what can be
  *   achieved in @ref SD_API, but still allows to use the other features of MSG (it'd be cool to be able to mix
  *   interfaces, but it's not possible ATM).
@@ -41,6 +40,20 @@ static int runner(int argc, char *argv[])
   MSG_task_destroy(ptask);
   /* The arrays communication_amounts and computation_amounts are not to be freed manually */
 
+  XBT_INFO("We can do the same with a timeout of one second enabled.");
+  computation_amounts   = xbt_new0(double, hosts_count);
+  communication_amounts = xbt_new0(double, hosts_count* hosts_count);
+  for (int i               = 0; i < hosts_count; i++)
+    computation_amounts[i] = 1e9; // 1 Gflop
+  for (int i = 0; i < hosts_count; i++)
+    for (int j                                   = i + 1; j < hosts_count; j++)
+      communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
+  ptask =
+      MSG_parallel_task_create("parallel task", hosts_count, hosts, computation_amounts, communication_amounts, NULL);
+  msg_error_t errcode = MSG_parallel_task_execute_with_timeout(ptask, 1 /* timeout (in seconds)*/);
+  xbt_assert(errcode == MSG_TIMEOUT, "Woops, this did not timeout as expected... Please report that bug.");
+  MSG_task_destroy(ptask);
+
   XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)");
   computation_amounts = xbt_new0(double, hosts_count);
   for (int i = 0; i < hosts_count; i++)
index 2bfd392..0298dca 100644 (file)
@@ -3,11 +3,12 @@
 $ $SG_TEST_EXENV energy-ptask/energy-ptask$EXEEXT ${srcdir:=.}/../platforms/energy_platform.xml --energy "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
 > [  0.000000] (0:maestro@) Switching to the L07 model to handle parallel tasks.
 > [  0.000000] (1:test@MyHost1) First, build a classical parallel task, with 1 Gflop to execute on each node, and 10MB to exchange between each pair
-> [300.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node)
-> [310.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only)
-> [310.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2
-> [320.000000] (1:test@MyHost1) Goodbye now!
-> [320.000000] (0:maestro@) Simulation done.
-> [320.000000] (0:maestro@) Total energy of host MyHost1: 38200.000000 Joules
-> [320.000000] (0:maestro@) Total energy of host MyHost2: 38400.000000 Joules
-> [320.000000] (0:maestro@) Total energy of host MyHost3: 38200.000000 Joules
+> [300.000000] (1:test@MyHost1) We can do the same with a timeout of one second enabled.
+> [301.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node)
+> [311.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only)
+> [311.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2
+> [321.000000] (1:test@MyHost1) Goodbye now!
+> [321.000000] (0:maestro@) Simulation done.
+> [321.000000] (0:maestro@) Total energy of host MyHost1: 38320.000000 Joules
+> [321.000000] (0:maestro@) Total energy of host MyHost2: 38520.000000 Joules
+> [321.000000] (0:maestro@) Total energy of host MyHost3: 38320.000000 Joules
index a7f1ad3..421a883 100644 (file)
@@ -356,6 +356,7 @@ XBT_PUBLIC(msg_error_t) MSG_task_destroy(msg_task_t task);
 
 XBT_PUBLIC(msg_error_t) MSG_task_execute(msg_task_t task);
 XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute(msg_task_t task);
+XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout);
 XBT_PUBLIC(void) MSG_task_set_priority(msg_task_t task, double priority);
 XBT_PUBLIC(void) MSG_task_set_bound(msg_task_t task, double bound);
 
index a0e8e67..40f0295 100644 (file)
@@ -80,6 +80,7 @@ typedef enum {
   SIMIX_FAILED,
   SIMIX_SRC_HOST_FAILURE,
   SIMIX_DST_HOST_FAILURE,
+  SIMIX_TIMEOUT,
   SIMIX_SRC_TIMEOUT,
   SIMIX_DST_TIMEOUT,
   SIMIX_LINK_FAILURE
@@ -248,13 +249,9 @@ XBT_PUBLIC(void) simcall_host_set_data(sg_host_t host, void *data);
 XBT_PUBLIC(smx_activity_t) simcall_execution_start(const char *name,
                                                 double flops_amount,
                                                 double priority, double bound);
-XBT_PUBLIC(smx_activity_t) simcall_execution_parallel_start(const char *name,
-                                                     int host_nb,
-                                                     sg_host_t *host_list,
-                                                     double *flops_amount,
-                                                     double *bytes_amount,
-                                                     double amount,
-                                                     double rate);
+XBT_PUBLIC(smx_activity_t)
+simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount,
+                                 double* bytes_amount, double amount, double rate, double timeout);
 XBT_PUBLIC(void) simcall_execution_cancel(smx_activity_t execution);
 XBT_PUBLIC(void) simcall_execution_set_priority(smx_activity_t execution, double priority);
 XBT_PUBLIC(void) simcall_execution_set_bound(smx_activity_t execution, double bound);
index 2617322..31ad7d1 100644 (file)
@@ -21,6 +21,8 @@ simgrid::kernel::activity::Exec::~Exec()
 {
   if (surf_exec)
     surf_exec->unref();
+  if (timeoutDetector)
+    timeoutDetector->unref();
 }
 void simgrid::kernel::activity::Exec::suspend()
 {
@@ -51,6 +53,8 @@ void simgrid::kernel::activity::Exec::post()
   } else if (surf_exec->getState() == simgrid::surf::Action::State::failed) {
     /* If the host running the synchro didn't fail, then the synchro was canceled */
     state = SIMIX_CANCELED;
+  } else if (timeoutDetector && timeoutDetector->getState() == simgrid::surf::Action::State::done) {
+    state = SIMIX_TIMEOUT;
   } else {
     state = SIMIX_DONE;
   }
@@ -59,6 +63,10 @@ void simgrid::kernel::activity::Exec::post()
     surf_exec->unref();
     surf_exec = nullptr;
   }
+  if (timeoutDetector) {
+    timeoutDetector->unref();
+    timeoutDetector = nullptr;
+  }
 
   /* If there are simcalls associated with the synchro, then answer them */
   if (!simcalls.empty())
index d669494..c0e315f 100644 (file)
@@ -24,6 +24,7 @@ namespace activity {
 
     sg_host_t host = nullptr; /* The host where the execution takes place. If nullptr, then this is a parallel exec (and only surf knows the hosts) */
     surf_action_t surf_exec = nullptr; /* The Surf execution action encapsulated */
+    surf::Action* timeoutDetector = nullptr;
   };
 
 }}} // namespace simgrid::kernel::activity
index e9227f0..6c2ea24 100644 (file)
@@ -42,6 +42,11 @@ msg_error_t MSG_task_execute(msg_task_t task)
  * or #MSG_HOST_FAILURE otherwise
  */
 msg_error_t MSG_parallel_task_execute(msg_task_t task)
+{
+  return MSG_parallel_task_execute_with_timeout(task, -1);
+}
+
+msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
 {
   simdata_task_t simdata = task->simdata;
   simdata_process_t p_simdata = static_cast<simdata_process_t>(SIMIX_process_self_get_data());
@@ -63,10 +68,9 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task)
     simdata->setUsed();
 
     if (simdata->host_nb > 0) {
-      simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(
-          simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list,
-                                                       simdata->flops_parallel_amount, simdata->bytes_parallel_amount,
-                                                       1.0, -1.0));
+      simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(simcall_execution_parallel_start(
+          task->name, simdata->host_nb, simdata->host_list, simdata->flops_parallel_amount,
+          simdata->bytes_parallel_amount, 1.0, -1.0, timeout));
       XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
     } else {
       simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(
@@ -89,6 +93,9 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task)
     case host_error:
       status = MSG_HOST_FAILURE;
       break;
+    case timeout_error:
+      status = MSG_TIMEOUT;
+      break;
     default:
       throw;
     }
index a9ad2c9..4686b80 100644 (file)
@@ -97,15 +97,12 @@ smx_activity_t simcall_execution_start(const char *name,
  * amount between each pair of hosts
  * \param amount the SURF action amount
  * \param rate the SURF action rate
+ * \param timeout timeout
  * \return A new SIMIX execution synchronization
  */
-smx_activity_t simcall_execution_parallel_start(const char *name,
-                                         int host_nb,
-                                         sg_host_t *host_list,
-                                         double *flops_amount,
-                                         double *bytes_amount,
-                                         double amount,
-                                         double rate)
+smx_activity_t simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list,
+                                                double* flops_amount, double* bytes_amount, double amount, double rate,
+                                                double timeout)
 {
   int i,j;
   /* checking for infinite values */
@@ -122,11 +119,8 @@ smx_activity_t simcall_execution_parallel_start(const char *name,
   xbt_assert(std::isfinite(amount), "amount is not finite!");
   xbt_assert(std::isfinite(rate), "rate is not finite!");
 
-  return simcall_BODY_execution_parallel_start(name, host_nb, host_list,
-                                            flops_amount,
-                                            bytes_amount,
-                                            amount, rate);
-
+  return simcall_BODY_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate,
+                                               timeout);
 }
 
 /**
index d911ffc..9da45f3 100644 (file)
@@ -215,6 +215,14 @@ static inline double simcall_execution_parallel_start__get__rate(smx_simcall_t s
 static inline void simcall_execution_parallel_start__set__rate(smx_simcall_t simcall, double arg) {
     simgrid::simix::marshal<double>(simcall->args[6], arg);
 }
+static inline double simcall_execution_parallel_start__get__timeout(smx_simcall_t simcall)
+{
+  return simgrid::simix::unmarshal<double>(simcall->args[7]);
+}
+static inline void simcall_execution_parallel_start__set__timeout(smx_simcall_t simcall, double arg)
+{
+  simgrid::simix::marshal<double>(simcall->args[7], arg);
+}
 static inline smx_activity_t simcall_execution_parallel_start__get__result(smx_simcall_t simcall){
     return simgrid::simix::unmarshal<smx_activity_t>(simcall->result);
 }
index bebfa0e..bf3c72c 100644 (file)
@@ -124,11 +124,17 @@ inline static smx_activity_t simcall_BODY_execution_start(const char* name, doub
     if (0) simcall_HANDLER_execution_start(&SIMIX_process_self()->simcall, name, flops_amount, priority, bound);
     return simcall<smx_activity_t, const char*, double, double, double>(SIMCALL_EXECUTION_START, name, flops_amount, priority, bound);
   }
-  
-inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) {
-    /* Go to that function to follow the code flow through the simcall barrier */
-    if (0) SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate);
-    return simcall<smx_activity_t, const char*, int, sg_host_t*, double*, double*, double, double>(SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate);
+
+  inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb,
+                                                                     sg_host_t* host_list, double* flops_amount,
+                                                                     double* bytes_amount, double amount, double rate,
+                                                                     double timeout)
+  {
+    /* Go to that function to follow the code flow through the simcall barrier */
+    if (0)
+      SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout);
+    return simcall<smx_activity_t, const char*, int, sg_host_t*, double*, double*, double, double, double>(
+        SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout);
   }
   
 inline static void simcall_BODY_execution_cancel(smx_activity_t execution) {
index 68fb148..696168e 100644 (file)
@@ -176,9 +176,15 @@ case SIMCALL_EXECUTION_START:
       break;
 
 case SIMCALL_EXECUTION_PARALLEL_START:
-      simgrid::simix::marshal<smx_activity_t>(simcall->result, SIMIX_execution_parallel_start(simgrid::simix::unmarshal<const char*>(simcall->args[0]), simgrid::simix::unmarshal<int>(simcall->args[1]), simgrid::simix::unmarshal<sg_host_t*>(simcall->args[2]), simgrid::simix::unmarshal<double*>(simcall->args[3]), simgrid::simix::unmarshal<double*>(simcall->args[4]), simgrid::simix::unmarshal<double>(simcall->args[5]), simgrid::simix::unmarshal<double>(simcall->args[6])));
-      SIMIX_simcall_answer(simcall);
-      break;
+  simgrid::simix::marshal<smx_activity_t>(
+      simcall->result,
+      SIMIX_execution_parallel_start(
+          simgrid::simix::unmarshal<const char*>(simcall->args[0]), simgrid::simix::unmarshal<int>(simcall->args[1]),
+          simgrid::simix::unmarshal<sg_host_t*>(simcall->args[2]), simgrid::simix::unmarshal<double*>(simcall->args[3]),
+          simgrid::simix::unmarshal<double*>(simcall->args[4]), simgrid::simix::unmarshal<double>(simcall->args[5]),
+          simgrid::simix::unmarshal<double>(simcall->args[6]), simgrid::simix::unmarshal<double>(simcall->args[7])));
+  SIMIX_simcall_answer(simcall);
+  break;
 
 case SIMCALL_EXECUTION_CANCEL:
       SIMIX_execution_cancel(simgrid::simix::unmarshal<smx_activity_t>(simcall->args[0]));
index f2b8b0f..a2f70c7 100644 (file)
@@ -53,7 +53,7 @@ int  process_join(smx_actor_t process, double timeout) [[block]];
 int  process_sleep(double duration) [[block]];
 
 smx_activity_t execution_start(const char* name, double flops_amount, double priority, double bound);
-smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) [[nohandler]];
+smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate, double timeout) [[nohandler]];
 void          execution_cancel(smx_activity_t execution) [[nohandler]];
 void          execution_set_priority(smx_activity_t execution, double priority) [[nohandler]];
 void          execution_set_bound(smx_activity_t execution, double bound) [[nohandler]];
index 9147633..bd6371e 100644 (file)
@@ -191,8 +191,9 @@ smx_activity_t SIMIX_execution_start(smx_actor_t issuer, const char *name, doubl
   return exec;
 }
 
-smx_activity_t SIMIX_execution_parallel_start(const char *name, int host_nb, sg_host_t *host_list, double *flops_amount,
-                                             double *bytes_amount, double amount, double rate){
+smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount,
+                                              double* bytes_amount, double amount, double rate, double timeout)
+{
 
   /* alloc structures and initialize */
   simgrid::kernel::activity::Exec *exec = new simgrid::kernel::activity::Exec(name, nullptr);
@@ -213,6 +214,10 @@ smx_activity_t SIMIX_execution_parallel_start(const char *name, int host_nb, sg_
   if (!MC_is_active() && !MC_record_replay_is_active()) {
     exec->surf_exec = surf_host_model->executeParallelTask(host_nb, host_list_cpy, flops_amount, bytes_amount, rate);
     exec->surf_exec->setData(exec);
+    if (timeout > 0) {
+      exec->timeoutDetector = host_list[0]->pimpl_cpu->sleep(timeout);
+      exec->timeoutDetector->setData(exec);
+    }
   }
   XBT_DEBUG("Create parallel execute synchro %p", exec);
 
@@ -284,6 +289,11 @@ void SIMIX_execution_finish(simgrid::kernel::activity::Exec *exec)
         SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Canceled");
         break;
 
+      case SIMIX_TIMEOUT:
+        XBT_DEBUG("SIMIX_execution_finished: execution timeouted");
+        SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Timeouted");
+        break;
+
       default:
         xbt_die("Internal error in SIMIX_execution_finish: unexpected synchro state %d",
             (int)exec->state);
index 230060c..d8f3e19 100644 (file)
@@ -54,10 +54,9 @@ XBT_PRIVATE void SIMIX_host_add_auto_restart_process(sg_host_t host,
 XBT_PRIVATE void SIMIX_host_autorestart(sg_host_t host);
 XBT_PRIVATE smx_activity_t SIMIX_execution_start(smx_actor_t issuer, const char *name,
     double flops_amount, double priority, double bound);
-XBT_PRIVATE smx_activity_t SIMIX_execution_parallel_start(const char *name,
-    int host_nb, sg_host_t *host_list,
-    double *flops_amount, double *bytes_amount,
-    double amount, double rate);
+XBT_PRIVATE smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list,
+                                                          double* flops_amount, double* bytes_amount, double amount,
+                                                          double rate, double timeout);
 XBT_PRIVATE void SIMIX_execution_cancel(smx_activity_t synchro);
 XBT_PRIVATE void SIMIX_execution_set_priority(smx_activity_t synchro, double priority);
 XBT_PRIVATE void SIMIX_execution_set_bound(smx_activity_t synchro, double bound);