cloud-capping cloud-migration cloud-simple
energy-exec energy-boot energy-link energy-vm
engine-filtering
- exec-async exec-basic exec-dvfs exec-ptask exec-remote
+ exec-async exec-basic exec-dvfs exec-ptask exec-remote exec-waitany
io-async io-file-system io-file-remote io-storage-raw
platform-failures platform-profile platform-properties
plugin-hostload
--- /dev/null
+/* Copyright (c) 2019. The SimGrid Team. All rights reserved. */
+
+/* This program is free software; you can redistribute it and/or modify it
+ * under the terms of the license (GNU LGPL) which comes with this package. */
+
+#include "simgrid/s4u.hpp"
+#include <cstdlib>
+#include <iostream>
+#include <string>
+
+XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_exec_waitany, "Messages specific for this s4u example");
+
+static void worker(bool with_timeout)
+{
+ /* Vector in which we store all pending executions*/
+ std::vector<simgrid::s4u::ExecPtr> pending_executions;
+
+ for (int i = 0; i < 3; i++) {
+ std::string name = std::string("Exec-") + std::to_string(i);
+ double amount = (6 * (i % 2) + i + 1) * simgrid::s4u::this_actor::get_host()->get_speed();
+
+ simgrid::s4u::ExecPtr exec = simgrid::s4u::this_actor::exec_init(amount)->set_name(name);
+ pending_executions.push_back(exec);
+ exec->start();
+
+ XBT_INFO("Activity %s has started for %.0f seconds", name.c_str(),
+ amount / simgrid::s4u::this_actor::get_host()->get_speed());
+ }
+
+ /* Now that executions were initiated, wait for their completion, in order of termination.
+ *
+ * This loop waits for first terminating execution with wait_any() and remove it with erase(), until all execs are
+ * terminated.
+ */
+ while (not pending_executions.empty()) {
+ int pos;
+ if (with_timeout)
+ pos = simgrid::s4u::Exec::wait_any_for(&pending_executions, 4);
+ else
+ pos = simgrid::s4u::Exec::wait_any(&pending_executions);
+
+ if (pos < 0) {
+ XBT_INFO("Do not wait any longer for an activity");
+ pending_executions.clear();
+ } else {
+ XBT_INFO("Activity '%s' (at position %d) is complete", pending_executions[pos]->get_cname(), pos);
+ pending_executions.erase(pending_executions.begin() + pos);
+ }
+ XBT_INFO("%lu activities remain pending", pending_executions.size());
+ }
+}
+
+int main(int argc, char* argv[])
+{
+ simgrid::s4u::Engine e(&argc, argv);
+ e.load_platform(argv[1]);
+ simgrid::s4u::Actor::create("worker", simgrid::s4u::Host::by_name("Tremblay"), worker, false);
+ simgrid::s4u::Actor::create("worker_timeout", simgrid::s4u::Host::by_name("Tremblay"), worker, true);
+ e.run();
+
+ return 0;
+}
--- /dev/null
+#!/usr/bin/env tesh
+
+! output sort 19
+$ ${bindir:=.}/s4u-exec-waitany ${platfdir}/multicore_machine.xml "--log=root.fmt:[%10.6r]%e[%14P]%e%m%n"
+> [ 0.000000] [ worker] Activity Exec-0 has started for 1 seconds
+> [ 0.000000] [worker_timeout] Activity Exec-0 has started for 1 seconds
+> [ 0.000000] [ worker] Activity Exec-1 has started for 8 seconds
+> [ 0.000000] [worker_timeout] Activity Exec-1 has started for 8 seconds
+> [ 0.000000] [ worker] Activity Exec-2 has started for 3 seconds
+> [ 0.000000] [worker_timeout] Activity Exec-2 has started for 3 seconds
+> [ 1.000000] [worker_timeout] Activity 'Exec-0' (at position 0) is complete
+> [ 1.000000] [worker_timeout] 2 activities remain pending
+> [ 1.000000] [ worker] Activity 'Exec-0' (at position 0) is complete
+> [ 1.000000] [ worker] 2 activities remain pending
+> [ 3.000000] [worker_timeout] Activity 'Exec-2' (at position 1) is complete
+> [ 3.000000] [worker_timeout] 1 activities remain pending
+> [ 3.000000] [ worker] Activity 'Exec-2' (at position 1) is complete
+> [ 3.000000] [ worker] 1 activities remain pending
+> [ 7.000000] [worker_timeout] Do not wait any longer for an activity
+> [ 7.000000] [worker_timeout] 0 activities remain pending
+> [ 8.000000] [ worker] Activity 'Exec-1' (at position 0) is complete
+> [ 8.000000] [ worker] 0 activities remain pending
Exec* wait() override;
Exec* wait_for(double timeout) override;
+ /*! take a vector of s4u::ExecPtr and return when one of them is finished.
+ * The return value is the rank of the first finished ExecPtr. */
+ static int wait_any(std::vector<ExecPtr>* execs) { return wait_any_for(execs, -1); }
+ /*! Same as wait_any, but with a timeout. If the timeout occurs, parameter last is returned.*/
+ static int wait_any_for(std::vector<ExecPtr>* execs, double timeout);
+
bool test() override;
ExecPtr set_bound(double bound);
ExecPtr set_tracing_category(const std::string& category);
ExecPtr set_timeout(double timeout);
Exec* cancel() override;
+ std::string get_name() const { return name_; }
+ const char* get_cname() const { return name_.c_str(); }
XBT_ATTRIB_DEPRECATED_v324("Please use Exec::wait_for()") void wait(double t) override { wait_for(t); }
};
/******************************* Host simcalls ********************************/
#ifdef __cplusplus
XBT_PUBLIC e_smx_state_t simcall_execution_wait(const smx_activity_t& execution);
+XBT_PUBLIC unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count,
+ double timeout);
XBT_PUBLIC bool simcall_execution_test(const smx_activity_t& execution);
#endif
#include "simgrid/s4u/Host.hpp"
+#include <boost/range/algorithm.hpp>
+
XBT_LOG_EXTERNAL_DEFAULT_CATEGORY(simix_process);
void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* synchro)
simcall_execution_test__set__result(simcall, res);
}
+void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execs[],
+ size_t count, double timeout)
+{
+ if (timeout < 0.0) {
+ simcall->timer = nullptr;
+ } else {
+ simcall->timer = simgrid::simix::Timer::set(SIMIX_get_clock() + timeout, [simcall]() {
+ simgrid::kernel::activity::ExecImpl** execs = simcall_execution_waitany_for__get__execs(simcall);
+ size_t count = simcall_execution_waitany_for__get__count(simcall);
+
+ for (size_t i = 0; i < count; i++) {
+ // Remove the first occurence of simcall:
+ auto* exec = execs[i];
+ auto j = boost::range::find(exec->simcalls_, simcall);
+ if (j != exec->simcalls_.end())
+ exec->simcalls_.erase(j);
+ }
+ simcall_execution_waitany_for__set__result(simcall, -1);
+ SIMIX_simcall_answer(simcall);
+ });
+ }
+
+ for (size_t i = 0; i < count; i++) {
+ /* associate this simcall to the the synchro */
+ auto* exec = execs[i];
+ exec->simcalls_.push_back(simcall);
+
+ /* see if the synchro is already finished */
+ if (exec->state_ != SIMIX_WAITING && exec->state_ != SIMIX_RUNNING) {
+ exec->finish();
+ break;
+ }
+ }
+}
+
namespace simgrid {
namespace kernel {
namespace activity {
while (not simcalls_.empty()) {
smx_simcall_t simcall = simcalls_.front();
simcalls_.pop_front();
+
+ /* If a waitany simcall is waiting for this synchro to finish, then remove it from the other synchros in the waitany
+ * list. Afterwards, get the position of the actual synchro in the waitany list and return it as the result of the
+ * simcall */
+
+ if (simcall->call == SIMCALL_NONE) // FIXME: maybe a better way to handle this case
+ continue; // if process handling comm is killed
+ if (simcall->call == SIMCALL_EXECUTION_WAITANY_FOR) {
+ simgrid::kernel::activity::ExecImpl** execs = simcall_execution_waitany_for__get__execs(simcall);
+ size_t count = simcall_execution_waitany_for__get__count(simcall);
+
+ for (size_t i = 0; i < count; i++) {
+ // Remove the first occurence of simcall:
+ auto* exec = execs[i];
+ auto j = boost::range::find(exec->simcalls_, simcall);
+ if (j != exec->simcalls_.end())
+ exec->simcalls_.erase(j);
+
+ if (simcall->timer) {
+ simcall->timer->remove();
+ simcall->timer = nullptr;
+ }
+ }
+
+ if (not MC_is_active() && not MC_record_replay_is_active()) {
+ ExecImpl** element = std::find(execs, execs + count, this);
+ int rank = (element != execs + count) ? element - execs : -1;
+ simcall_execution_waitany_for__set__result(simcall, rank);
+ }
+ }
+
switch (state_) {
case SIMIX_DONE:
}
simcall->issuer->waiting_synchro = nullptr;
- simcall_execution_wait__set__result(simcall, state_);
-
/* Fail the process if the host is down */
if (simcall->issuer->get_host()->is_on())
SIMIX_simcall_answer(simcall);
THROW_UNIMPLEMENTED;
}
+int Exec::wait_any_for(std::vector<ExecPtr>* execs, double timeout)
+{
+ std::unique_ptr<kernel::activity::ExecImpl* []> rexecs(new kernel::activity::ExecImpl*[execs->size()]);
+ std::transform(begin(*execs), end(*execs), rexecs.get(),
+ [](const ExecPtr& exec) { return static_cast<kernel::activity::ExecImpl*>(exec->pimpl_.get()); });
+ return simcall_execution_waitany_for(rexecs.get(), execs->size(), timeout);
+}
+
Exec* Exec::cancel()
{
simix::simcall([this] { boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->cancel(); });
return simcall_BODY_execution_test(static_cast<simgrid::kernel::activity::ExecImpl*>(execution.get()));
}
+unsigned int simcall_execution_waitany_for(simgrid::kernel::activity::ExecImpl* execs[], size_t count, double timeout)
+{
+ return simcall_BODY_execution_waitany_for(execs, count, timeout);
+}
+
void simcall_process_join(smx_actor_t process, double timeout)
{
simcall_BODY_process_join(process, timeout);
simgrid::simix::marshal<int>(simcall->result, result);
}
+static inline simgrid::kernel::activity::ExecImpl** simcall_execution_waitany_for__get__execs(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]);
+}
+static inline simgrid::kernel::activity::ExecImpl** simcall_execution_waitany_for__getraw__execs(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]);
+}
+static inline void simcall_execution_waitany_for__set__execs(smx_simcall_t simcall,
+ simgrid::kernel::activity::ExecImpl** arg)
+{
+ simgrid::simix::marshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0], arg);
+}
+static inline size_t simcall_execution_waitany_for__get__count(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<size_t>(simcall->args[1]);
+}
+static inline size_t simcall_execution_waitany_for__getraw__count(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<size_t>(simcall->args[1]);
+}
+static inline void simcall_execution_waitany_for__set__count(smx_simcall_t simcall, size_t arg)
+{
+ simgrid::simix::marshal<size_t>(simcall->args[1], arg);
+}
+static inline double simcall_execution_waitany_for__get__timeout(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<double>(simcall->args[2]);
+}
+static inline double simcall_execution_waitany_for__getraw__timeout(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<double>(simcall->args[2]);
+}
+static inline void simcall_execution_waitany_for__set__timeout(smx_simcall_t simcall, double arg)
+{
+ simgrid::simix::marshal<double>(simcall->args[2], arg);
+}
+static inline int simcall_execution_waitany_for__get__result(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal<int>(simcall->result);
+}
+static inline int simcall_execution_waitany_for__getraw__result(smx_simcall_t simcall)
+{
+ return simgrid::simix::unmarshal_raw<int>(simcall->result);
+}
+static inline void simcall_execution_waitany_for__set__result(smx_simcall_t simcall, int result)
+{
+ simgrid::simix::marshal<int>(simcall->result, result);
+}
+
static inline simgrid::kernel::activity::ExecImpl* simcall_execution_test__get__execution(smx_simcall_t simcall)
{
return simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]);
XBT_PRIVATE void simcall_HANDLER_process_join(smx_simcall_t simcall, smx_actor_t process, double timeout);
XBT_PRIVATE void simcall_HANDLER_process_sleep(smx_simcall_t simcall, double duration);
XBT_PRIVATE void simcall_HANDLER_execution_wait(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
+XBT_PRIVATE void simcall_HANDLER_execution_waitany_for(smx_simcall_t simcall,
+ simgrid::kernel::activity::ExecImpl** execs, size_t count,
+ double timeout);
XBT_PRIVATE void simcall_HANDLER_execution_test(smx_simcall_t simcall, simgrid::kernel::activity::ExecImpl* execution);
XBT_PRIVATE void simcall_HANDLER_comm_send(smx_simcall_t simcall, smx_actor_t sender, smx_mailbox_t mbox,
double task_size, double rate, unsigned char* src_buff, size_t src_buff_size,
XBT_PRIVATE void simcall_HANDLER_sem_acquire(smx_simcall_t simcall, smx_sem_t sem);
XBT_PRIVATE void simcall_HANDLER_sem_acquire_timeout(smx_simcall_t simcall, smx_sem_t sem, double timeout);
XBT_PRIVATE void simcall_HANDLER_io_wait(smx_simcall_t simcall, simgrid::kernel::activity::IoImpl* io);
-XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
\ No newline at end of file
+XBT_PRIVATE int simcall_HANDLER_mc_random(smx_simcall_t simcall, int min, int max);
return simcall<int, simgrid::kernel::activity::ExecImpl*>(SIMCALL_EXECUTION_WAIT, execution);
}
+inline static int simcall_BODY_execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count,
+ double timeout)
+{
+ if (0) /* Go to that function to follow the code flow through the simcall barrier */
+ simcall_HANDLER_execution_waitany_for(&SIMIX_process_self()->simcall, execs, count, timeout);
+ return simcall<int, simgrid::kernel::activity::ExecImpl**, size_t, double>(SIMCALL_EXECUTION_WAITANY_FOR, execs,
+ count, timeout);
+}
+
inline static bool simcall_BODY_execution_test(simgrid::kernel::activity::ExecImpl* execution)
{
if (0) /* Go to that function to follow the code flow through the simcall barrier */
SIMCALL_PROCESS_JOIN,
SIMCALL_PROCESS_SLEEP,
SIMCALL_EXECUTION_WAIT,
+ SIMCALL_EXECUTION_WAITANY_FOR,
SIMCALL_EXECUTION_TEST,
SIMCALL_COMM_SEND,
SIMCALL_COMM_ISEND,
"SIMCALL_PROCESS_JOIN",
"SIMCALL_PROCESS_SLEEP",
"SIMCALL_EXECUTION_WAIT",
+ "SIMCALL_EXECUTION_WAITANY_FOR",
"SIMCALL_EXECUTION_TEST",
"SIMCALL_COMM_SEND",
"SIMCALL_COMM_ISEND",
simcall_HANDLER_execution_wait(simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]));
break;
+case SIMCALL_EXECUTION_WAITANY_FOR:
+ simcall_HANDLER_execution_waitany_for(
+ simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl**>(simcall->args[0]),
+ simgrid::simix::unmarshal<size_t>(simcall->args[1]), simgrid::simix::unmarshal<double>(simcall->args[2]));
+ break;
+
case SIMCALL_EXECUTION_TEST:
simcall_HANDLER_execution_test(simcall, simgrid::simix::unmarshal<simgrid::kernel::activity::ExecImpl*>(simcall->args[0]));
break;
int process_sleep(double duration) [[block]];
int execution_wait(simgrid::kernel::activity::ExecImpl* execution) [[block]];
+int execution_waitany_for(simgrid::kernel::activity::ExecImpl** execs, size_t count, double timeout) [[block]];
bool execution_test(simgrid::kernel::activity::ExecImpl* execution) [[block]];
void comm_send(smx_actor_t sender, smx_mailbox_t mbox, double task_size, double rate, unsigned char* src_buff, size_t src_buff_size, simix_match_func_t match_fun, simix_copy_data_func_t copy_data_fun, void* data, double timeout) [[block]];