From: Arnaud Giersch Date: Mon, 5 Jun 2023 20:42:27 +0000 (+0200) Subject: Various cleanups in Task plugin. X-Git-Tag: v3.34~54 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/0c84caf099147dcdc1a7ccfef467998a425b35aa Various cleanups in Task plugin. * reduce number of calls to simcall_answered * use xbt::signal for the "on_this_*" signals * reduce scope for static member "inited" * remove intrusive_ptr_* function for derived classes --- diff --git a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp index 27038a38fa..6ebca129da 100644 --- a/examples/cpp/task-switch-host/s4u-task-switch-host.cpp +++ b/examples/cpp/task-switch-host/s4u-task-switch-host.cpp @@ -54,7 +54,7 @@ int main(int argc, char* argv[]) // Add a function to be called before each executions of comm0 // This function modifies the graph of tasks by adding or removing // successors to comm0 - comm0->on_this_start([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) { + comm0->on_this_start_cb([exec1, exec2, jupiter, fafard](simgrid::plugins::Task* t) { auto* comm0 = dynamic_cast(t); static int count = 0; if (count % 2 == 0) { diff --git a/examples/python/task-switch-host/task-switch-host.py b/examples/python/task-switch-host/task-switch-host.py index 3bd21e66e5..a95db1645f 100644 --- a/examples/python/task-switch-host/task-switch-host.py +++ b/examples/python/task-switch-host/task-switch-host.py @@ -81,7 +81,7 @@ if __name__ == '__main__': # Add a function to be called before each executions of comm0 # This function modifies the graph of tasks by adding or removing # successors to comm0 - comm0.on_this_start(lambda t: switch(t, [jupiter, fafard], [exec1,exec2])) + comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2])) # Enqueue two executions for task exec1 comm0.enqueue_execs(4) diff --git a/include/simgrid/plugins/task.hpp b/include/simgrid/plugins/task.hpp index e9fa1afffd..bf8609d40d 100644 --- a/include/simgrid/plugins/task.hpp +++ b/include/simgrid/plugins/task.hpp @@ -18,16 +18,10 @@ XBT_PUBLIC void intrusive_ptr_release(Task* o); XBT_PUBLIC void intrusive_ptr_add_ref(Task* o); class ExecTask; using ExecTaskPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(ExecTask* e); -XBT_PUBLIC void intrusive_ptr_add_ref(ExecTask* e); class CommTask; using CommTaskPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(CommTask* c); -XBT_PUBLIC void intrusive_ptr_add_ref(CommTask* c); class IoTask; using IoTaskPtr = boost::intrusive_ptr; -XBT_PUBLIC void intrusive_ptr_release(IoTask* i); -XBT_PUBLIC void intrusive_ptr_add_ref(IoTask* i); struct ExtendedAttributeActivity { static simgrid::xbt::Extension EXTENSION_ID; @@ -35,12 +29,9 @@ struct ExtendedAttributeActivity { }; class Task { - static bool inited_; std::set successors_ = {}; std::map predecessors_ = {}; - void add_predecessor(Task* predecessor); - void remove_predecessor(Task* predecessor); bool ready_to_run() const; void receive(Task* source); void complete(); @@ -52,8 +43,8 @@ protected: int count_ = 0; bool working_ = false; s4u::ActivityPtr current_activity_; - std::vector> end_func_handlers_; - std::vector> start_func_handlers_; + xbt::signal on_this_start_; + xbt::signal on_this_end_; explicit Task(const std::string& name); virtual ~Task() = default; virtual void fire() = 0; @@ -73,8 +64,8 @@ public: void remove_successor(TaskPtr t); void remove_all_successors(); const std::set& get_successors() const { return successors_; } - void on_this_start(const std::function& func); - void on_this_end(const std::function& func); + void on_this_start_cb(const std::function& func); + void on_this_end_cb(const std::function& func); int get_count() const; /** Add a callback fired before a task activity start. @@ -110,8 +101,6 @@ public: s4u::Host* get_host() const { return host_; } ExecTaskPtr set_flops(double flops); double get_flops() const { return get_amount(); } - friend void inline intrusive_ptr_release(ExecTask* e) { intrusive_ptr_release(static_cast(e)); } - friend void inline intrusive_ptr_add_ref(ExecTask* e) { intrusive_ptr_add_ref(static_cast(e)); } }; class CommTask : public Task { @@ -130,8 +119,6 @@ public: s4u::Host* get_destination() const { return destination_; } CommTaskPtr set_bytes(double bytes); double get_bytes() const { return get_amount(); } - friend void inline intrusive_ptr_release(CommTask* c) { intrusive_ptr_release(static_cast(c)); } - friend void inline intrusive_ptr_add_ref(CommTask* c) { intrusive_ptr_add_ref(static_cast(c)); } }; class IoTask : public Task { @@ -149,9 +136,6 @@ public: double get_bytes() { return get_amount(); } IoTaskPtr set_op_type(s4u::Io::OpType type); s4u::Io::OpType get_op_type() { return type_; } - - friend void inline intrusive_ptr_release(IoTask* i) { intrusive_ptr_release(static_cast(i)); } - friend void inline intrusive_ptr_add_ref(IoTask* i) { intrusive_ptr_add_ref(static_cast(i)); } }; } // namespace simgrid::plugins #endif diff --git a/src/bindings/python/simgrid_python.cpp b/src/bindings/python/simgrid_python.cpp index d0ce338f9f..5f0f33599c 100644 --- a/src/bindings/python/simgrid_python.cpp +++ b/src/bindings/python/simgrid_python.cpp @@ -956,10 +956,10 @@ PYBIND11_MODULE(simgrid, m) py::call_guard(), py::arg("op"), "Remove a successor of this task.") .def("remove_all_successors", &Task::remove_all_successors, py::call_guard(), "Remove all successors of this task.") - .def("on_this_start", py::overload_cast&>(&Task::on_this_start), py::arg("func"), - "Add a callback called when this task starts.") - .def("on_this_end", py::overload_cast&>(&Task::on_this_end), py::arg("func"), - "Add a callback called when this task ends.") + .def("on_this_start_cb", py::overload_cast&>(&Task::on_this_start_cb), + py::arg("func"), "Add a callback called when this task starts.") + .def("on_this_end_cb", py::overload_cast&>(&Task::on_this_end_cb), + py::arg("func"), "Add a callback called when this task ends.") .def( "__repr__", [](const TaskPtr op) { return "Task(" + op->get_name() + ")"; }, "Textual representation of the Task"); diff --git a/src/plugins/task.cpp b/src/plugins/task.cpp index c464f04276..c84f83dd44 100644 --- a/src/plugins/task.cpp +++ b/src/plugins/task.cpp @@ -27,30 +27,13 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(Task, kernel, "Logging specific to the task plug namespace simgrid::plugins { +xbt::Extension ExtendedAttributeActivity::EXTENSION_ID; + xbt::signal Task::on_start; xbt::signal Task::on_end; Task::Task(const std::string& name) : name_(name) {} -/** - * @param predecessor The Task to add. - * @brief Add a predecessor to this Task. - */ -void Task::add_predecessor(Task* predecessor) -{ - if (predecessors_.find(predecessor) == predecessors_.end()) - simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_[predecessor] = 0; }); -} - -/** - * @param predecessor The Task to remove. - * @brief Remove a predecessor from this Task. - */ -void Task::remove_predecessor(Task* predecessor) -{ - simgrid::kernel::actor::simcall_answered([this, predecessor] { predecessors_.erase(predecessor); }); -} - /** * @brief Return True if the Task can start a new Activity. * @note The Task is ready if not already doing something and there is at least one execution waiting in queue. @@ -69,21 +52,18 @@ bool Task::ready_to_run() const void Task::receive(Task* source) { XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str()); - auto it = predecessors_.find(source); - simgrid::kernel::actor::simcall_answered([this, it] { - it->second++; - bool enough_tokens = true; - for (auto const& [key, val] : predecessors_) - if (val < 1) { - enough_tokens = false; - break; - } - if (enough_tokens) { - for (auto& [key, val] : predecessors_) - val--; - enqueue_execs(1); + predecessors_[source]++; + bool enough_tokens = true; + for (auto const& [key, val] : predecessors_) + if (val < 1) { + enough_tokens = false; + break; } - }); + if (enough_tokens) { + for (auto& [key, val] : predecessors_) + val--; + enqueue_execs(1); + } } /** @@ -97,12 +77,10 @@ void Task::receive(Task* source) */ void Task::complete() { - simgrid::kernel::actor::simcall_answered([this] { - working_ = false; - count_++; - }); - for (auto const& end_func : end_func_handlers_) - end_func(this); + xbt_assert(s4u::Actor::is_maestro()); + working_ = false; + count_++; + on_this_end_(this); Task::on_end(this); for (auto const& t : successors_) t->receive(this); @@ -116,9 +94,11 @@ void Task::complete() */ void Task::init() { - if (Task::inited_) + static bool inited = false; + if (inited) return; - Task::inited_ = true; + + inited = true; ExtendedAttributeActivity::EXTENSION_ID = simgrid::s4u::Activity::extension_create(); simgrid::s4u::Exec::on_completion_cb( [](simgrid::s4u::Exec const& exec) { exec.extension()->task_->complete(); }); @@ -159,8 +139,10 @@ void Task::set_amount(double amount) */ void Task::add_successor(TaskPtr successor) { - simgrid::kernel::actor::simcall_answered([this, successor] { successors_.insert(successor.get()); }); - successor->add_predecessor(this); + simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] { + successors_.insert(successor_p); + successor_p->predecessors_.try_emplace(this, 0); + }); } /** @ingroup plugin_task @@ -170,8 +152,10 @@ void Task::add_successor(TaskPtr successor) */ void Task::remove_successor(TaskPtr successor) { - simgrid::kernel::actor::simcall_answered([this, successor] { successors_.erase(successor.get()); }); - successor->remove_predecessor(this); + simgrid::kernel::actor::simcall_answered([this, successor_p = successor.get()] { + successor_p->predecessors_.erase(this); + successors_.erase(successor_p); + }); } void Task::remove_all_successors() @@ -190,9 +174,9 @@ void Task::remove_all_successors() * @brief Set a function to be called before each execution. * @note The function is called before the underlying Activity starts. */ -void Task::on_this_start(const std::function& func) +void Task::on_this_start_cb(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, &func] { start_func_handlers_.push_back(func); }); + simgrid::kernel::actor::simcall_answered([this, &func] { on_this_start_.connect(func); }); } /** @ingroup plugin_task @@ -200,9 +184,9 @@ void Task::on_this_start(const std::function& func) * @brief Set a function to be called after each execution. * @note The function is called after the underlying Activity ends, but before sending tokens to successors. */ -void Task::on_this_end(const std::function& func) +void Task::on_this_end_cb(const std::function& func) { - simgrid::kernel::actor::simcall_answered([this, &func] { end_func_handlers_.push_back(func); }); + simgrid::kernel::actor::simcall_answered([this, &func] { on_this_end_.connect(func); }); } /** @ingroup plugin_task @@ -241,13 +225,10 @@ ExecTaskPtr ExecTask::init(const std::string& name, double flops, s4u::Host* hos */ void ExecTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); s4u::ExecPtr exec = s4u::Exec::init(); exec->set_name(name_); exec->set_flops_amount(amount_); @@ -255,7 +236,7 @@ void ExecTask::fire() exec->start(); exec->extension_set(new ExtendedAttributeActivity()); exec->extension()->task_ = this; - kernel::actor::simcall_answered([this, exec] { current_activity_ = exec; }); + current_activity_ = exec; } /** @ingroup plugin_task @@ -305,20 +286,17 @@ CommTaskPtr CommTask::init(const std::string& name, double bytes, s4u::Host* sou */ void CommTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); s4u::CommPtr comm = s4u::Comm::sendto_init(source_, destination_); comm->set_name(name_); comm->set_payload_size(amount_); comm->start(); comm->extension_set(new ExtendedAttributeActivity()); comm->extension()->task_ = this; - kernel::actor::simcall_answered([this, comm] { current_activity_ = comm; }); + current_activity_ = comm; } /** @ingroup plugin_task @@ -399,13 +377,10 @@ IoTaskPtr IoTask::set_op_type(s4u::Io::OpType type) void IoTask::fire() { - for (auto const& start_func : start_func_handlers_) - start_func(this); + on_this_start_(this); Task::on_start(this); - kernel::actor::simcall_answered([this] { - working_ = true; - queued_execs_ = std::max(queued_execs_ - 1, 0); - }); + working_ = true; + queued_execs_ = std::max(queued_execs_ - 1, 0); s4u::IoPtr io = s4u::Io::init(); io->set_name(name_); io->set_size(amount_); @@ -414,11 +389,7 @@ void IoTask::fire() io->start(); io->extension_set(new ExtendedAttributeActivity()); io->extension()->task_ = this; - kernel::actor::simcall_answered([this, io] { current_activity_ = io; }); + current_activity_ = io; } } // namespace simgrid::plugins - -simgrid::xbt::Extension - simgrid::plugins::ExtendedAttributeActivity::EXTENSION_ID; -bool simgrid::plugins::Task::inited_ = false;