examples/s4u/exec-ptask/s4u-exec-ptask
examples/s4u/exec-remote/s4u-exec-remote
examples/s4u/exec-waitany/s4u-exec-waitany
+examples/s4u/exec-waitfor/s4u-exec-waitfor
examples/s4u/io-async/s4u-io-async
examples/s4u/io-file-remote/s4u-io-file-remote
examples/s4u/io-file-system/s4u-io-file-system
- Actor: Merge signals on_migration_start/end into on_host_change
- Actor: Rename migrate() into set_host()
- Disk: Allow users to get the read and write nominal bandwidth values
+- Exec: Implement wait_for(timeout)
XML:
- Parse errors now raise a simgrid::ParseError that you may want to catch.
include examples/s4u/exec-remote/s4u-exec-remote.tesh
include examples/s4u/exec-waitany/s4u-exec-waitany.cpp
include examples/s4u/exec-waitany/s4u-exec-waitany.tesh
+include examples/s4u/exec-waitfor/s4u-exec-waitfor.cpp
+include examples/s4u/exec-waitfor/s4u-exec-waitfor.tesh
include examples/s4u/io-async/s4u-io-async.cpp
include examples/s4u/io-async/s4u-io-async.tesh
include examples/s4u/io-disk-raw/s4u-io-disk-raw.cpp
cloud-capping cloud-migration cloud-simple
energy-exec energy-boot energy-link energy-vm energy-exec-ptask
engine-filtering
- exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany
+ exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany exec-waitfor
io-async io-file-system io-file-remote io-disk-raw
platform-failures platform-profile platform-properties
plugin-hostload
--- /dev/null
+/* Copyright (c) 2019. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitfor, "Messages specific for this s4u example");
+
+static void worker()
+{
+ simgrid::s4u::ExecPtr exec;
+ double amount = 5 * simgrid::s4u::this_actor::get_host()->get_speed();
+ XBT_INFO("Create an activity that should run for 5 seconds");
+
+ exec = simgrid::s4u::this_actor::exec_async(amount);
+
+ /* Now that execution is started, wait for 3 seconds. */
+ XBT_INFO("But let it end after 3 seconds");
+ try {
+ exec->wait_for(3);
+ XBT_INFO("Execution complete");
+ } catch (simgrid::TimeoutException&) {
+ XBT_INFO("Execution Timeout!");
+ }
+
+ /* do it again, but this time with a timeout greater than the duration of the execution */
+ XBT_INFO("Create another activity that should run for 5 seconds and wait for it for 6 seconds");
+ exec = simgrid::s4u::this_actor::exec_async(amount);
+ try {
+ exec->wait_for(6);
+ XBT_INFO("Execution complete");
+ } catch (simgrid::TimeoutException&) {
+ XBT_INFO("Execution Timeout!");
+ }
+
+ XBT_INFO("Finally test with a parallel execution");
+ auto hosts = simgrid::s4u::Engine::get_instance()->get_all_hosts();
+ size_t hosts_count = hosts.size();
+ std::vector<double> computation_amounts;
+ std::vector<double> communication_amounts;
+
+ computation_amounts.assign(hosts_count, 1e9 /*1Gflop*/);
+ communication_amounts.assign(hosts_count * hosts_count, 0);
+ for (size_t i = 0; i < hosts_count; i++)
+ for (size_t j = i + 1; j < hosts_count; j++)
+ communication_amounts[i * hosts_count + j] = 1e7; // 10 MB
+
+ exec = simgrid::s4u::this_actor::exec_init(hosts, computation_amounts, communication_amounts);
+ try {
+ exec->wait_for(2);
+ XBT_INFO("Parallel Execution complete");
+ } catch (simgrid::TimeoutException&) {
+ XBT_INFO("Parallel Execution Timeout!");
+ }
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid::s4u::Engine e(&argc, argv);
+ e.load_platform(argv[1]);
+ simgrid::s4u::Actor::create("worker", simgrid::s4u::Host::by_name("Tremblay"), worker);
+ e.run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+$ ${bindir:=.}/s4u-exec-waitfor ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
+> [ 0.000000] [ worker] Create an activity that should run for 5 seconds
+> [ 0.000000] [ worker] But let it end after 3 seconds
+> [ 3.000000] [ worker] Execution Timeout!
+> [ 3.000000] [ worker] Create another activity that should run for 5 seconds and wait for it for 6 seconds
+> [ 8.000000] [ worker] Execution complete
+> [ 8.000000] [ worker] Finally test with a parallel execution
+> [ 10.000000] [ worker] Parallel Execution Timeout!
/******************************* Host simcalls ********************************/
#ifdef __cplusplus
-XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution);
+XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout);
XBT_PUBLIC unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count,
double timeout);
XBT_PUBLIC bool simcall_execution_test(const smx_activity_t& execution);
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process);
-void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
+void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro, double timeout)
{
XBT_DEBUG("Wait for execution of synchro %p, state %d", synchro, (int)synchro->state_);
+ xbt_assert(std::isfinite(timeout), "timeout is not finite!");
/* Associate this simcall to the synchro */
synchro->register_simcall(simcall);
/* set surf's synchro */
if (MC_is_active() || MC_record_replay_is_active()) {
- synchro->state_ = simgrid::kernel::activity::State::DONE;
+ int idx = SIMCALL_GET_MC_VALUE(*simcall);
+ if (idx == 0) {
+ synchro->state_ = simgrid::kernel::activity::State::DONE;
+ } else {
+ /* If we reached this point, the wait simcall must have a timeout */
+ /* Otherwise it shouldn't be enabled and executed by the MC */
+ if (timeout < 0.0)
+ THROW_IMPOSSIBLE;
+ synchro->state_ = simgrid::kernel::activity::State::TIMEOUT;
+ }
synchro->finish();
return;
}
/* If the synchro is already finished then perform the error handling */
- if (synchro->state_ != simgrid::kernel::activity::State::RUNNING)
+ if (synchro->state_ != simgrid::kernel::activity::State::RUNNING) {
synchro->finish();
+ } else { /* we need a sleep action (even when there is no timeout) to be notified of host failures */
+ synchro->set_timeout(timeout);
+ }
}
void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
const std::vector<double>& bytes_amounts)
{
- parallel_execute(hosts, flops_amounts, bytes_amounts, -1);
+ exec_init(hosts, flops_amounts, bytes_amounts)->wait();
}
void parallel_execute(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
const std::vector<double>& bytes_amounts, double timeout)
+{
+ exec_init(hosts, flops_amounts, bytes_amounts)->wait_for(timeout);
+}
+
+ExecPtr exec_init(double flops_amount)
+{
+ return ExecPtr(new ExecSeq(get_host(), flops_amount));
+}
+
+ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
+ const std::vector<double>& bytes_amounts)
{
xbt_assert(hosts.size() > 0, "Your parallel executions must span over at least one host.");
xbt_assert(hosts.size() == flops_amounts.size() || flops_amounts.empty(),
xbt_assert(std::all_of(bytes_amounts.begin(), bytes_amounts.end(), [](double elm) { return std::isfinite(elm); }),
"flops_amounts comprises infinite values!");
- exec_init(hosts, flops_amounts, bytes_amounts)->set_timeout(timeout)->wait();
-}
-
-ExecPtr exec_init(double flops_amount)
-{
- return ExecPtr(new ExecSeq(get_host(), flops_amount));
-}
-
-ExecPtr exec_init(const std::vector<s4u::Host*>& hosts, const std::vector<double>& flops_amounts,
- const std::vector<double>& bytes_amounts)
-{
return ExecPtr(new ExecPar(hosts, flops_amounts, bytes_amounts));
}
}
Exec* Exec::wait()
+{
+ return this->wait_for(-1);
+}
+
+Exec* Exec::wait_for(double timeout)
{
if (state_ == State::INITED)
start();
- simcall_execution_wait(pimpl_);
+ simcall_execution_wait(pimpl_, timeout);
state_ = State::FINISHED;
on_completion(*Actor::self(), *this);
return this;
}
-Exec* Exec::wait_for(double)
-{
- THROW_UNIMPLEMENTED;
-}
-
int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
{
std::unique_ptr<kernel::activity::ExecImpl* []> rexecs(new kernel::activity::ExecImpl*[execs->size()]);
*
* @param execution The execution synchro
*/
-e_smx_state_t simcall_execution_wait(const smx_activity_t& execution)
+e_smx_state_t simcall_execution_wait(const smx_activity_t& execution, double timeout)
{
- return (e_smx_state_t)simcall_BODY_execution_wait(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()));
+ return (e_smx_state_t)simcall_BODY_execution_wait(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()),
+ timeout);
}
bool simcall_execution_test(const smx_activity_t& execution)
{
simgrid::simix::marshal<simgrid::kernel::activity::ExecImpl*>(simcall->args_[0], arg);
}
+static inline double simcall_execution_wait__get__timeout(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<double>(simcall->args_[1]);
+}
+static inline double simcall_execution_wait__getraw__timeout(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<double>(simcall->args_[1]);
+}
+static inline void simcall_execution_wait__set__timeout(smx_simcall_t simcall, double arg)
+{
+ simgrid::simix::marshal<double>(simcall->args_[1], arg);
+}
static inline int simcall_execution_wait__get__result(smx_simcall_t simcall)
{
return simgrid::simix::unmarshal<int>(simcall->result_);
/* The prototype of all simcall handlers, automatically generated for you */
-XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
+XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution, double timeout);
XBT_PRIVATE void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout);
XBT_PRIVATE void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout);
return simgrid::simix::unmarshal<R>(self->simcall.result_);
}
-inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution)
+inline static int simcall_BODY_execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout)
{
if (0) /* Go to that function to follow the code flow through the simcall barrier */
- simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution);
- return simcall<int, simgrid::kernel::activity::ExecImpl*>(SIMCALL_EXECUTION_WAIT, execution);
+ simcall_HANDLER_execution_wait(&SIMIX_process_self()->simcall, execution, timeout);
+ return simcall<int, simgrid::kernel::activity::ExecImpl*, double>(SIMCALL_EXECUTION_WAIT, execution, timeout);
}
inline static int simcall_BODY_execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout)
return;
switch (simcall.call_) {
case SIMCALL_EXECUTION_WAIT:
- simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall.args_[0]));
+ simcall_HANDLER_execution_wait(&simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall.args_[0]), simgrid::simix::unmarshal<double>(simcall.args_[1]));
break;
case SIMCALL_EXECUTION_WAITANY_FOR:
# Last but not the least, you should declare the new simix call in
# ./include/simgrid/simix.h (otherwise you will get a warning at compile time)
-int execution_wait(simgrid::kernel::activity::ExecImpl* execution) [[block]];
+int execution_wait(simgrid::kernel::activity::ExecImpl* execution, double timeout) [[block]];
int execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) [[block]];
bool execution_test(simgrid::kernel::activity::ExecImpl* execution) [[block]];