XBT_PUBLIC void intrusive_ptr_add_ref(Task* o);
class ExecTask;
using ExecTaskPtr = boost::intrusive_ptr<ExecTask>;
-XBT_PUBLIC void intrusive_ptr_release(ExecTask* e);
-XBT_PUBLIC void intrusive_ptr_add_ref(ExecTask* e);
class CommTask;
using CommTaskPtr = boost::intrusive_ptr<CommTask>;
-XBT_PUBLIC void intrusive_ptr_release(CommTask* c);
-XBT_PUBLIC void intrusive_ptr_add_ref(CommTask* c);
class IoTask;
using IoTaskPtr = boost::intrusive_ptr<IoTask>;
-XBT_PUBLIC void intrusive_ptr_release(IoTask* i);
-XBT_PUBLIC void intrusive_ptr_add_ref(IoTask* i);
// Extension record attached to s4u::Activity objects by the Task plugin.
// EXTENSION_ID is the slot identifier; Task::init() assigns it from
// s4u::Activity::extension_create<ExtendedAttributeActivity>().
// NOTE(review): the fire() implementations write a `task_` member through this
// extension, so the struct presumably declares more than is shown here - confirm.
struct ExtendedAttributeActivity {
static simgrid::xbt::Extension<simgrid::s4u::Activity, ExtendedAttributeActivity> EXTENSION_ID;
};
class Task {
- static bool inited_;
std::set<Task*> successors_ = {};
std::map<Task*, unsigned int> predecessors_ = {};
- void add_predecessor(Task* predecessor);
- void remove_predecessor(Task* predecessor);
bool ready_to_run() const;
void receive(Task* source);
void complete();
int count_ = 0;
bool working_ = false;
s4u::ActivityPtr current_activity_;
- std::vector<std::function<void(Task*)>> end_func_handlers_;
- std::vector<std::function<void(Task*)>> start_func_handlers_;
+ xbt::signal<void(Task*)> on_this_start_;
+ xbt::signal<void(Task*)> on_this_end_;
explicit Task(const std::string& name);
virtual ~Task() = default;
virtual void fire() = 0;
void remove_successor(TaskPtr t);
void remove_all_successors();
const std::set<Task*>& get_successors() const { return successors_; }
- void on_this_start(const std::function<void(Task*)>& func);
- void on_this_end(const std::function<void(Task*)>& func);
+ void on_this_start_cb(const std::function<void(Task*)>& func);
+ void on_this_end_cb(const std::function<void(Task*)>& func);
int get_count() const;
/** Add a callback fired before a task activity starts.
s4u::Host* get_host() const { return host_; }
ExecTaskPtr set_flops(double flops);
double get_flops() const { return get_amount(); }
- friend void inline intrusive_ptr_release(ExecTask* e) { intrusive_ptr_release(static_cast<Task*>(e)); }
- friend void inline intrusive_ptr_add_ref(ExecTask* e) { intrusive_ptr_add_ref(static_cast<Task*>(e)); }
};
// Task specialisation for communications: CommTask::fire() starts an s4u::Comm
// built with Comm::sendto_init(source_, destination_), using amount_ as payload size.
class CommTask : public Task {
// Host stored in destination_.
s4u::Host* get_destination() const { return destination_; }
CommTaskPtr set_bytes(double bytes);
// Byte count of the communication; forwards to get_amount().
double get_bytes() const { return get_amount(); }
- friend void inline intrusive_ptr_release(CommTask* c) { intrusive_ptr_release(static_cast<Task*>(c)); }
- friend void inline intrusive_ptr_add_ref(CommTask* c) { intrusive_ptr_add_ref(static_cast<Task*>(c)); }
};
// Task specialisation for I/O: IoTask::fire() starts an s4u::Io whose size is amount_.
class IoTask : public Task {
// NOTE(review): unlike CommTask::get_bytes(), this getter and get_op_type() below are
// not const-qualified although they only read state - consider adding const.
double get_bytes() { return get_amount(); }
IoTaskPtr set_op_type(s4u::Io::OpType type);
// Operation type stored in type_.
s4u::Io::OpType get_op_type() { return type_; }
-
- friend void inline intrusive_ptr_release(IoTask* i) { intrusive_ptr_release(static_cast<Task*>(i)); }
- friend void inline intrusive_ptr_add_ref(IoTask* i) { intrusive_ptr_add_ref(static_cast<Task*>(i)); }
};
} // namespace simgrid::plugins
#endif
py::call_guard<py::gil_scoped_release>(), py::arg("op"), "Remove a successor of this task.")
.def("remove_all_successors", &Task::remove_all_successors, py::call_guard<py::gil_scoped_release>(),
"Remove all successors of this task.")
- .def("on_this_start", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start), py::arg("func"),
- "Add a callback called when this task starts.")
- .def("on_this_end", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end), py::arg("func"),
- "Add a callback called when this task ends.")
+ .def("on_this_start_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_start_cb),
+ py::arg("func"), "Add a callback called when this task starts.")
+ .def("on_this_end_cb", py::overload_cast<const std::function<void(Task*)>&>(&Task::on_this_end_cb),
+ py::arg("func"), "Add a callback called when this task ends.")
.def(
"__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; },
"Textual representation of the Task");
namespace simgrid::plugins {
// Out-of-class definition of the extension slot identifier; Task::init() assigns its value.
+xbt::Extension<s4u::Activity, ExtendedAttributeActivity> ExtendedAttributeActivity::EXTENSION_ID;
+
// Class-wide signals: on_start is emitted by the fire() implementations and on_end by
// Task::complete(), in both cases right after the per-task signal.
xbt::signal<void(Task*)> Task::on_start;
xbt::signal<void(Task*)> Task::on_end;
// The constructor only records the task name.
Task::Task(const std::string& name) : name_(name) {}
-/**
- * @param predecessor The Task to add.
- * @brief Add a predecessor to this Task.
- */
-void Task::add_predecessor(Task* predecessor)
-{
- if (predecessors_.find(predecessor) == predecessors_.end())
- simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; });
-}
-
-/**
- * @param predecessor The Task to remove.
- * @brief Remove a predecessor from this Task.
- */
-void Task::remove_predecessor(Task* predecessor)
-{
- simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); });
-}
-
/**
* @brief Return True if the Task can start a new Activity.
* @note The Task is ready if not already doing something and there is at least one execution waiting in queue.
// Records one token received from `source` in predecessors_. When every predecessor
// holds at least one token, one token is consumed from each of them and a single
// execution is enqueued through enqueue_execs(1).
void Task::receive(Task* source)
{
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
- auto it = predecessors_.find(source);
- simgrid::kernel::actor::simcall_answered([this, it] {
- it->second++;
- bool enough_tokens = true;
- for (auto const& [key, val] : predecessors_)
- if (val < 1) {
- enough_tokens = false;
- break;
- }
- if (enough_tokens) {
- for (auto& [key, val] : predecessors_)
- val--;
- enqueue_execs(1);
// std::map::operator[] creates the entry with a zero count when `source` is not yet a key.
+ predecessors_[source]++;
+ bool enough_tokens = true;
// A single predecessor without a token is enough to keep the task waiting.
+ for (auto const& [key, val] : predecessors_)
+ if (val < 1) {
+ enough_tokens = false;
+ break;
}
- });
+ if (enough_tokens) {
+ for (auto& [key, val] : predecessors_)
+ val--;
+ enqueue_execs(1);
+ }
}
/**
*/
void Task::complete()
{
- simgrid::kernel::actor::simcall_answered([this] {
- working_ = false;
- count_++;
- });
- for (auto const& end_func : end_func_handlers_)
- end_func(this);
+ xbt_assert(s4u::Actor::is_maestro());
+ working_ = false;
+ count_++;
+ on_this_end_(this);
Task::on_end(this);
for (auto const& t : successors_)
t->receive(this);
*/
void Task::init()
{
- if (Task::inited_)
+ static bool inited = false;
+ if (inited)
return;
- Task::inited_ = true;
+
+ inited = true;
ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create<ExtendedAttributeActivity>();
simgrid::s4u::Exec::on_completion_cb(
[](simgrid::s4u::Exec const& exec) { exec.extension<ExtendedAttributeActivity>()->task_->complete(); });
*/
// Links `successor` after this task. Both ends of the edge are updated inside one
// simcall_answered closure, which captures the raw pointer obtained from `successor`.
void Task::add_successor(TaskPtr successor)
{
- simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); });
- successor->add_predecessor(this);
+ simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
+ successors_.insert(successor_p);
// try_emplace adds a zero token count only if this task is not already a predecessor,
// so an existing count is left untouched.
+ successor_p->predecessors_.try_emplace(this, 0);
+ });
}
/** @ingroup plugin_task
*/
// Unlinks `successor` from this task. Both ends of the edge are updated inside one
// simcall_answered closure, which captures the raw pointer obtained from `successor`.
void Task::remove_successor(TaskPtr successor)
{
- simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); });
- successor->remove_predecessor(this);
+ simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] {
// Erasing the predecessor entry also discards any token count it held for this task.
+ successor_p->predecessors_.erase(this);
+ successors_.erase(successor_p);
+ });
}
void Task::remove_all_successors()
* @brief Set a function to be called before each execution.
* @note The function is called before the underlying Activity starts.
*/
-void Task::on_this_start(const std::function<void(Task*)>& func)
+void Task::on_this_start_cb(const std::function<void(Task*)>& func)
{
// Registers `func` on the per-task start signal, which the fire() implementations emit.
// NOTE(review): `func` is captured by reference; this presumes simcall_answered runs the
// closure before returning and that connect() keeps its own copy - confirm.
- simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); });
+ simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); });
}
/** @ingroup plugin_task
* @brief Set a function to be called after each execution.
* @note The function is called after the underlying Activity ends, but before sending tokens to successors.
*/
-void Task::on_this_end(const std::function<void(Task*)>& func)
+void Task::on_this_end_cb(const std::function<void(Task*)>& func)
{
// Registers `func` on the per-task end signal, which Task::complete() emits.
// NOTE(review): `func` is captured by reference; this presumes simcall_answered runs the
// closure before returning and that connect() keeps its own copy - confirm.
- simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); });
+ simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); });
}
/** @ingroup plugin_task
*/
// Starts one execution of this task: emits the start signals, updates the task state,
// then launches an s4u::Exec named after the task with amount_ as its flops amount.
void ExecTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
// Per-task signal first, then the class-wide one.
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
// One queued execution is consumed; the counter never drops below zero.
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
s4u::ExecPtr exec = s4u::Exec::init();
exec->set_name(name_);
exec->set_flops_amount(amount_);
exec->start();
// Back-pointer read by the Exec::on_completion_cb handler registered in Task::init(),
// which calls complete() on this task.
// NOTE(review): the extension is attached after start(); presumably the activity cannot
// complete in between - confirm.
exec->extension_set(new ExtendedAttributeActivity());
exec->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; });
+ current_activity_ = exec;
}
/** @ingroup plugin_task
*/
// Starts one execution of this task: emits the start signals, updates the task state,
// then launches an s4u::Comm from source_ to destination_ with amount_ as payload size.
void CommTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
// Per-task signal first, then the class-wide one.
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
// One queued execution is consumed; the counter never drops below zero.
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_);
comm->set_name(name_);
comm->set_payload_size(amount_);
comm->start();
// Back-pointer from the activity to this task.
// NOTE(review): presumably read by a completion handler analogous to the Exec one
// registered in Task::init(); the extension is attached after start() - confirm both.
comm->extension_set(new ExtendedAttributeActivity());
comm->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; });
+ current_activity_ = comm;
}
/** @ingroup plugin_task
// Starts one execution of this task: emits the start signals, updates the task state,
// then launches an s4u::Io named after the task with amount_ as its size.
void IoTask::fire()
{
- for (auto const& start_func : start_func_handlers_)
- start_func(this);
// Per-task signal first, then the class-wide one.
+ on_this_start_(this);
Task::on_start(this);
- kernel::actor::simcall_answered([this] {
- working_ = true;
- queued_execs_ = std::max(queued_execs_ - 1, 0);
- });
+ working_ = true;
// One queued execution is consumed; the counter never drops below zero.
+ queued_execs_ = std::max(queued_execs_ - 1, 0);
s4u::IoPtr io = s4u::Io::init();
io->set_name(name_);
io->set_size(amount_);
io->start();
// Back-pointer from the activity to this task.
// NOTE(review): presumably read by a completion handler analogous to the Exec one
// registered in Task::init(); the extension is attached after start() - confirm both.
io->extension_set(new ExtendedAttributeActivity());
io->extension<ExtendedAttributeActivity>()->task_ = this;
- kernel::actor::simcall_answered([this, io] { current_activity_ = io; });
+ current_activity_ = io;
}
} // namespace simgrid::plugins
-
-simgrid::xbt::Extension<simgrid::s4u::Activity, simgrid::plugins::ExtendedAttributeActivity>
- simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID;
-bool simgrid::plugins::Task::inited_ = false;