*/
bool Task::ready_to_run() const
{
- return not working_ && queued_firings_ > 0;
+ return running_instances_ < parallelism_degree_ && queued_firings_ > 0;
}
/**
void Task::complete()
{
xbt_assert(Actor::is_maestro());
- working_ = false;
+ running_instances_--;
count_++;
on_this_completion(this);
on_completion(this);
- if (current_activity_)
- previous_activity_ = std::move(current_activity_);
for (auto const& t : successors_)
t->receive(this);
if (ready_to_run())
fire();
}
+/** @param n The new parallelism degree of the Task.
+ * @brief Set the parallelism degree of the Task.
+ * @note When increasing the degree the function starts new instances.
+ * When decreasing the degree the function does NOT stop running instances.
+ */
+void Task::set_parallelism_degree(int n)
+{
+ xbt_assert(n > 0, "Parallelism degree of Tasks must be above 0.");
+ simgrid::kernel::actor::simcall_answered([this, n] {
+ parallelism_degree_ = n;
+ while (ready_to_run())
+ fire();
+ });
+}
+
/** @param n The number of firings to enqueue.
* @brief Enqueue firing.
* @note Immediatly fire an activity if possible.
{
simgrid::kernel::actor::simcall_answered([this, n] {
queued_firings_ += n;
- if (ready_to_run())
+ while (ready_to_run())
fire();
});
}
+/** @param name The new name to set.
+ * @brief Set the name of the Task.
+ */
+void Task::set_name(std::string name)
+{
+ name_ = name;
+}
+
/** @param amount The amount to set.
* @brief Set the amout of work to do.
* @note Amount in flop for ExecTask and in bytes for CommTask.
void Task::fire()
{
+ if ((int)current_activities_.size() > parallelism_degree_) {
+ current_activities_.pop_front();
+ }
on_this_start(this);
on_start(this);
- working_ = true;
+ running_instances_++;
queued_firings_ = std::max(queued_firings_ - 1, 0);
if (not tokens_received_.empty())
tokens_received_.pop_front();
/**
* @brief Do one execution of the Task.
- * @note Call the on_this_start() func. Set its working status as true.
+ * @note Call the on_this_start() func.
* Init and start the underlying Activity.
*/
void ExecTask::fire()
Task::fire();
auto exec = Exec::init()->set_name(get_name())->set_flops_amount(get_amount())->set_host(host_);
exec->start();
- exec->on_this_completion_cb([this](Exec const&) { this->complete(); });
- set_current_activity(exec);
+ exec->on_this_completion_cb([this](Exec const&) { complete(); });
+ store_activity(exec);
}
/** @ingroup plugin_task
/**
* @brief Do one execution of the Task.
- * @note Call the on_this_start() func. Set its working status as true.
+ * @note Call the on_this_start() func.
* Init and start the underlying Activity.
*/
void CommTask::fire()
Task::fire();
auto comm = Comm::sendto_init(source_, destination_)->set_name(get_name())->set_payload_size(get_amount());
comm->start();
- comm->on_this_completion_cb([this](Comm const&) { this->complete(); });
- set_current_activity(comm);
+ comm->on_this_completion_cb([this](Comm const&) { complete(); });
+ store_activity(comm);
}
/** @ingroup plugin_task
Task::fire();
auto io = Io::init()->set_name(get_name())->set_size(get_amount())->set_disk(disk_)->set_op_type(type_);
io->start();
- io->on_this_completion_cb([this](Io const&) { this->complete(); });
- set_current_activity(io);
+ io->on_this_completion_cb([this](Io const&) { complete(); });
+ store_activity(io);
}
} // namespace simgrid::s4u