$ ${bindir:=.}/s4u-task-parallelism ${platfdir}/three_multicore_hosts.xml
> [0.000000] [task_parallelism/INFO] Task exec_A start
-> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
> [100.000000] [task_parallelism/INFO] Task exec_A start
+> [100.000000] [task_parallelism/INFO] Task exec_A finished (1)
> [200.000000] [task_parallelism/INFO] Task exec_A finished (2)
> [300.000000] [task_parallelism/INFO] Task exec_A start
> [300.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
> [400.000000] [task_parallelism/INFO] Task exec_A start
-> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
> [400.000000] [task_parallelism/INFO] Task exec_A start
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (3)
+> [400.000000] [task_parallelism/INFO] Task exec_A finished (4)
> [500.000000] [task_parallelism/INFO] Task exec_A finished (5)
> [500.000000] [task_parallelism/INFO] Task exec_A finished (6)
> [600.000000] [task_parallelism/INFO] Task exec_A start
-> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
> [700.000000] [task_parallelism/INFO] Task exec_A start
+> [700.000000] [task_parallelism/INFO] Task exec_A finished (7)
> [800.000000] [task_parallelism/INFO] Task exec_A finished (8)
> [900.000000] [task_parallelism/INFO] Task exec_A start
> [900.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
> [1000.000000] [task_parallelism/INFO] Task exec_A start
-> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
> [1000.000000] [task_parallelism/INFO] Task exec_A start
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (9)
+> [1000.000000] [task_parallelism/INFO] Task exec_A finished (10)
+> [1100.000000] [task_parallelism/INFO] Task exec_A start
> [1100.000000] [task_parallelism/INFO] Task exec_A finished (11)
> [1100.000000] [task_parallelism/INFO] Task exec_A finished (12)
-> [1100.000000] [task_parallelism/INFO] Task exec_A start
-> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
> [1200.000000] [task_parallelism/INFO] Task exec_A start
+> [1200.000000] [task_parallelism/INFO] Task exec_A finished (13)
> [1250.000000] [task_parallelism/INFO] Task exec_A start
> [1250.000000] [task_parallelism/INFO] Task exec_A start
-> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
> [1300.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1300.000000] [task_parallelism/INFO] Task exec_A finished (14)
> [1350.000000] [task_parallelism/INFO] Task exec_A start
-> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
> [1350.000000] [task_parallelism/INFO] Task exec_A start
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (15)
+> [1350.000000] [task_parallelism/INFO] Task exec_A finished (16)
> [1400.000000] [task_parallelism/INFO] Task exec_A finished (17)
> [1450.000000] [task_parallelism/INFO] Task exec_A finished (18)
> [1450.000000] [task_parallelism/INFO] Task exec_A finished (19)
\ No newline at end of file
Alternatively, we remove/add the link between SA and SA_to_B2
and add/remove the link between SA and SA_to_B1
*/
- SA->on_this_start_cb([SA_to_B1, SA_to_B2](sg4::Task* t) {
+ SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
int count = t->get_count();
sg4::CommTaskPtr comm;
- if (count % 2 == 0) {
+ if (count % 2 == 1) {
t->remove_successor(SA_to_B2);
t->add_successor(SA_to_B1);
comm = SA_to_B1;
t->add_successor(SA_to_B2);
comm = SA_to_B2;
}
- std::vector<double> amount = {1e3, 1e6, 1e9};
+ std::vector<double> amount = {1e9, 1e3, 1e6};
+ // XBT_INFO("Comm %f", amount[count % 3]);
comm->set_amount(amount[count % 3]);
auto token = std::make_shared<sg4::Token>();
token->set_data(new double(amount[count % 3]));
});
// The token sent by SA is forwarded by both communication tasks
- SA_to_B1->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
- SA_to_B2->on_this_start_cb([&SA](sg4::Task* t) { t->set_token(t->get_next_token_from(SA)); });
+ SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
+ t->set_token(t->get_token_from(SA));
+ t->deque_token_from(SA);
+ });
+ SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
+ t->set_token(t->get_token_from(SA));
+ t->deque_token_from(SA);
+ });
/* B1 and B2 read the value of the token received by their predecessors
and use it to adapt their amount of work to do.
*/
- B1->on_this_start_cb([SA_to_B1](sg4::Task* t) {
- auto data = t->get_next_token_from(SA_to_B1)->get_unique_data<double>();
+ B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
+ auto data = t->get_token_from(SA_to_B1)->get_data<double>();
+ t->deque_token_from(SA_to_B1);
t->set_amount(*data * 10);
});
- B2->on_this_start_cb([SA_to_B2](sg4::Task* t) {
- auto data = t->get_next_token_from(SA_to_B2)->get_unique_data<double>();
+ B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
+ auto data = t->get_token_from(SA_to_B2)->get_data<double>();
+ t->deque_token_from(SA_to_B2);
t->set_amount(*data * 10);
});
> [1.798442] [task_storm/INFO] Task SB_to_B3 finished (5)
> [2.619232] [task_storm/INFO] Task B3 finished (1)
> [6.743624] [task_storm/INFO] Task B3 finished (2)
-> [10.868015] [task_storm/INFO] Task B3 finished (3)
> [10.868015] [task_storm/INFO] Task B4 finished (1)
+> [10.868015] [task_storm/INFO] Task B3 finished (3)
> [14.992407] [task_storm/INFO] Task B3 finished (4)
-> [19.116799] [task_storm/INFO] Task B3 finished (5)
> [19.116799] [task_storm/INFO] Task B4 finished (2)
+> [19.116799] [task_storm/INFO] Task B3 finished (5)
> [23.241190] [task_storm/INFO] Task B4 finished (3)
> [27.365582] [task_storm/INFO] Task B4 finished (4)
> [31.489974] [task_storm/INFO] Task B4 finished (5)
// successors to comm0
comm0->on_this_start_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
static int count = 0;
- if (count % 2 == 0) {
+ if (count % 2 == 0)
comm0->set_destination(jupiter);
+ else
+ comm0->set_destination(fafard);
+ count++;
+ });
+
+ comm0->on_this_completion_cb([&comm0, exec1, exec2, jupiter, fafard](const sg4::Task*) {
+ static int count = 0;
+ if (count % 2 == 0) {
comm0->add_successor(exec1);
comm0->remove_successor(exec2);
} else {
- comm0->set_destination(fafard);
comm0->add_successor(exec2);
comm0->remove_successor(exec1);
}
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
if __name__ == '__main__':
args = parse()
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
if __name__ == '__main__':
args = parse()
> [11.714617112501687] CommTask(comm) finished (1)
> [20.388399000968448] ExecTask(exec1) finished (2)
> [21.90881661298591] CommTask(comm) finished (2)
-> [24.82146412938331] ExecTask(exec2) finished (1)
-> [37.92831114626493] ExecTask(exec2) finished (2)
+> [24.821464129383305] ExecTask(exec2) finished (1)
+> [37.928311146264925] ExecTask(exec2) finished (2)
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
-def switch(t, hosts, execs):
- comm0.destination = hosts[t.count % 2]
- comm0.remove_successor(execs[t.count % 2 - 1])
- comm0.add_successor(execs[t.count % 2])
+def switch_destination(t, hosts):
+ t.destination = hosts[switch_destination.count % 2]
+ switch_destination.count += 1
+switch_destination.count = 0
+
+def switch_successor(t, execs):
+ t.remove_successor(execs[t.get_count() % 2])
+ t.add_successor(execs[t.get_count() % 2 - 1])
if __name__ == '__main__':
args = parse()
exec1.add_successor(comm1)
exec2.add_successor(comm2)
- # Add a function to be called when tasks end for log purpose
+ # Add a callback when tasks end for log purpose
Task.on_completion_cb(callback)
- # Add a function to be called before each firing of comm0
- # This function modifies the graph of tasks by adding or removing
- # successors to comm0
- comm0.on_this_start_cb(lambda t: switch(t, [jupiter, fafard], [exec1,exec2]))
+ # Add a callback before each firing of comm0
+ # It switches the destination of comm0
+ comm0.on_this_start_cb(lambda t: switch_destination(t, [jupiter, fafard]))
+
+ # Add a callback before comm0 sends tokens to its successors
+ # It switches the successor of comm0
+ comm0.on_this_completion_cb(lambda t: switch_successor(t, [exec1,exec2]))
# Enqueue two firings for task exec1
comm0.enqueue_firings(4)
return parser.parse_args()
def callback(t):
- print(f'[{Engine.clock}] {t} finished ({t.count})')
+ print(f'[{Engine.clock}] {t} finished ({t.get_count()})')
def variable_load(t):
print('--- Small load ---')
void receive(Task* source);
std::shared_ptr<Token> token_ = nullptr;
- std::deque<std::map<TaskPtr, std::shared_ptr<Token>>> tokens_received_;
+ std::map<TaskPtr, std::deque<std::shared_ptr<Token>>> tokens_received_;
std::map<std::string, std::deque<ActivityPtr>> current_activities_ = {
{"instance_0", {}}, {"dispatcher", {}}, {"collector", {}}};
void set_load_balancing_function(std::function<std::string()> func);
void set_token(std::shared_ptr<Token> token);
- std::shared_ptr<Token> get_next_token_from(TaskPtr t) const { return tokens_received_.front().at(t); }
+ std::shared_ptr<Token> get_token_from(TaskPtr t) const { return tokens_received_.at(t).front(); }
+ std::deque<std::shared_ptr<Token>> get_tokens_from(TaskPtr t) const { return tokens_received_.at(t); }
+ void deque_token_from(TaskPtr t);
void add_successor(TaskPtr t);
void remove_successor(TaskPtr t);
},
"Add a callback called when each task ends.")
.def_property_readonly("name", &Task::get_name, "The name of this task (read-only).")
- .def_property_readonly("count", &Task::get_count, "The execution count of this task (read-only).")
.def_property_readonly("successors", &Task::get_successors, "The successors of this task (read-only).")
.def_property("amount", &Task::get_amount, &Task::set_amount, "The amount of work to do for this task.")
+ .def(
+ "get_count", [](const TaskPtr t) { return t->get_count("instance_0"); },
+ "The execution count of this task instance_0.")
+ .def(
+ "get_count", [](const TaskPtr t, const std::string& instance) { return t->get_count(instance); },
+ "The execution count of this task instance.")
.def("enqueue_firings", py::overload_cast<int>(&Task::enqueue_firings), py::call_guard<py::gil_scoped_release>(),
py::arg("n"), "Enqueue firings for this task.")
.def("add_successor", py::overload_cast<TaskPtr>(&Task::add_successor), py::call_guard<py::gil_scoped_release>(),
XBT_DEBUG("Task %s received a token from %s", name_.c_str(), source->name_.c_str());
auto source_count = predecessors_[source];
predecessors_[source]++;
- if (tokens_received_.size() <= queued_firings_["dispatcher"] + source_count)
- tokens_received_.emplace_back();
- tokens_received_[queued_firings_["dispatcher"] + source_count][source] = source->token_;
- bool enough_tokens = true;
+ tokens_received_[source].push_back(source->token_);
+ bool enough_tokens = true;
for (auto const& [key, val] : predecessors_)
if (val < 1) {
enough_tokens = false;
running_instances_[instance] = running_instances_[instance] - 1;
count_[instance] = count_[instance] + 1;
if (instance == "collector") {
+ // XBT_INFO("Trigger on completion: %s - %s", get_cname(), instance.c_str());
on_this_completion(this);
on_completion(this);
for (auto const& t : successors_)
/** @param token The token to set.
* @brief Set the token to send to successors.
- * @note The token is passed to each successor after the task end, i.e., after the on_end callback.
+ * @note The token is passed to each successor after the task ends, i.e., after the on_completion callback.
*/
void Task::set_token(std::shared_ptr<Token> token)
{
simgrid::kernel::actor::simcall_answered([this, token] { token_ = token; });
}
+void Task::deque_token_from(TaskPtr t)
+{
+ simgrid::kernel::actor::simcall_answered([this, &t] { tokens_received_.at(t).pop_front(); });
+}
+
void Task::fire(std::string instance)
{
if ((int)current_activities_[instance].size() > parallelism_degree_[instance]) {
current_activities_[instance].pop_front();
}
- if (instance == "dispatcher") {
+ if (instance != "dispatcher" and instance != "collector") {
on_this_start(this);
on_start(this);
}
running_instances_[instance]++;
queued_firings_[instance] = std::max(queued_firings_[instance] - 1, 0);
- if (not tokens_received_.empty())
- tokens_received_.pop_front();
}
/** @param successor The Task to add.