From: Martin Quinson Date: Fri, 11 Nov 2016 23:15:16 +0000 (+0100) Subject: New function: MSG_parallel_task_execute_with_timeout X-Git-Tag: v3_14~221 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/7169524b7d6066c8ff315dfca1c529bf3e39a6b3 New function: MSG_parallel_task_execute_with_timeout This is the first time that executions can finish with a timeout, and I'm not really proud of the resulting code. Sorry, that's just a quick (fix #115) while I was fixing the platform generation. At some point the whole activity thingy should be reworked, too. --- diff --git a/examples/msg/energy-ptask/energy-ptask.c b/examples/msg/energy-ptask/energy-ptask.c index 01d8ccebfa..3cf07cc8ce 100644 --- a/examples/msg/energy-ptask/energy-ptask.c +++ b/examples/msg/energy-ptask/energy-ptask.c @@ -1,5 +1,4 @@ -/* Copyright (c) 2007-2015. The SimGrid Team. - * All rights reserved. */ +/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -10,8 +9,8 @@ XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example"); /** @addtogroup MSG_examples - * - * - parallel_task/parallel_task.c: Demonstrates the use of @ref MSG_parallel_task_create, to create special + * + * - energy-ptask/energy-ptask.c: Demonstrates the use of @ref MSG_parallel_task_create, to create special * tasks that run on several hosts at the same time. The resulting simulations are very close to what can be * achieved in @ref SD_API, but still allows to use the other features of MSG (it'd be cool to be able to mix * interfaces, but it's not possible ATM). 
@@ -41,6 +40,20 @@ static int runner(int argc, char *argv[]) MSG_task_destroy(ptask); /* The arrays communication_amounts and computation_amounts are not to be freed manually */ + XBT_INFO("We can do the same with a timeout of one second enabled."); + computation_amounts = xbt_new0(double, hosts_count); + communication_amounts = xbt_new0(double, hosts_count* hosts_count); + for (int i = 0; i < hosts_count; i++) + computation_amounts[i] = 1e9; // 1 Gflop + for (int i = 0; i < hosts_count; i++) + for (int j = i + 1; j < hosts_count; j++) + communication_amounts[i * hosts_count + j] = 1e7; // 10 MB + ptask = + MSG_parallel_task_create("parallel task", hosts_count, hosts, computation_amounts, communication_amounts, NULL); + msg_error_t errcode = MSG_parallel_task_execute_with_timeout(ptask, 1 /* timeout (in seconds)*/); + xbt_assert(errcode == MSG_TIMEOUT, "Woops, this did not timeout as expected... Please report that bug."); + MSG_task_destroy(ptask); + XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)"); computation_amounts = xbt_new0(double, hosts_count); for (int i = 0; i < hosts_count; i++) diff --git a/examples/msg/energy-ptask/energy-ptask.tesh b/examples/msg/energy-ptask/energy-ptask.tesh index 2bfd392fb2..0298dca8c5 100644 --- a/examples/msg/energy-ptask/energy-ptask.tesh +++ b/examples/msg/energy-ptask/energy-ptask.tesh @@ -3,11 +3,12 @@ $ $SG_TEST_EXENV energy-ptask/energy-ptask$EXEEXT ${srcdir:=.}/../platforms/energy_platform.xml --energy "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n" > [ 0.000000] (0:maestro@) Switching to the L07 model to handle parallel tasks. 
> [ 0.000000] (1:test@MyHost1) First, build a classical parallel task, with 1 Gflop to execute on each node, and 10MB to exchange between each pair -> [300.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node) -> [310.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only) -> [310.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2 -> [320.000000] (1:test@MyHost1) Goodbye now! -> [320.000000] (0:maestro@) Simulation done. -> [320.000000] (0:maestro@) Total energy of host MyHost1: 38200.000000 Joules -> [320.000000] (0:maestro@) Total energy of host MyHost2: 38400.000000 Joules -> [320.000000] (0:maestro@) Total energy of host MyHost3: 38200.000000 Joules +> [300.000000] (1:test@MyHost1) We can do the same with a timeout of one second enabled. +> [301.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node) +> [311.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only) +> [311.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2 +> [321.000000] (1:test@MyHost1) Goodbye now! +> [321.000000] (0:maestro@) Simulation done. 
+> [321.000000] (0:maestro@) Total energy of host MyHost1: 38320.000000 Joules +> [321.000000] (0:maestro@) Total energy of host MyHost2: 38520.000000 Joules +> [321.000000] (0:maestro@) Total energy of host MyHost3: 38320.000000 Joules diff --git a/include/simgrid/msg.h b/include/simgrid/msg.h index a7f1ad3537..421a8837bb 100644 --- a/include/simgrid/msg.h +++ b/include/simgrid/msg.h @@ -356,6 +356,7 @@ XBT_PUBLIC(msg_error_t) MSG_task_destroy(msg_task_t task); XBT_PUBLIC(msg_error_t) MSG_task_execute(msg_task_t task); XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute(msg_task_t task); +XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout); XBT_PUBLIC(void) MSG_task_set_priority(msg_task_t task, double priority); XBT_PUBLIC(void) MSG_task_set_bound(msg_task_t task, double bound); diff --git a/include/simgrid/simix.h b/include/simgrid/simix.h index a0e8e675d3..40f0295ccc 100644 --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@ -80,6 +80,7 @@ typedef enum { SIMIX_FAILED, SIMIX_SRC_HOST_FAILURE, SIMIX_DST_HOST_FAILURE, + SIMIX_TIMEOUT, SIMIX_SRC_TIMEOUT, SIMIX_DST_TIMEOUT, SIMIX_LINK_FAILURE @@ -248,13 +249,9 @@ XBT_PUBLIC(void) simcall_host_set_data(sg_host_t host, void *data); XBT_PUBLIC(smx_activity_t) simcall_execution_start(const char *name, double flops_amount, double priority, double bound); -XBT_PUBLIC(smx_activity_t) simcall_execution_parallel_start(const char *name, - int host_nb, - sg_host_t *host_list, - double *flops_amount, - double *bytes_amount, - double amount, - double rate); +XBT_PUBLIC(smx_activity_t) +simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, + double* bytes_amount, double amount, double rate, double timeout); XBT_PUBLIC(void) simcall_execution_cancel(smx_activity_t execution); XBT_PUBLIC(void) simcall_execution_set_priority(smx_activity_t execution, double priority); XBT_PUBLIC(void) simcall_execution_set_bound(smx_activity_t 
execution, double bound); diff --git a/src/kernel/activity/SynchroExec.cpp b/src/kernel/activity/SynchroExec.cpp index 2617322f95..31ad7d13b1 100644 --- a/src/kernel/activity/SynchroExec.cpp +++ b/src/kernel/activity/SynchroExec.cpp @@ -21,6 +21,8 @@ simgrid::kernel::activity::Exec::~Exec() { if (surf_exec) surf_exec->unref(); + if (timeoutDetector) + timeoutDetector->unref(); } void simgrid::kernel::activity::Exec::suspend() { @@ -51,6 +53,8 @@ void simgrid::kernel::activity::Exec::post() } else if (surf_exec->getState() == simgrid::surf::Action::State::failed) { /* If the host running the synchro didn't fail, then the synchro was canceled */ state = SIMIX_CANCELED; + } else if (timeoutDetector && timeoutDetector->getState() == simgrid::surf::Action::State::done) { + state = SIMIX_TIMEOUT; } else { state = SIMIX_DONE; } @@ -59,6 +63,10 @@ void simgrid::kernel::activity::Exec::post() surf_exec->unref(); surf_exec = nullptr; } + if (timeoutDetector) { + timeoutDetector->unref(); + timeoutDetector = nullptr; + } /* If there are simcalls associated with the synchro, then answer them */ if (!simcalls.empty()) diff --git a/src/kernel/activity/SynchroExec.hpp b/src/kernel/activity/SynchroExec.hpp index d6694941a9..c0e315f956 100644 --- a/src/kernel/activity/SynchroExec.hpp +++ b/src/kernel/activity/SynchroExec.hpp @@ -24,6 +24,7 @@ namespace activity { sg_host_t host = nullptr; /* The host where the execution takes place. 
If nullptr, then this is a parallel exec (and only surf knows the hosts) */ surf_action_t surf_exec = nullptr; /* The Surf execution action encapsulated */ + surf::Action* timeoutDetector = nullptr; }; }}} // namespace simgrid::kernel::activity diff --git a/src/msg/msg_gos.cpp b/src/msg/msg_gos.cpp index e9227f098d..6c2ea24a9c 100644 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@ -42,6 +42,11 @@ msg_error_t MSG_task_execute(msg_task_t task) * or #MSG_HOST_FAILURE otherwise */ msg_error_t MSG_parallel_task_execute(msg_task_t task) +{ + return MSG_parallel_task_execute_with_timeout(task, -1); +} + +msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout) { simdata_task_t simdata = task->simdata; simdata_process_t p_simdata = static_cast(SIMIX_process_self_get_data()); @@ -63,10 +68,9 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) simdata->setUsed(); if (simdata->host_nb > 0) { - simdata->compute = static_cast( - simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list, - simdata->flops_parallel_amount, simdata->bytes_parallel_amount, - 1.0, -1.0)); + simdata->compute = static_cast(simcall_execution_parallel_start( + task->name, simdata->host_nb, simdata->host_list, simdata->flops_parallel_amount, + simdata->bytes_parallel_amount, 1.0, -1.0, timeout)); XBT_DEBUG("Parallel execution action created: %p", simdata->compute); } else { simdata->compute = static_cast( @@ -89,6 +93,9 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) case host_error: status = MSG_HOST_FAILURE; break; + case timeout_error: + status = MSG_TIMEOUT; + break; default: throw; } diff --git a/src/simix/libsmx.cpp b/src/simix/libsmx.cpp index a9ad2c9471..4686b80115 100644 --- a/src/simix/libsmx.cpp +++ b/src/simix/libsmx.cpp @@ -97,15 +97,12 @@ smx_activity_t simcall_execution_start(const char *name, * amount between each pair of hosts * \param amount the SURF action amount * \param rate the SURF action rate + * \param 
timeout timeout * \return A new SIMIX execution synchronization */ -smx_activity_t simcall_execution_parallel_start(const char *name, - int host_nb, - sg_host_t *host_list, - double *flops_amount, - double *bytes_amount, - double amount, - double rate) +smx_activity_t simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, + double* flops_amount, double* bytes_amount, double amount, double rate, + double timeout) { int i,j; /* checking for infinite values */ @@ -122,11 +119,8 @@ smx_activity_t simcall_execution_parallel_start(const char *name, xbt_assert(std::isfinite(amount), "amount is not finite!"); xbt_assert(std::isfinite(rate), "rate is not finite!"); - return simcall_BODY_execution_parallel_start(name, host_nb, host_list, - flops_amount, - bytes_amount, - amount, rate); - + return simcall_BODY_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, + timeout); } /** diff --git a/src/simix/popping_accessors.h b/src/simix/popping_accessors.h index d911ffccad..9da45f34e8 100644 --- a/src/simix/popping_accessors.h +++ b/src/simix/popping_accessors.h @@ -215,6 +215,14 @@ static inline double simcall_execution_parallel_start__get__rate(smx_simcall_t s static inline void simcall_execution_parallel_start__set__rate(smx_simcall_t simcall, double arg) { simgrid::simix::marshal(simcall->args[6], arg); } +static inline double simcall_execution_parallel_start__get__timeout(smx_simcall_t simcall) +{ + return simgrid::simix::unmarshal(simcall->args[7]); +} +static inline void simcall_execution_parallel_start__set__timeout(smx_simcall_t simcall, double arg) +{ + simgrid::simix::marshal(simcall->args[7], arg); +} static inline smx_activity_t simcall_execution_parallel_start__get__result(smx_simcall_t simcall){ return simgrid::simix::unmarshal(simcall->result); } diff --git a/src/simix/popping_bodies.cpp b/src/simix/popping_bodies.cpp index bebfa0ea67..bf3c72c124 100644 --- a/src/simix/popping_bodies.cpp +++ 
b/src/simix/popping_bodies.cpp @@ -124,11 +124,17 @@ inline static smx_activity_t simcall_BODY_execution_start(const char* name, doub if (0) simcall_HANDLER_execution_start(&SIMIX_process_self()->simcall, name, flops_amount, priority, bound); return simcall(SIMCALL_EXECUTION_START, name, flops_amount, priority, bound); } - -inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) { - /* Go to that function to follow the code flow through the simcall barrier */ - if (0) SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate); - return simcall(SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate); + + inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb, + sg_host_t* host_list, double* flops_amount, + double* bytes_amount, double amount, double rate, + double timeout) + { + /* Go to that function to follow the code flow through the simcall barrier */ + if (0) + SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout); + return simcall( + SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout); } inline static void simcall_BODY_execution_cancel(smx_activity_t execution) { diff --git a/src/simix/popping_generated.cpp b/src/simix/popping_generated.cpp index 68fb148f87..696168e558 100644 --- a/src/simix/popping_generated.cpp +++ b/src/simix/popping_generated.cpp @@ -176,9 +176,15 @@ case SIMCALL_EXECUTION_START: break; case SIMCALL_EXECUTION_PARALLEL_START: - simgrid::simix::marshal(simcall->result, SIMIX_execution_parallel_start(simgrid::simix::unmarshal(simcall->args[0]), simgrid::simix::unmarshal(simcall->args[1]), simgrid::simix::unmarshal(simcall->args[2]), 
simgrid::simix::unmarshal(simcall->args[3]), simgrid::simix::unmarshal(simcall->args[4]), simgrid::simix::unmarshal(simcall->args[5]), simgrid::simix::unmarshal(simcall->args[6]))); - SIMIX_simcall_answer(simcall); - break; + simgrid::simix::marshal( + simcall->result, + SIMIX_execution_parallel_start( + simgrid::simix::unmarshal(simcall->args[0]), simgrid::simix::unmarshal(simcall->args[1]), + simgrid::simix::unmarshal(simcall->args[2]), simgrid::simix::unmarshal(simcall->args[3]), + simgrid::simix::unmarshal(simcall->args[4]), simgrid::simix::unmarshal(simcall->args[5]), + simgrid::simix::unmarshal(simcall->args[6]), simgrid::simix::unmarshal(simcall->args[7]))); + SIMIX_simcall_answer(simcall); + break; case SIMCALL_EXECUTION_CANCEL: SIMIX_execution_cancel(simgrid::simix::unmarshal(simcall->args[0])); diff --git a/src/simix/simcalls.in b/src/simix/simcalls.in index f2b8b0fca4..a2f70c7a37 100644 --- a/src/simix/simcalls.in +++ b/src/simix/simcalls.in @@ -53,7 +53,7 @@ int process_join(smx_actor_t process, double timeout) [[block]]; int process_sleep(double duration) [[block]]; smx_activity_t execution_start(const char* name, double flops_amount, double priority, double bound); -smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) [[nohandler]]; +smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate, double timeout) [[nohandler]]; void execution_cancel(smx_activity_t execution) [[nohandler]]; void execution_set_priority(smx_activity_t execution, double priority) [[nohandler]]; void execution_set_bound(smx_activity_t execution, double bound) [[nohandler]]; diff --git a/src/simix/smx_host.cpp b/src/simix/smx_host.cpp index 9147633f9f..bd6371ed10 100644 --- a/src/simix/smx_host.cpp +++ b/src/simix/smx_host.cpp @@ -191,8 +191,9 @@ smx_activity_t 
SIMIX_execution_start(smx_actor_t issuer, const char *name, doubl return exec; } -smx_activity_t SIMIX_execution_parallel_start(const char *name, int host_nb, sg_host_t *host_list, double *flops_amount, - double *bytes_amount, double amount, double rate){ +smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, + double* bytes_amount, double amount, double rate, double timeout) +{ /* alloc structures and initialize */ simgrid::kernel::activity::Exec *exec = new simgrid::kernel::activity::Exec(name, nullptr); @@ -213,6 +214,10 @@ smx_activity_t SIMIX_execution_parallel_start(const char *name, int host_nb, sg_ if (!MC_is_active() && !MC_record_replay_is_active()) { exec->surf_exec = surf_host_model->executeParallelTask(host_nb, host_list_cpy, flops_amount, bytes_amount, rate); exec->surf_exec->setData(exec); + if (timeout > 0) { + exec->timeoutDetector = host_list[0]->pimpl_cpu->sleep(timeout); + exec->timeoutDetector->setData(exec); + } } XBT_DEBUG("Create parallel execute synchro %p", exec); @@ -284,6 +289,11 @@ void SIMIX_execution_finish(simgrid::kernel::activity::Exec *exec) SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Canceled"); break; + case SIMIX_TIMEOUT: + XBT_DEBUG("SIMIX_execution_finished: execution timeouted"); + SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Timeouted"); + break; + default: xbt_die("Internal error in SIMIX_execution_finish: unexpected synchro state %d", (int)exec->state); diff --git a/src/simix/smx_host_private.h b/src/simix/smx_host_private.h index 230060cdab..d8f3e1930c 100644 --- a/src/simix/smx_host_private.h +++ b/src/simix/smx_host_private.h @@ -54,10 +54,9 @@ XBT_PRIVATE void SIMIX_host_add_auto_restart_process(sg_host_t host, XBT_PRIVATE void SIMIX_host_autorestart(sg_host_t host); XBT_PRIVATE smx_activity_t SIMIX_execution_start(smx_actor_t issuer, const char *name, double flops_amount, double priority, double bound); -XBT_PRIVATE smx_activity_t 
SIMIX_execution_parallel_start(const char *name, - int host_nb, sg_host_t *host_list, - double *flops_amount, double *bytes_amount, - double amount, double rate); +XBT_PRIVATE smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, + double* flops_amount, double* bytes_amount, double amount, + double rate, double timeout); XBT_PRIVATE void SIMIX_execution_cancel(smx_activity_t synchro); XBT_PRIVATE void SIMIX_execution_set_priority(smx_activity_t synchro, double priority); XBT_PRIVATE void SIMIX_execution_set_bound(smx_activity_t synchro, double bound);