auto* faulty = e.host_by_name("Faulty Host");
auto* safe = e.host_by_name("Safe Host");
- sg4::Exec::on_completion.connect([](sg4::Exec const& exec) {
- if (exec.get_state() == sg4::Activity::State::FINISHED)
- XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(), exec.get_start_time(),
- exec.get_finish_time());
- if (exec.get_state() == sg4::Activity::State::FAILED) {
- if (exec.is_parallel())
- XBT_INFO("Activity '%s' has failed. %.f %% remain to be done", exec.get_cname(),
- 100 * exec.get_remaining_ratio());
+ sg4::Activity::on_completion.connect([](sg4::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
+ if (exec->get_state() == sg4::Activity::State::FINISHED)
+ XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec->get_cname(), exec->get_start_time(),
+ exec->get_finish_time());
+ if (exec->get_state() == sg4::Activity::State::FAILED) {
+ if (exec->is_parallel())
+ XBT_INFO("Activity '%s' has failed. %.f %% remain to be done", exec->get_cname(),
+ 100 * exec->get_remaining_ratio());
else
- XBT_INFO("Activity '%s' has failed. %.f flops remain to be done", exec.get_cname(), exec.get_remaining());
+ XBT_INFO("Activity '%s' has failed. %.f flops remain to be done", exec->get_cname(), exec->get_remaining());
}
});
(a.dependencies_solved() ? "solved" : "NOT solved"), (a.is_assigned() ? "assigned" : "NOT assigned"));
});
- simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Exec const& exec) {
- XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(), exec.get_start_time(),
- exec.get_finish_time());
+ simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
+ XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec->get_cname(), exec->get_start_time(),
+ exec->get_finish_time());
});
// Create a small DAG: parent->write_output->read_input->child
(exec.is_assigned() ? "assigned" : "NOT assigned"));
});
- simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Exec const& exec) {
- XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec.get_cname(), exec.get_start_time(),
- exec.get_finish_time());
+ simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
+ XBT_INFO("Activity '%s' is complete (start time: %f, finish time: %f)", exec->get_cname(), exec->get_start_time(),
+ exec->get_finish_time());
});
// Define an amount of work that should take 1 second to execute.
Activity() = default;
virtual ~Activity() = default;
- virtual void complete(Activity::State state)
- {
- state_ = state;
- if (state == State::FINISHED)
- release_dependencies();
- }
-
void release_dependencies()
{
while (not successors_.empty()) {
/*! Signal fired each time that the activity fails to start because of a veto (e.g., unsolved dependency or no
* resource assigned) */
static xbt::signal<void(Activity&)> on_veto;
+ /*! Signal fired when theactivity completes (either normally, cancelled or failed) */
+ static xbt::signal<void(Activity&)> on_completion;
void vetoable_start()
{
}
}
+ void complete(Activity::State state)
+ {
+ state_ = state;
+ if (state == State::FINISHED)
+ release_dependencies();
+ on_completion(*this);
+ }
+
static std::set<Activity*>* get_vetoed_activities() { return vetoed_activities_; }
static void set_vetoed_activities(std::set<Activity*>* whereto) { vetoed_activities_ = whereto; }
Comm() = default;
-protected:
- void complete(Activity::State state) override;
-
public:
#ifndef DOXYGEN
friend Mailbox; // Factory of comms
protected:
explicit Exec(kernel::activity::ExecImplPtr pimpl);
- void complete(Activity::State state) override;
void reset();
public:
#endif
/*! Signal fired each time that an execution actually starts (no veto) */
static xbt::signal<void(Exec const&)> on_start;
- /*! Signal fired each time that an execution terminates (either normally, cancelled or failed) */
- static xbt::signal<void(Exec const&)> on_completion;
static ExecPtr init();
Exec* start() override;
protected:
explicit Io(kernel::activity::IoImplPtr pimpl);
- void complete(Activity::State state) override;
-
public:
enum class OpType { READ, WRITE };
static xbt::signal<void(Io const&)> on_start;
- static xbt::signal<void(Io const&)> on_completion;
static IoPtr init();
Io* start() override;
s4u::Exec::on_start.connect([](s4u::Exec const&) {
Container::by_name(instr_pid(*s4u::Actor::self()))->get_state("ACTOR_STATE")->push_event("execute");
});
- s4u::Exec::on_completion.connect([](s4u::Exec const&) {
+ s4u::Activity::on_completion.connect([](s4u::Activity&) {
Container::by_name(instr_pid(*s4u::Actor::self()))->get_state("ACTOR_STATE")->pop_event();
});
s4u::Comm::on_send.connect([](s4u::Comm const&) {
s4u::Comm::on_recv.connect([](s4u::Comm const&) {
Container::by_name(instr_pid(*s4u::Actor::self()))->get_state("ACTOR_STATE")->push_event("receive");
});
- s4u::Comm::on_completion.connect([](s4u::Comm const&) {
- Container::by_name(instr_pid(*s4u::Actor::self()))->get_state("ACTOR_STATE")->pop_event();
- });
s4u::Actor::on_host_change.connect(on_actor_host_change);
}
->get_state("MPI_STATE")
->push_event("computing", new CpuTIData("compute", exec.get_cost()));
});
- s4u::Exec::on_completion.connect([](s4u::Exec const&) {
+ s4u::Activity::on_completion.connect([](s4u::Activity&) {
Container::by_name(std::string("rank-") + std::to_string(s4u::Actor::self()->get_pid()))
->get_state("MPI_STATE")
->pop_event();
if (action->get_activity() != nullptr) {
// If nobody told the interface that the activity has failed, that's because no actor waits on it (maestro
// started it). SimDAG I see you!
- auto* exec = dynamic_cast<activity::ExecImpl*>(action->get_activity());
- if (exec != nullptr && exec->get_actor() == maestro_)
- exec->get_iface()->complete(s4u::Activity::State::FAILED);
-
- auto* io = dynamic_cast<activity::IoImpl*>(action->get_activity());
- if (io != nullptr && io->get_actor() == maestro_)
- io->get_iface()->complete(s4u::Activity::State::FAILED);
+ if (action->get_activity()->get_actor() == maestro_)
+ action->get_activity()->get_iface()->complete(s4u::Activity::State::FAILED);
activity::ActivityImplPtr(action->get_activity())->post();
}
State state_ = State::WAITING; /* State of the activity */
std::list<smx_simcall_t> simcalls_; /* List of simcalls waiting for this activity */
resource::Action* surf_action_ = nullptr;
+ actor::ActorImpl* actor_ = nullptr;
+ s4u::Activity* piface_ = nullptr;
protected:
void inline set_name(const std::string& name)
const std::string& get_name() const { return name_; }
const char* get_cname() const { return name_.c_str(); }
+ void set_actor(actor::ActorImpl* actor) { actor_ = actor; }
+ actor::ActorImpl* get_actor() const { return actor_; }
+
+ void set_iface(s4u::Activity* iface) { piface_ = iface; }
+ s4u::Activity* get_iface() { return piface_; }
+
virtual bool test();
virtual void wait_for(actor::ActorImpl* issuer, double timeout);
virtual ActivityImpl& set_timeout(double) { THROW_UNIMPLEMENTED; }
MailboxImpl* mbox_ = nullptr; /* Rendez-vous where the comm is queued */
s4u::Host* from_ = nullptr; /* Pre-determined only for direct host-to-host communications */
s4u::Host* to_ = nullptr; /* Otherwise, computed at start() time from the actors */
- s4u::Comm* piface_ = nullptr;
public:
enum class Type { SEND, RECEIVE };
explicit CommImpl(Type type) : type_(type) {}
CommImpl(s4u::Host* from, s4u::Host* to, double bytes);
- void set_iface(s4u::Comm* piface) { piface_ = piface; }
- s4u::Comm* get_iface() const { return piface_; }
CommImpl& set_size(double size);
CommImpl& set_src_buff(unsigned char* buff, size_t size);
CommImpl& set_dst_buff(unsigned char* buff, size_t* size);
piface_ = new s4u::Exec(this);
actor::ActorImpl* self = actor::ActorImpl::self();
if (self) {
- actor_ = self;
+ set_actor(self);
self->activities_.emplace_back(this);
}
}
clean_action();
timeout_detector_.reset();
- if (actor_) {
- actor_->activities_.remove(this);
+ if (get_actor() != nullptr) {
+ get_actor()->activities_.remove(this);
}
if (state_ != State::FAILED && cb_id_ >= 0)
s4u::Host::on_state_change.disconnect(cb_id_);
{
switch (state_) {
case State::FAILED:
- piface_->complete(s4u::Activity::State::FAILED);
+ static_cast<s4u::Exec*>(get_iface())->complete(s4u::Activity::State::FAILED);
if (issuer->get_host()->is_on())
issuer->exception_ = std::make_exception_ptr(HostFailureException(XBT_THROW_POINT, "Host failed"));
else /* else, the actor will be killed with no possibility to survive */
class XBT_PUBLIC ExecImpl : public ActivityImpl_T<ExecImpl> {
std::unique_ptr<resource::Action, std::function<void(resource::Action*)>> timeout_detector_{
nullptr, [](resource::Action* a) { a->unref(); }};
- actor::ActorImpl* actor_ = nullptr;
double sharing_penalty_ = 1.0;
double bound_ = 0.0;
double start_time_ = -1.0;
std::vector<s4u::Host*> hosts_;
std::vector<double> flops_amounts_;
std::vector<double> bytes_amounts_;
- s4u::Exec* piface_;
int cb_id_ = -1; // callback id from Host::on_state_change.connect()
public:
ExecImpl();
- s4u::Exec* get_iface() { return piface_; }
- actor::ActorImpl* get_actor() { return actor_; }
ExecImpl& set_timeout(double timeout) override;
ExecImpl& set_bound(double bound);
piface_ = new s4u::Io(this);
actor::ActorImpl* self = actor::ActorImpl::self();
if (self) {
- actor_ = self;
+ set_actor(self);
self->activities_.emplace_back(this);
}
}
switch (state_) {
case State::FAILED:
issuer->context_->set_wannadie();
- piface_->complete(s4u::Activity::State::FAILED);
+ static_cast<s4u::Io*>(get_iface())->complete(s4u::Activity::State::FAILED);
issuer->exception_ = std::make_exception_ptr(StorageFailureException(XBT_THROW_POINT, "Storage failed"));
break;
case State::CANCELED:
namespace activity {
class XBT_PUBLIC IoImpl : public ActivityImpl_T<IoImpl> {
- actor::ActorImpl* actor_ = nullptr;
resource::DiskImpl* disk_ = nullptr;
double sharing_penalty_ = 1.0;
sg_size_t size_ = 0;
s4u::Io::OpType type_ = s4u::Io::OpType::READ;
sg_size_t performed_ioops_ = 0;
resource::Action* timeout_detector_ = nullptr;
- s4u::Io* piface_;
public:
IoImpl();
- s4u::Io* get_iface() { return piface_; }
- actor::ActorImpl* get_actor() { return actor_; }
IoImpl& set_sharing_penalty(double sharing_penalty);
IoImpl& set_timeout(double timeout) override;
}
}
-static void remove_active_exec(s4u::Exec const& task)
+static void remove_active_exec(s4u::Activity& task)
{
- const s4u::VirtualMachine* vm = dynamic_cast<s4u::VirtualMachine*>(task.get_host());
+ auto* exec = dynamic_cast<s4u::Exec*>(&task);
+ if (exec == nullptr)
+ return;
+ const s4u::VirtualMachine* vm = dynamic_cast<s4u::VirtualMachine*>(exec->get_host());
if (vm != nullptr) {
VirtualMachineImpl* vm_impl = vm->get_vm_impl();
vm_impl->remove_active_exec();
{
s4u::Host::on_state_change.connect(host_state_change);
s4u::Exec::on_start.connect(add_active_exec);
- s4u::Exec::on_completion.connect(remove_active_exec);
+ s4u::Activity::on_completion.connect(remove_active_exec);
activity::ActivityImpl::on_resumed.connect(add_active_activity);
activity::ActivityImpl::on_suspended.connect(remove_active_activity);
}
if (activity.get_host() == get_host())
pre_task();
});
- simgrid::s4u::Exec::on_completion.connect([this](simgrid::s4u::Exec const& activity) {
+ simgrid::s4u::Activity::on_completion.connect([this](simgrid::s4u::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
// For more than one host (not yet supported), we can access the host via
// simcalls_.front()->issuer->get_iface()->get_host()
- if (activity.get_host() == get_host() && iteration_running) {
- comp_timer += activity.get_finish_time() - activity.get_start_time();
+ if (exec->get_host() == get_host() && iteration_running) {
+ comp_timer += exec->get_finish_time() - exec->get_start_time();
}
});
// FIXME I think that this fires at the same time for all hosts, so when the src sends something,
XBT_WARN("HostLoad plugin currently does not support executions on several hosts");
}
});
- simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Exec const& activity) {
- if (activity.get_host_number() == 1) { // We only run on one host
- simgrid::s4u::Host* host = activity.get_host();
+ simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
+ if (exec->get_host_number() == 1) { // We only run on one host
+ simgrid::s4u::Host* host = exec->get_host();
const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(host);
if (vm != nullptr)
host = vm->get_pm();
xbt_assert(host != nullptr);
host->extension<HostLoad>()->update();
- }
- else { // This runs on multiple hosts
+ } else { // This runs on multiple hosts
XBT_WARN("HostLoad plugin currently does not support executions on several hosts");
}
});
}
}
-static void on_exec_completion(simgrid::s4u::Exec const& e)
+static void on_exec_completion(simgrid::s4u::Activity& e)
{
- auto exec = static_cast<simgrid::kernel::activity::ExecImpl*>(e.get_impl());
+ auto exec = dynamic_cast<simgrid::kernel::activity::ExecImpl*>(e.get_impl());
+ if (exec == nullptr)
+ return;
const simgrid::s4u::VirtualMachine* vm = dynamic_cast<simgrid::s4u::VirtualMachine*>(exec->get_host());
if (vm == nullptr)
return;
simgrid::kernel::resource::VirtualMachineImpl::extension_create<DirtyPageTrackingExt>();
simgrid::s4u::VirtualMachine::on_creation.connect(&on_virtual_machine_creation);
simgrid::s4u::Exec::on_start.connect(&on_exec_creation);
- simgrid::s4u::Exec::on_completion.connect(&on_exec_completion);
+ simgrid::s4u::Activity::on_completion.connect(&on_exec_completion);
}
}
namespace s4u {
xbt::signal<void(Activity&)> Activity::on_veto;
+xbt::signal<void(Activity&)> Activity::on_completion;
std::set<Activity*>* Activity::vetoed_activities_ = nullptr;
xbt::signal<void(Comm const&)> Comm::on_recv;
xbt::signal<void(Comm const&)> Comm::on_completion;
-void Comm::complete(Activity::State state)
-{
- Activity::complete(state);
- on_completion(*this);
-}
-
Comm::~Comm()
{
if (state_ == State::STARTED && not detached_ &&
CommPtr Comm::sendto_init(Host* from, Host* to)
{
CommPtr res(new Comm());
+ res->sender_ = kernel::actor::ActorImpl::self();
res->from_ = from;
res->to_ = to;
if (suspended_)
pimpl_->suspend();
- if (not detached_)
- static_cast<kernel::activity::CommImpl*>(pimpl_.get())->set_iface(this);
+ if (not detached_) {
+ pimpl_->set_iface(this);
+ pimpl_->set_actor(sender_);
+ }
+
state_ = State::STARTED;
return this;
}
namespace simgrid {
namespace s4u {
xbt::signal<void(Exec const&)> Exec::on_start;
-xbt::signal<void(Exec const&)> Exec::on_completion;
Exec::Exec(kernel::activity::ExecImplPtr pimpl)
{
pimpl_ = pimpl;
}
-void Exec::complete(Activity::State state)
-{
- Activity::complete(state);
- on_completion(*this);
-}
void Exec::reset()
{
boost::static_pointer_cast<kernel::activity::ExecImpl>(pimpl_)->reset();
}
});
pimpl->set_cb_id(cb_id);
- return ExecPtr(pimpl->get_iface());
+ return ExecPtr(static_cast<Exec*>(pimpl->get_iface()));
}
Exec* Exec::start()
namespace simgrid {
namespace s4u {
xbt::signal<void(Io const&)> Io::on_start;
-xbt::signal<void(Io const&)> Io::on_completion;
Io::Io(kernel::activity::IoImplPtr pimpl)
{
pimpl_ = pimpl;
}
-void Io::complete(Activity::State state)
-{
- Activity::complete(state);
- on_completion(*this);
-}
-
IoPtr Io::init()
{
auto pimpl = kernel::activity::IoImplPtr(new kernel::activity::IoImpl());
- return IoPtr(pimpl->get_iface());
+ return IoPtr(static_cast<Io*>(pimpl->get_iface()));
}
Io* Io::start()
xbt_assert(argc > 1, "Usage: %s platform_file\n\nExample: %s two_clusters.xml", argv[0], argv[0]);
e.load_platform(argv[1]);
- simgrid::s4u::Exec::on_completion.connect([](simgrid::s4u::Exec const& exec) {
- XBT_INFO("Exec '%s' start time: %f, finish time: %f", exec.get_cname(), exec.get_start_time(),
- exec.get_finish_time());
+ simgrid::s4u::Activity::on_completion.connect([](simgrid::s4u::Activity& activity) {
+ auto* exec = dynamic_cast<simgrid::s4u::Exec*>(&activity);
+ if (exec == nullptr) // Only Execs are concerned here
+ return;
+ XBT_INFO("Exec '%s' start time: %f, finish time: %f", exec->get_cname(), exec->get_start_time(),
+ exec->get_finish_time());
});
/* creation of the activities and their dependencies */