From 794197ca26e1237d004cb9f3b3fc9f2f754f4ea1 Mon Sep 17 00:00:00 2001 From: Frederic Suter Date: Fri, 1 Mar 2019 10:03:36 +0100 Subject: [PATCH] mess with MSG tasks --- include/simgrid/s4u/Comm.hpp | 2 + src/msg/msg_gos.cpp | 151 ++++++------------------- src/msg/msg_private.hpp | 63 +++++++---- src/msg/msg_task.cpp | 213 ++++++++++++++++++++++++++++------- src/s4u/s4u_Comm.cpp | 6 + 5 files changed, 256 insertions(+), 179 deletions(-) diff --git a/include/simgrid/s4u/Comm.hpp b/include/simgrid/s4u/Comm.hpp index 91f2ab26e8..d858df952b 100644 --- a/include/simgrid/s4u/Comm.hpp +++ b/include/simgrid/s4u/Comm.hpp @@ -116,6 +116,8 @@ public: /** Retrieve the size of the received data. Not to be mixed with @ref Activity::set_remaining() */ size_t get_dst_data_size(); + s4u::ActorPtr get_sender(); + #ifndef DOXYGEN XBT_ATTRIB_DEPRECATED_v324("Please use Comm::wait_for()") void wait(double t) override { wait_for(t); } XBT_ATTRIB_DEPRECATED_v323("Please use Comm::set_rate()") Activity* setRate(double rate) diff --git a/src/msg/msg_gos.cpp b/src/msg/msg_gos.cpp index a965bea065..2b8836bfb8 100644 --- a/src/msg/msg_gos.cpp +++ b/src/msg/msg_gos.cpp @@ -3,37 +3,23 @@ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ -#include "simgrid/Exception.hpp" #include +#include "simgrid/Exception.hpp" #include "simgrid/s4u/Comm.hpp" #include "simgrid/s4u/Mailbox.hpp" #include "src/instr/instr_private.hpp" #include "src/kernel/activity/ExecImpl.hpp" #include "src/msg/msg_private.hpp" -#include "src/simix/smx_private.hpp" /* MSG_task_listen looks inside the rdv directly. Not clean. */ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)"); -/** - * @brief Executes a task and waits for its termination. - * - * This function is used for describing the behavior of a process. It takes only one parameter. - * @param task a #msg_task_t to execute on the location on which the process is running. - * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise - */ -msg_error_t MSG_task_execute(msg_task_t task) -{ - return MSG_parallel_task_execute(task); -} - /** * @brief Executes a parallel task and waits for its termination. * * @param task a #msg_task_t to execute on the location on which the process is running. * - * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED - * or #MSG_HOST_FAILURE otherwise + * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise */ msg_error_t MSG_parallel_task_execute(msg_task_t task) { @@ -45,11 +31,11 @@ msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeo e_smx_state_t comp_state; msg_error_t status = MSG_OK; - xbt_assert((not task->compute) && not task->is_used, "This task is executed somewhere else. Go fix your code!"); + xbt_assert((not task->compute) && not task->is_used(), "This task is executed somewhere else. Go fix your code!"); XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self())); - if (task->flops_amount <= 0.0 && not task->host_nb) { + if (task->flops_amount <= 0.0 && not task->hosts_.empty()) { return MSG_OK; } @@ -59,25 +45,13 @@ msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeo try { task->set_used(); - if (task->host_nb > 0) { - task->compute = boost::static_pointer_cast( - simcall_execution_parallel_start(std::move(task->get_name()), task->host_nb, task->host_list, - task->flops_parallel_amount, task->bytes_parallel_amount, -1.0, timeout)); - XBT_DEBUG("Parallel execution action created: %p", task->compute.get()); - if (task->has_tracing_category()) - simgrid::simix::simcall([task] { task->compute->set_category(std::move(task->get_tracing_category())); }); - } else { - sg_host_t host = MSG_process_get_host(MSG_process_self()); - task->compute = simgrid::simix::simcall([task, host] { - return simgrid::kernel::activity::ExecImplPtr(new simgrid::kernel::activity::ExecImpl( - std::move(task->get_name()), std::move(task->get_tracing_category()), host)); - }); - /* checking for infinite values */ - xbt_assert(std::isfinite(task->flops_amount), "flops_amount is not finite!"); - xbt_assert(std::isfinite(task->priority), "priority is not finite!"); - - task->compute->start(task->flops_amount, task->priority, task->bound); - } + task->compute = boost::static_pointer_cast(simcall_execution_parallel_start( + std::move(task->get_name()), task->hosts_.size(), task->hosts_.data(), + (task->flops_parallel_amount.empty() ? nullptr : task->flops_parallel_amount.data()), + (task->bytes_parallel_amount.empty() ? nullptr : task->bytes_parallel_amount.data()), -1.0, timeout)); + XBT_DEBUG("Parallel execution action created: %p", task->compute.get()); + if (task->has_tracing_category()) + simgrid::simix::simcall([task] { task->compute->set_category(std::move(task->get_tracing_category())); }); comp_state = simcall_execution_wait(task->compute); @@ -128,10 +102,8 @@ msg_error_t MSG_task_receive(msg_task_t * task, const char *alias) * @param alias name of the mailbox to receive the task from * @param rate limit the reception to rate bandwidth (byte/sec) * - * The rate parameter can be used to receive a task with a limited - * bandwidth (smaller than the physical available value). Use - * MSG_task_receive() if you don't limit the rate (or pass -1 as a - * rate value do disable this feature). + * The rate parameter can be used to receive a task with a limited bandwidth (smaller than the physical available + * value). Use MSG_task_receive() if you don't limit the rate (or pass -1 as a rate value do disable this feature). * * @return Returns * #MSG_OK if the task was successfully received, @@ -179,7 +151,7 @@ msg_error_t MSG_task_receive_with_timeout(msg_task_t * task, const char *alias, * #MSG_OK if the task was successfully received, * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise. */ -msg_error_t MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias, double timeout,double rate) +msg_error_t MSG_task_receive_with_timeout_bounded(msg_task_t* task, const char* alias, double timeout, double rate) { return MSG_task_receive_ext_bounded(task, alias, timeout, nullptr, rate); } @@ -215,10 +187,8 @@ msg_error_t MSG_task_receive_ext(msg_task_t * task, const char *alias, double ti * @param host a #msg_host_t host from where the task was sent * @param rate limit the reception to rate bandwidth (byte/sec) * - * The rate parameter can be used to receive a task with a limited - * bandwidth (smaller than the physical available value). Use - * MSG_task_receive_ext() if you don't limit the rate (or pass -1 as a - * rate value do disable this feature). + * The rate parameter can be used to receive a task with a limited bandwidth (smaller than the physical available + * value). Use MSG_task_receive_ext() if you don't limit the rate (or pass -1 as a rate value do disable this feature). * * @return Returns * #MSG_OK if the task was successfully received, @@ -276,17 +246,15 @@ msg_error_t MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, d static inline msg_comm_t MSG_task_isend_internal(msg_task_t task, const char* alias, void_f_pvoid_t cleanup, bool detached) { - msg_process_t myself = MSG_process_self(); - simgrid::s4u::MailboxPtr mailbox = simgrid::s4u::Mailbox::by_name(alias); TRACE_msg_task_put_start(task); /* Prepare the task to send */ - task->sender = myself; task->set_used(); task->comm = nullptr; msg_global->sent_msg++; - simgrid::s4u::CommPtr comm = mailbox->put_init(task, task->bytes_amount)->set_rate(task->rate); + simgrid::s4u::CommPtr comm = + simgrid::s4u::Mailbox::by_name(alias)->put_init(task, task->bytes_amount)->set_rate(task->get_rate()); task->comm = comm; if (detached) comm->detach(cleanup); @@ -331,7 +299,7 @@ msg_comm_t MSG_task_isend(msg_task_t task, const char *alias) */ msg_comm_t MSG_task_isend_bounded(msg_task_t task, const char *alias, double maxrate) { - task->rate = maxrate; + task->set_rate(maxrate); return MSG_task_isend_internal(task, alias, nullptr, false); } @@ -381,7 +349,7 @@ void MSG_task_dsend(msg_task_t task, const char *alias, void_f_pvoid_t cleanup) */ void MSG_task_dsend_bounded(msg_task_t task, const char *alias, void_f_pvoid_t cleanup, double maxrate) { - task->rate = maxrate; + task->set_rate(maxrate); MSG_task_dsend(task, alias, cleanup); } @@ -471,8 +439,7 @@ int MSG_comm_test(msg_comm_t comm) * @brief This function checks if a communication is finished. * @param comms a vector of communications * @return the position of the finished communication if any - * (but it may have failed, use MSG_comm_get_status() to know its status), - * or -1 if none is finished + * (but it may have failed, use MSG_comm_get_status() to know its status), or -1 if none is finished */ int MSG_comm_testany(xbt_dynar_t comms) { @@ -624,8 +591,7 @@ int MSG_comm_waitany(xbt_dynar_t comms) /** * @brief Returns the error (if any) that occurred during a finished communication. * @param comm a finished communication - * @return the status of the communication, or #MSG_OK if no error occurred - * during the communication + * @return the status of the communication, or #MSG_OK if no error occurred during the communication */ msg_error_t MSG_comm_get_status(msg_comm_t comm) { @@ -686,10 +652,8 @@ msg_error_t MSG_task_send(msg_task_t task, const char *alias) * This is a blocking function, the execution flow will be blocked until the task is sent. The maxrate parameter allows * the application to limit the bandwidth utilization of network links when sending the task. * - * The maxrate parameter can be used to send a task with a limited - * bandwidth (smaller than the physical available value). Use - * MSG_task_send() if you don't limit the rate (or pass -1 as a rate - * value do disable this feature). + * The maxrate parameter can be used to send a task with a limited bandwidth (smaller than the physical available + * value). Use MSG_task_send() if you don't limit the rate (or pass -1 as a rate value do disable this feature). * * @param task the task to be sent * @param alias the mailbox name to where the task is sent @@ -700,7 +664,7 @@ msg_error_t MSG_task_send(msg_task_t task, const char *alias) */ msg_error_t MSG_task_send_bounded(msg_task_t task, const char *alias, double maxrate) { - task->rate = maxrate; + task->set_rate(maxrate); return MSG_task_send(task, alias); } @@ -723,7 +687,6 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl TRACE_msg_task_put_start(task); /* Prepare the task to send */ - task->sender = MSG_process_self(); task->set_used(); msg_global->sent_msg++; @@ -731,7 +694,7 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl /* Try to send it */ try { simgrid::s4u::CommPtr comm = - simgrid::s4u::Mailbox::by_name(alias)->put_init(task, task->bytes_amount)->set_rate(task->rate); + simgrid::s4u::Mailbox::by_name(alias)->put_init(task, task->bytes_amount)->set_rate(task->get_rate()); task->comm = comm; comm->start(); if (TRACE_is_enabled() && task->has_tracing_category()) @@ -760,10 +723,9 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl * * This is a blocking function, the execution flow will be blocked until the task is sent or the timeout is achieved. * - * The maxrate parameter can be used to send a task with a limited - * bandwidth (smaller than the physical available value). Use - * MSG_task_send_with_timeout() if you don't limit the rate (or pass -1 as a rate - * value do disable this feature). + * The maxrate parameter can be used to send a task with a limited bandwidth (smaller than the physical available + * value). Use MSG_task_send_with_timeout() if you don't limit the rate (or pass -1 as a rate value do disable this + * feature). * * @param task the task to be sent * @param alias the mailbox name to where the task is sent @@ -775,7 +737,7 @@ msg_error_t MSG_task_send_with_timeout(msg_task_t task, const char *alias, doubl */ msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alias, double timeout, double maxrate) { - task->rate = maxrate; + task->set_rate(maxrate); return MSG_task_send_with_timeout(task, alias, timeout); } @@ -785,56 +747,17 @@ msg_error_t MSG_task_send_with_timeout_bounded(msg_task_t task, const char *alia * @param alias the name of the mailbox to be considered * * @return Returns the PID of sender process, - * -1 if there is no communication in the mailbox. + * -1 if there is no communication in the mailbox.#include + * */ int MSG_task_listen_from(const char *alias) { + /* looks inside the rdv directly. Not clean. */ simgrid::kernel::activity::CommImplPtr comm = simgrid::s4u::Mailbox::by_name(alias)->front(); - return comm ? MSG_process_get_PID(static_cast(comm->src_buff_)->sender) : -1; -} - -/** - * @brief Sets the tracing category of a task. - * - * This function should be called after the creation of a MSG task, to define the category of that task. The - * first parameter task must contain a task that was created with the function #MSG_task_create. The second - * parameter category must contain a category that was previously declared with the function #TRACE_category - * (or with #TRACE_category_with_color). - * - * See @ref outcomes_vizu for details on how to trace the (categorized) resource utilization. - * - * @param task the task that is going to be categorized - * @param category the name of the category to be associated to the task - * - * @see MSG_task_get_category, TRACE_category, TRACE_category_with_color - */ -void MSG_task_set_category (msg_task_t task, const char *category) -{ - xbt_assert(not task->has_tracing_category(), "Task %p(%s) already has a category (%s).", task, task->get_cname(), - task->get_tracing_category().c_str()); - - // if user provides a nullptr category, task is no longer traced - if (category == nullptr) { - task->set_tracing_category(""); - XBT_DEBUG("MSG task %p(%s), category removed", task, task->get_cname()); - } else { - // set task category - task->set_tracing_category(category); - XBT_DEBUG("MSG task %p(%s), category %s", task, task->get_cname(), task->get_tracing_category().c_str()); - } + if (comm && comm->src_actor_) + return comm->src_actor_->get_pid(); + else + return -1; } -/** - * @brief Gets the current tracing category of a task. - * - * @param task the task to be considered - * - * @see MSG_task_set_category - * - * @return Returns the name of the tracing category of the given task, nullptr otherwise - */ -const char *MSG_task_get_category (msg_task_t task) -{ - return task->get_tracing_category().c_str(); -} diff --git a/src/msg/msg_private.hpp b/src/msg/msg_private.hpp index 0b832dedf7..ccacb308e6 100644 --- a/src/msg/msg_private.hpp +++ b/src/msg/msg_private.hpp @@ -6,11 +6,14 @@ #ifndef MSG_PRIVATE_HPP #define MSG_PRIVATE_HPP +#include "simgrid/Exception.hpp" #include "simgrid/msg.h" #include "src/kernel/activity/CommImpl.hpp" #include "src/kernel/activity/ExecImpl.hpp" #include +#include + /**************** datatypes **********************************/ namespace simgrid { namespace msg { @@ -20,21 +23,33 @@ class Task { void* userdata_ = nullptr; long long int id_; + double priority_ = 1.0; + double bound_ = 0.0; /* Capping for CPU resource, or 0 for no capping */ + double rate_ = -1; /* Capping for network resource, or -1 for no capping*/ + bool is_used_ = false; /* Indicates whether the task is used in SIMIX currently */ + + explicit Task(std::string name, double flops_amount, double bytes_amount, void* data); + explicit Task(std::string name, std::vector hosts, std::vector flops_amount, + std::vector bytes_amount, void* data); + + void report_multiple_use() const; + public: - explicit Task(std::string name, double flops_amount, double bytes_amount, void* data) - : name_(std::move(name)), userdata_(data), flops_amount(flops_amount), bytes_amount(bytes_amount) - { - static std::atomic_ullong counter{0}; - id_ = counter++; - if (MC_is_active()) - MC_ignore_heap(&(id_), sizeof(id_)); - } + static Task* create(std::string name, double flops_amount, double bytes_amount, void* data); + static Task* create_parallel(std::string name, int host_nb, const msg_host_t* host_list, double* flops_amount, + double* bytes_amount, void* data); + msg_error_t execute(); + void cancel(); + Task(const Task&) = delete; Task& operator=(const Task&) = delete; - ~Task(); - void set_used(); - void set_not_used() { this->is_used = false; } + ~Task() = default; + bool is_used() { return is_used_; } + bool is_parallel() { return parallel_; } + + void set_used(); + void set_not_used() { this->is_used_ = false; } const std::string& get_name() const { return name_; } const char* get_cname() { return name_.c_str(); } void set_name(const char* new_name) { name_ = std::string(new_name); } @@ -44,27 +59,27 @@ public: void* get_user_data() { return userdata_; } void set_user_data(void* data) { userdata_ = data; } long long int get_id() { return id_; } + double get_priority() { return priority_; } + void set_priority(double priority); + void set_bound(double bound) { bound_ = bound; } + double get_bound() { return bound_; } + void set_rate(double rate) { rate_ = rate; } + double get_rate() { return rate_; } + + s4u::Actor* get_sender(); + s4u::Host* get_source(); kernel::activity::ExecImplPtr compute = nullptr; /* SIMIX modeling of computation */ s4u::CommPtr comm = nullptr; /* S4U modeling of communication */ double flops_amount = 0.0; /* Computation size */ double bytes_amount = 0.0; /* Data size */ - msg_process_t sender = nullptr; - msg_process_t receiver = nullptr; - double priority = 1.0; - double bound = 0.0; /* Capping for CPU resource, or 0 for no capping */ - double rate = -1; /* Capping for network resource, or -1 for no capping*/ - bool is_used = false; /* Indicates whether the task is used in SIMIX currently */ - int host_nb = 0; /* ==0 if sequential task; parallel task if not */ /******* Parallel Tasks Only !!!! *******/ - sg_host_t* host_list = nullptr; - double* flops_parallel_amount = nullptr; - double* bytes_parallel_amount = nullptr; - -private: - void report_multiple_use() const; + bool parallel_ = false; + std::vector hosts_; + std::vector flops_parallel_amount; + std::vector bytes_parallel_amount; }; class Comm { diff --git a/src/msg/msg_task.cpp b/src/msg/msg_task.cpp index fe32c4b8bb..54afd3e29c 100644 --- a/src/msg/msg_task.cpp +++ b/src/msg/msg_task.cpp @@ -5,27 +5,127 @@ #include "msg_private.hpp" #include "src/simix/smx_private.hpp" -#include -#include #include +#include + +#include +#include XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_task, msg, "Logging specific to MSG (task)"); namespace simgrid { namespace msg { -Task::~Task() + +Task::Task(std::string name, double flops_amount, double bytes_amount, void* data) + : name_(std::move(name)), userdata_(data), flops_amount(flops_amount), bytes_amount(bytes_amount) +{ + static std::atomic_ullong counter{0}; + id_ = counter++; + if (MC_is_active()) + MC_ignore_heap(&(id_), sizeof(id_)); +} + +Task::Task(std::string name, std::vector hosts, std::vector flops_amount, + std::vector bytes_amount, void* data) + : Task(std::move(name), 1.0, 0, data) +{ + parallel_ = true; + hosts_ = std::move(hosts); + flops_parallel_amount = std::move(flops_amount); + bytes_parallel_amount = std::move(bytes_amount); +} + +Task* Task::create(std::string name, double flops_amount, double bytes_amount, void* data) +{ + return new Task(std::move(name), flops_amount, bytes_amount, data); +} + +Task* Task::create_parallel(std::string name, int host_nb, const msg_host_t* host_list, double* flops_amount, + double* bytes_amount, void* data) +{ + std::vector hosts; + std::vector flops; + std::vector bytes; + + for (int i = 0; i < host_nb; i++) { + hosts.push_back(host_list[i]); + if (flops_amount != nullptr) + flops.push_back(flops_amount[i]); + if (bytes_amount != nullptr) { + for (int j = 0; j < host_nb; j++) + bytes.push_back(bytes_amount[host_nb * i + j]); + } + } + return new Task(std::move(name), std::move(hosts), std::move(flops), std::move(bytes), data); +} + +msg_error_t Task::execute() +{ + /* checking for infinite values */ + xbt_assert(std::isfinite(flops_amount), "flops_amount is not finite!"); + + msg_error_t status = MSG_OK; + s4u::Host* host = SIMIX_process_self()->get_host(); + + set_used(); + + compute = simix::simcall([this, host] { + return kernel::activity::ExecImplPtr(new kernel::activity::ExecImpl(name_, tracing_category_, host)); + }); + + try { + compute->start(flops_amount, priority_, bound_); + e_smx_state_t comp_state = simcall_execution_wait(compute); + + set_not_used(); + XBT_DEBUG("Execution task '%s' finished in state %d", get_cname(), (int)comp_state); + } catch (HostFailureException& e) { + status = MSG_HOST_FAILURE; + } catch (TimeoutError& e) { + status = MSG_TIMEOUT; + } catch (CancelException& e) { + status = MSG_TASK_CANCELED; + } + + /* action ended, set comm and compute = nullptr, the actions is already destroyed in the main function */ + flops_amount = 0.0; + comm = nullptr; + compute = nullptr; + + return status; +} + +void Task::cancel() +{ + if (compute) { + simgrid::simix::simcall([this] { compute->cancel(); }); + } else if (comm) { + comm->cancel(); + } + set_not_used(); +} + +void Task::set_priority(double priority) +{ + xbt_assert(std::isfinite(1.0 / priority), "priority is not finite!"); + priority_ = 1.0 / priority; +} + +s4u::Actor* Task::get_sender() { - /* parallel tasks only */ - delete[] host_list; - delete[] flops_parallel_amount; - delete[] bytes_parallel_amount; + return comm ? comm->get_sender().get() : nullptr; +} + +s4u::Host* Task::get_source() +{ + return comm ? comm->get_sender()->get_host() : nullptr; } void Task::set_used() { - if (this->is_used) - this->report_multiple_use(); - this->is_used = true; + if (is_used_) + report_multiple_use(); + is_used_ = true; } void Task::report_multiple_use() const @@ -60,7 +160,7 @@ void Task::report_multiple_use() const */ msg_task_t MSG_task_create(const char *name, double flop_amount, double message_size, void *data) { - return new simgrid::msg::Task(name ? name : "", flop_amount, message_size, data); + return simgrid::msg::Task::create(name ? std::string(name) : "", flop_amount, message_size, data); } /** @brief Creates a new parallel task @@ -87,22 +187,7 @@ msg_task_t MSG_parallel_task_create(const char *name, int host_nb, const msg_hos { // Task's flops amount is set to an arbitrary value > 0.0 to be able to distinguish, in // MSG_task_get_remaining_work_ratio(), a finished task and a task that has not started yet. - msg_task_t task = MSG_task_create(name, 1.0, 0, data); - - /* Simulator Data specific to parallel tasks */ - task->host_nb = host_nb; - task->host_list = new sg_host_t[host_nb]; - std::copy_n(host_list, host_nb, task->host_list); - if (flops_amount != nullptr) { - task->flops_parallel_amount = new double[host_nb]; - std::copy_n(flops_amount, host_nb, task->flops_parallel_amount); - } - if (bytes_amount != nullptr) { - task->bytes_parallel_amount = new double[host_nb * host_nb]; - std::copy_n(bytes_amount, host_nb * host_nb, task->bytes_parallel_amount); - } - - return task; + return simgrid::msg::Task::create_parallel(name ? name : "", host_nb, host_list, flops_amount, bytes_amount, data); } /** @brief Return the user data of the given task */ @@ -134,13 +219,13 @@ void MSG_task_set_copy_callback(void (*callback) (msg_task_t task, msg_process_t /** @brief Returns the sender of the given task */ msg_process_t MSG_task_get_sender(msg_task_t task) { - return task->sender; + return task->get_sender(); } /** @brief Returns the source (the sender's host) of the given task */ msg_host_t MSG_task_get_source(msg_task_t task) { - return task->sender->get_host(); + return task->get_source(); } /** @brief Returns the name of the given task. */ @@ -155,6 +240,18 @@ void MSG_task_set_name(msg_task_t task, const char *name) task->set_name(name); } +/** + * @brief Executes a task and waits for its termination. + * + * This function is used for describing the behavior of a process. It takes only one parameter. + * @param task a #msg_task_t to execute on the location on which the process is running. + * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise + */ +msg_error_t MSG_task_execute(msg_task_t task) +{ + return task->execute(); +} + /** @brief Destroys the given task. * * You should free user data, if any, @b before calling this destructor. @@ -167,9 +264,9 @@ void MSG_task_set_name(msg_task_t task, const char *name) */ msg_error_t MSG_task_destroy(msg_task_t task) { - if (task->is_used) { + if (task->is_used()) { /* the task is being sent or executed: cancel it first */ - MSG_task_cancel(task); + task->cancel(); } /* free main structures */ @@ -185,13 +282,7 @@ msg_error_t MSG_task_destroy(msg_task_t task) msg_error_t MSG_task_cancel(msg_task_t task) { xbt_assert((task != nullptr), "Cannot cancel a nullptr task"); - - if (task->compute) { - simgrid::simix::simcall([task] { task->compute->cancel(); }); - } else if (task->comm) { - task->comm->cancel(); - } - task->set_not_used(); + task->cancel(); return MSG_OK; } @@ -275,8 +366,7 @@ double MSG_task_get_bytes_amount(msg_task_t task) */ void MSG_task_set_priority(msg_task_t task, double priority) { - task->priority = 1 / priority; - xbt_assert(std::isfinite(task->priority), "priority is not finite!"); + task->set_priority(priority); } /** @brief Changes the maximum CPU utilization of a computation task (in flops/s). @@ -287,5 +377,46 @@ void MSG_task_set_bound(msg_task_t task, double bound) { if (bound < 1e-12) /* close enough to 0 without any floating precision surprise */ XBT_INFO("bound == 0 means no capping (i.e., unlimited)."); - task->bound = bound; + task->set_bound(bound); +} + +/** + * @brief Sets the tracing category of a task. + * + * This function should be called after the creation of a MSG task, to define the category of that task. The + * first parameter task must contain a task that was =created with the function #MSG_task_create. The second + * parameter category must contain a category that was previously declared with the function #TRACE_category + * (or with #TRACE_category_with_color). + * + * See @ref outcomes_vizu for details on how to trace the (categorized) resource utilization. + * + * @param task the task that is going to be categorized + * @param category the name of the category to be associated to the task + * + * @see MSG_task_get_category, TRACE_category, TRACE_category_with_color + */ +void MSG_task_set_category(msg_task_t task, const char* category) +{ + xbt_assert(not task->has_tracing_category(), "Task %p(%s) already has a category (%s).", task, task->get_cname(), + task->get_tracing_category().c_str()); + + // if user provides a nullptr category, task is no longer traced + if (category == nullptr) { + task->set_tracing_category(""); + XBT_DEBUG("MSG task %p(%s), category removed", task, task->get_cname()); + } else { + // set task category + task->set_tracing_category(category); + XBT_DEBUG("MSG task %p(%s), category %s", task, task->get_cname(), task->get_tracing_category().c_str()); + } +} + +/** + * @brief Gets the current tracing category of a task. (@see MSG_task_set_category) + * @param task the task to be considered + * @return Returns the name of the tracing category of the given task, "" otherwise + */ +const char* MSG_task_get_category(msg_task_t task) +{ + return task->get_tracing_category().c_str(); } diff --git a/src/s4u/s4u_Comm.cpp b/src/s4u/s4u_Comm.cpp index 8f0fc8890b..a841ac44ac 100644 --- a/src/s4u/s4u_Comm.cpp +++ b/src/s4u/s4u_Comm.cpp @@ -4,6 +4,7 @@ * under the terms of the license (GNU LGPL) which comes with this package. */ #include "src/msg/msg_private.hpp" +#include "src/simix/ActorImpl.hpp" #include "xbt/log.h" #include "simgrid/Exception.hpp" @@ -227,6 +228,11 @@ MailboxPtr Comm::get_mailbox() return mailbox_; } +ActorPtr Comm::get_sender() +{ + return sender_ ? sender_->iface() : nullptr; +} + void intrusive_ptr_release(simgrid::s4u::Comm* c) { if (c->refcount_.fetch_sub(1, std::memory_order_release) == 1) { -- 2.20.1