+namespace simgrid {
+namespace msg {
+
+Task::Task(std::string name, double flops_amount, double bytes_amount, void* data)
+ : name_(std::move(name)), userdata_(data), flops_amount(flops_amount), bytes_amount(bytes_amount)
+{
+ static std::atomic_ullong counter{0};
+ id_ = counter++;
+ if (MC_is_active())
+ MC_ignore_heap(&(id_), sizeof(id_));
+}
+
+Task::Task(std::string name, std::vector<s4u::Host*> hosts, std::vector<double> flops_amount,
+ std::vector<double> bytes_amount, void* data)
+ : Task(std::move(name), 1.0, 0, data)
+{
+ parallel_ = true;
+ hosts_ = std::move(hosts);
+ flops_parallel_amount = std::move(flops_amount);
+ bytes_parallel_amount = std::move(bytes_amount);
+}
+
+Task* Task::create(std::string name, double flops_amount, double bytes_amount, void* data)
+{
+ return new Task(std::move(name), flops_amount, bytes_amount, data);
+}
+
+Task* Task::create_parallel(std::string name, int host_nb, const msg_host_t* host_list, double* flops_amount,
+ double* bytes_amount, void* data)
+{
+ std::vector<s4u::Host*> hosts;
+ std::vector<double> flops;
+ std::vector<double> bytes;
+
+ for (int i = 0; i < host_nb; i++) {
+ hosts.push_back(host_list[i]);
+ if (flops_amount != nullptr)
+ flops.push_back(flops_amount[i]);
+ if (bytes_amount != nullptr) {
+ for (int j = 0; j < host_nb; j++)
+ bytes.push_back(bytes_amount[host_nb * i + j]);
+ }
+ }
+ return new Task(std::move(name), std::move(hosts), std::move(flops), std::move(bytes), data);
+}
+
+msg_error_t Task::execute()
+{
+ /* checking for infinite values */
+ xbt_assert(std::isfinite(flops_amount), "flops_amount is not finite!");
+
+ msg_error_t status = MSG_OK;
+
+ set_used();
+ try {
+ s4u::ExecPtr e = s4u::this_actor::exec_init(flops_amount)
+ ->set_priority(1 / priority_)
+ ->set_bound(bound_)
+ ->set_tracing_category(tracing_category_)
+ ->start();
+ compute = boost::static_pointer_cast<kernel::activity::ExecImpl>(e->get_impl());
+
+ e->wait();
+
+ set_not_used();
+ XBT_DEBUG("Execution task '%s' finished", get_cname());
+ } catch (HostFailureException& e) {
+ status = MSG_HOST_FAILURE;
+ } catch (TimeoutError& e) {
+ status = MSG_TIMEOUT;
+ } catch (CancelException& e) {
+ status = MSG_TASK_CANCELED;
+ }
+
+ /* action ended, set comm and compute = nullptr, the actions is already destroyed in the main function */
+ flops_amount = 0.0;
+ comm = nullptr;
+ compute = nullptr;
+
+ return status;
+}
+
+Comm* Task::send_async(std::string alias, void_f_pvoid_t cleanup, bool detached)
+{
+ TRACE_msg_task_put_start(this);
+
+ /* Prepare the task to send */
+ set_used();
+ this->comm = nullptr;
+ msg_global->sent_msg++;
+
+ s4u::CommPtr comm = s4u::Mailbox::by_name(alias)->put_init(this, bytes_amount)->set_rate(get_rate());
+ this->comm = comm;
+
+ if (detached)
+ comm->detach(cleanup);
+ else
+ comm->start();
+
+ if (TRACE_is_enabled() && has_tracing_category())
+ simgrid::simix::simcall([comm, this] { comm->get_impl()->set_category(std::move(tracing_category_)); });
+
+ if (not detached)
+ return new Comm(this, nullptr, comm);
+ else
+ return nullptr;
+}
+
+void Task::cancel()
+{
+ if (compute) {
+ simgrid::simix::simcall([this] { compute->cancel(); });
+ } else if (comm) {
+ comm->cancel();
+ }
+ set_not_used();
+}
+
+void Task::set_priority(double priority)
+{
+ xbt_assert(std::isfinite(1.0 / priority), "priority is not finite!");
+ priority_ = 1.0 / priority;
+}
+
+s4u::Actor* Task::get_sender()
+{
+ return comm ? comm->get_sender().get() : nullptr;
+}
+
+s4u::Host* Task::get_source()
+{
+ return comm ? comm->get_sender()->get_host() : nullptr;
+}
+
+void Task::set_used()
+{
+ if (is_used_)
+ report_multiple_use();
+ is_used_ = true;
+}
+
+void Task::report_multiple_use() const