-/* Copyright (c) 2007-2015. The SimGrid Team.
- * All rights reserved. */
+/* Copyright (c) 2007-2016. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");
/** @addtogroup MSG_examples
- *
- * - <b>parallel_task/parallel_task.c</b>: Demonstrates the use of @ref MSG_parallel_task_create, to create special
+ *
+ * - <b>energy-ptask/energy-ptask.c</b>: Demonstrates the use of @ref MSG_parallel_task_create, to create special
* tasks that run on several hosts at the same time. The resulting simulations are very close to what can be
* achieved in @ref SD_API, but still allows to use the other features of MSG (it'd be cool to be able to mix
* interfaces, but it's not possible ATM).
MSG_task_destroy(ptask);
/* The arrays communication_amounts and computation_amounts are not to be freed manually */
+ XBT_INFO("We can do the same with a timeout of one second enabled.");
+ computation_amounts = xbt_new0(double, hosts_count);
+ communication_amounts = xbt_new0(double, hosts_count* hosts_count);
+ for (int i = 0; i < hosts_count; i++)
+ computation_amounts[i] = 1e9; // 1 Gflop
+ for (int i = 0; i < hosts_count; i++)
+ for (int j = i + 1; j < hosts_count; j++)
+ communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
+ ptask =
+ MSG_parallel_task_create("parallel task", hosts_count, hosts, computation_amounts, communication_amounts, NULL);
+ msg_error_t errcode = MSG_parallel_task_execute_with_timeout(ptask, 1 /* timeout (in seconds)*/);
+ xbt_assert(errcode == MSG_TIMEOUT, "Woops, this did not timeout as expected... Please report that bug.");
+ MSG_task_destroy(ptask);
+
XBT_INFO("Then, build a parallel task involving only computations and no communication (1 Gflop per node)");
computation_amounts = xbt_new0(double, hosts_count);
for (int i = 0; i < hosts_count; i++)
$ $SG_TEST_EXENV energy-ptask/energy-ptask$EXEEXT ${srcdir:=.}/../platforms/energy_platform.xml --energy "--log=root.fmt:[%10.6r]%e(%i:%P@%h)%e%m%n"
> [ 0.000000] (0:maestro@) Switching to the L07 model to handle parallel tasks.
> [ 0.000000] (1:test@MyHost1) First, build a classical parallel task, with 1 Gflop to execute on each node, and 10MB to exchange between each pair
-> [300.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node)
-> [310.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only)
-> [310.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2
-> [320.000000] (1:test@MyHost1) Goodbye now!
-> [320.000000] (0:maestro@) Simulation done.
-> [320.000000] (0:maestro@) Total energy of host MyHost1: 38200.000000 Joules
-> [320.000000] (0:maestro@) Total energy of host MyHost2: 38400.000000 Joules
-> [320.000000] (0:maestro@) Total energy of host MyHost3: 38200.000000 Joules
+> [300.000000] (1:test@MyHost1) We can do the same with a timeout of one second enabled.
+> [301.000000] (1:test@MyHost1) Then, build a parallel task involving only computations and no communication (1 Gflop per node)
+> [311.000000] (1:test@MyHost1) Then, build a parallel task with no computation nor communication (synchro only)
+> [311.000000] (1:test@MyHost1) Finally, trick the ptask to do a 'remote execution', on host MyHost2
+> [321.000000] (1:test@MyHost1) Goodbye now!
+> [321.000000] (0:maestro@) Simulation done.
+> [321.000000] (0:maestro@) Total energy of host MyHost1: 38320.000000 Joules
+> [321.000000] (0:maestro@) Total energy of host MyHost2: 38520.000000 Joules
+> [321.000000] (0:maestro@) Total energy of host MyHost3: 38320.000000 Joules
XBT_PUBLIC(msg_error_t) MSG_task_execute(msg_task_t task);
XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute(msg_task_t task);
+XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout);
XBT_PUBLIC(void) MSG_task_set_priority(msg_task_t task, double priority);
XBT_PUBLIC(void) MSG_task_set_bound(msg_task_t task, double bound);
SIMIX_FAILED,
SIMIX_SRC_HOST_FAILURE,
SIMIX_DST_HOST_FAILURE,
+ SIMIX_TIMEOUT,
SIMIX_SRC_TIMEOUT,
SIMIX_DST_TIMEOUT,
SIMIX_LINK_FAILURE
XBT_PUBLIC(smx_activity_t) simcall_execution_start(const char *name,
double flops_amount,
double priority, double bound);
-XBT_PUBLIC(smx_activity_t) simcall_execution_parallel_start(const char *name,
- int host_nb,
- sg_host_t *host_list,
- double *flops_amount,
- double *bytes_amount,
- double amount,
- double rate);
+XBT_PUBLIC(smx_activity_t)
+simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount,
+ double* bytes_amount, double amount, double rate, double timeout);
XBT_PUBLIC(void) simcall_execution_cancel(smx_activity_t execution);
XBT_PUBLIC(void) simcall_execution_set_priority(smx_activity_t execution, double priority);
XBT_PUBLIC(void) simcall_execution_set_bound(smx_activity_t execution, double bound);
{
if (surf_exec)
surf_exec->unref();
+ if (timeoutDetector)
+ timeoutDetector->unref();
}
void simgrid::kernel::activity::Exec::suspend()
{
} else if (surf_exec->getState() == simgrid::surf::Action::State::failed) {
/* If the host running the synchro didn't fail, then the synchro was canceled */
state = SIMIX_CANCELED;
+ } else if (timeoutDetector && timeoutDetector->getState() == simgrid::surf::Action::State::done) {
+ state = SIMIX_TIMEOUT;
} else {
state = SIMIX_DONE;
}
surf_exec->unref();
surf_exec = nullptr;
}
+ if (timeoutDetector) {
+ timeoutDetector->unref();
+ timeoutDetector = nullptr;
+ }
/* If there are simcalls associated with the synchro, then answer them */
if (!simcalls.empty())
sg_host_t host = nullptr; /* The host where the execution takes place. If nullptr, then this is a parallel exec (and only surf knows the hosts) */
surf_action_t surf_exec = nullptr; /* The Surf execution action encapsulated */
+ surf::Action* timeoutDetector = nullptr;
};
}}} // namespace simgrid::kernel::activity
* or #MSG_HOST_FAILURE otherwise
*/
msg_error_t MSG_parallel_task_execute(msg_task_t task)
+{
+ /* -1 means "no timeout": SIMIX_execution_parallel_start only arms the timeout detector when timeout > 0. */
+ return MSG_parallel_task_execute_with_timeout(task, -1);
+}
+
+/** \brief Same as MSG_parallel_task_execute(), but fails with #MSG_TIMEOUT if the execution lasts too long.
+ *
+ * \param task the task to execute
+ * \param timeout maximal duration of the execution, in seconds. The timeout is only set when this value is
+ *        strictly positive; MSG_parallel_task_execute() passes -1 to disable it.
+ * \return #MSG_TIMEOUT if the timeout expired before the execution completed, #MSG_HOST_FAILURE if a host failed,
+ *         and otherwise the same codes as MSG_parallel_task_execute().
+ *
+ * The timeout is measured by a sleep action started on the CPU of the first host of the task's host list.
+ *
+ * NOTE(review): the timeout is only forwarded on the parallel path (host_nb > 0). Tasks without a host list go
+ * through simcall_execution_start(), which takes no timeout argument, so the timeout seems ignored there — confirm.
+ */
+msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
{
simdata_task_t simdata = task->simdata;
simdata_process_t p_simdata = static_cast<simdata_process_t>(SIMIX_process_self_get_data());
simdata->setUsed();
if (simdata->host_nb > 0) {
- simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(
- simcall_execution_parallel_start(task->name, simdata->host_nb,simdata->host_list,
- simdata->flops_parallel_amount, simdata->bytes_parallel_amount,
- 1.0, -1.0));
+ simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(simcall_execution_parallel_start(
+ task->name, simdata->host_nb, simdata->host_list, simdata->flops_parallel_amount,
+ simdata->bytes_parallel_amount, 1.0, -1.0, timeout));
XBT_DEBUG("Parallel execution action created: %p", simdata->compute);
} else {
simdata->compute = static_cast<simgrid::kernel::activity::Exec*>(
case host_error:
status = MSG_HOST_FAILURE;
break;
+ case timeout_error:
+ status = MSG_TIMEOUT;
+ break;
default:
throw;
}
* amount between each pair of hosts
* \param amount the SURF action amount
* \param rate the SURF action rate
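+ * \param timeout maximal duration of the execution, in seconds; the timeout is only set when this value is strictly positive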
* \return A new SIMIX execution synchronization
*/
-smx_activity_t simcall_execution_parallel_start(const char *name,
- int host_nb,
- sg_host_t *host_list,
- double *flops_amount,
- double *bytes_amount,
- double amount,
- double rate)
+smx_activity_t simcall_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list,
+ double* flops_amount, double* bytes_amount, double amount, double rate,
+ double timeout)
{
int i,j;
/* checking for infinite values */
xbt_assert(std::isfinite(amount), "amount is not finite!");
xbt_assert(std::isfinite(rate), "rate is not finite!");
- return simcall_BODY_execution_parallel_start(name, host_nb, host_list,
- flops_amount,
- bytes_amount,
- amount, rate);
-
+ return simcall_BODY_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate,
+ timeout);
}
/**
static inline void simcall_execution_parallel_start__set__rate(smx_simcall_t simcall, double arg) {
simgrid::simix::marshal<double>(simcall->args[6], arg);
}
+// Accessors for the `timeout` argument of the execution_parallel_start simcall, marshalled in args[7]
+// (right after `rate`, which lives in args[6]).
+static inline double simcall_execution_parallel_start__get__timeout(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<double>(simcall->args[7]);
+}
+static inline void simcall_execution_parallel_start__set__timeout(smx_simcall_t simcall, double arg)
+{
+ simgrid::simix::marshal<double>(simcall->args[7], arg);
+}
+// Reads back the activity produced by the execution_parallel_start simcall from simcall->result.
static inline smx_activity_t simcall_execution_parallel_start__get__result(smx_simcall_t simcall){
return simgrid::simix::unmarshal<smx_activity_t>(simcall->result);
}
if (0) simcall_HANDLER_execution_start(&SIMIX_process_self()->simcall, name, flops_amount, priority, bound);
return simcall<smx_activity_t, const char*, double, double, double>(SIMCALL_EXECUTION_START, name, flops_amount, priority, bound);
}
-
-inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) {
- /* Go to that function to follow the code flow through the simcall barrier */
- if (0) SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate);
- return simcall<smx_activity_t, const char*, int, sg_host_t*, double*, double*, double, double>(SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate);
+
+ // Client side of the execution_parallel_start simcall: packs its 8 arguments (the new `timeout` last, i.e. in
+ // args[7]) into a SIMCALL_EXECUTION_PARALLEL_START request and returns the resulting activity.
+ // The `if (0)` call below never runs; it only points readers at the function that handles this simcall.
+ inline static smx_activity_t simcall_BODY_execution_parallel_start(const char* name, int host_nb,
+ sg_host_t* host_list, double* flops_amount,
+ double* bytes_amount, double amount, double rate,
+ double timeout)
+ {
+ /* Go to that function to follow the code flow through the simcall barrier */
+ if (0)
+ SIMIX_execution_parallel_start(name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout);
+ return simcall<smx_activity_t, const char*, int, sg_host_t*, double*, double*, double, double, double>(
+ SIMCALL_EXECUTION_PARALLEL_START, name, host_nb, host_list, flops_amount, bytes_amount, amount, rate, timeout);
}
inline static void simcall_BODY_execution_cancel(smx_activity_t execution) {
break;
case SIMCALL_EXECUTION_PARALLEL_START:
- simgrid::simix::marshal<smx_activity_t>(simcall->result, SIMIX_execution_parallel_start(simgrid::simix::unmarshal<const char*>(simcall->args[0]), simgrid::simix::unmarshal<int>(simcall->args[1]), simgrid::simix::unmarshal<sg_host_t*>(simcall->args[2]), simgrid::simix::unmarshal<double*>(simcall->args[3]), simgrid::simix::unmarshal<double*>(simcall->args[4]), simgrid::simix::unmarshal<double>(simcall->args[5]), simgrid::simix::unmarshal<double>(simcall->args[6])));
- SIMIX_simcall_answer(simcall);
- break;
+ simgrid::simix::marshal<smx_activity_t>(
+ simcall->result,
+ SIMIX_execution_parallel_start(
+ simgrid::simix::unmarshal<const char*>(simcall->args[0]), simgrid::simix::unmarshal<int>(simcall->args[1]),
+ simgrid::simix::unmarshal<sg_host_t*>(simcall->args[2]), simgrid::simix::unmarshal<double*>(simcall->args[3]),
+ simgrid::simix::unmarshal<double*>(simcall->args[4]), simgrid::simix::unmarshal<double>(simcall->args[5]),
+ simgrid::simix::unmarshal<double>(simcall->args[6]), simgrid::simix::unmarshal<double>(simcall->args[7])));
+ SIMIX_simcall_answer(simcall);
+ break;
case SIMCALL_EXECUTION_CANCEL:
SIMIX_execution_cancel(simgrid::simix::unmarshal<smx_activity_t>(simcall->args[0]));
int process_sleep(double duration) [[block]];
smx_activity_t execution_start(const char* name, double flops_amount, double priority, double bound);
-smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate) [[nohandler]];
+smx_activity_t execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount, double* bytes_amount, double amount, double rate, double timeout) [[nohandler]];
void execution_cancel(smx_activity_t execution) [[nohandler]];
void execution_set_priority(smx_activity_t execution, double priority) [[nohandler]];
void execution_set_bound(smx_activity_t execution, double bound) [[nohandler]];
return exec;
}
-smx_activity_t SIMIX_execution_parallel_start(const char *name, int host_nb, sg_host_t *host_list, double *flops_amount,
- double *bytes_amount, double amount, double rate){
+smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list, double* flops_amount,
+ double* bytes_amount, double amount, double rate, double timeout)
+{
/* alloc structures and initialize */
simgrid::kernel::activity::Exec *exec = new simgrid::kernel::activity::Exec(name, nullptr);
if (!MC_is_active() && !MC_record_replay_is_active()) {
exec->surf_exec = surf_host_model->executeParallelTask(host_nb, host_list_cpy, flops_amount, bytes_amount, rate);
exec->surf_exec->setData(exec);
+ if (timeout > 0) {
+ exec->timeoutDetector = host_list[0]->pimpl_cpu->sleep(timeout);
+ exec->timeoutDetector->setData(exec);
+ }
}
XBT_DEBUG("Create parallel execute synchro %p", exec);
SMX_EXCEPTION(simcall->issuer, cancel_error, 0, "Canceled");
break;
+ case SIMIX_TIMEOUT:
+ XBT_DEBUG("SIMIX_execution_finished: execution timeouted");
+ SMX_EXCEPTION(simcall->issuer, timeout_error, 0, "Timeouted");
+ break;
+
default:
xbt_die("Internal error in SIMIX_execution_finish: unexpected synchro state %d",
(int)exec->state);
XBT_PRIVATE void SIMIX_host_autorestart(sg_host_t host);
XBT_PRIVATE smx_activity_t SIMIX_execution_start(smx_actor_t issuer, const char *name,
double flops_amount, double priority, double bound);
-XBT_PRIVATE smx_activity_t SIMIX_execution_parallel_start(const char *name,
- int host_nb, sg_host_t *host_list,
- double *flops_amount, double *bytes_amount,
- double amount, double rate);
+XBT_PRIVATE smx_activity_t SIMIX_execution_parallel_start(const char* name, int host_nb, sg_host_t* host_list,
+ double* flops_amount, double* bytes_amount, double amount,
+ double rate, double timeout);
XBT_PRIVATE void SIMIX_execution_cancel(smx_activity_t synchro);
XBT_PRIVATE void SIMIX_execution_set_priority(smx_activity_t synchro, double priority);
XBT_PRIVATE void SIMIX_execution_set_bound(smx_activity_t synchro, double bound);