#include "simgrid/s4u/Exec.hpp"
#include "simgrid/s4u/Mailbox.hpp"
#include "src/instr/instr_private.hpp"
-#include "src/kernel/activity/ExecImpl.hpp"
#include "src/msg/msg_private.hpp"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, "Logging specific to MSG (gos)");
-/**
- * @brief Executes a parallel task and waits for its termination.
- *
- * @param task a #msg_task_t to execute on the location on which the process is running.
- *
- * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise
- */
-msg_error_t MSG_parallel_task_execute(msg_task_t task)
-{
- return MSG_parallel_task_execute_with_timeout(task, -1);
-}
-
-msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
-{
- msg_error_t status = MSG_OK;
-
- xbt_assert((not task->compute) && not task->is_used(), "This task is executed somewhere else. Go fix your code!");
-
- XBT_DEBUG("Computing on %s", MSG_process_get_name(MSG_process_self()));
-
- if (TRACE_actor_is_enabled())
- simgrid::instr::Container::by_name(instr_pid(MSG_process_self()))->get_state("ACTOR_STATE")->push_event("execute");
-
- try {
- task->set_used();
- simgrid::s4u::ExecPtr e =
- simgrid::s4u::this_actor::exec_init(task->hosts_, task->flops_parallel_amount, task->bytes_parallel_amount)
- ->set_name(task->get_name())
- ->set_tracing_category(task->get_tracing_category())
- ->set_timeout(timeout)
- ->start();
- task->compute = boost::static_pointer_cast<simgrid::kernel::activity::ExecImpl>(e->get_impl());
-
- XBT_DEBUG("Parallel execution action created: %p", task->compute.get());
-
- e->wait();
-
- task->set_not_used();
-
- XBT_DEBUG("Execution task '%s' finished", task->get_cname());
- } catch (simgrid::HostFailureException& e) {
- status = MSG_HOST_FAILURE;
- } catch (simgrid::TimeoutError& e) {
- status = MSG_TIMEOUT;
- } catch (simgrid::CancelException& e) {
- status = MSG_TASK_CANCELED;
- }
-
- /* action ended, set comm and compute = nullptr, the actions is already destroyed in the main function */
- task->flops_amount = 0.0;
- task->comm = nullptr;
- task->compute = nullptr;
-
- if (TRACE_actor_is_enabled())
- simgrid::instr::Container::by_name(instr_pid(MSG_process_self()))->get_state("ACTOR_STATE")->pop_event();
-
- return status;
-}
-
/**
* @brief Receives a task from a mailbox.
*
void* userdata_ = nullptr;
long long int id_;
+ double timeout_ = 0.0; /* Value recorded by set_timeout(); stays 0.0 until it is called */
double priority_ = 1.0;
double bound_ = 0.0; /* Capping for CPU resource, or 0 for no capping */
double rate_ = -1; /* Capping for network resource, or -1 for no capping*/
double get_bound() { return bound_; }
void set_rate(double rate) { rate_ = rate; }
double get_rate() { return rate_; }
+ void set_timeout(double timeout) { timeout_ = timeout; } /* only stores the value in timeout_; nothing is started here */
s4u::Actor* get_sender();
s4u::Host* get_source();
- kernel::activity::ExecImplPtr compute = nullptr; /* SIMIX modeling of computation */
- s4u::CommPtr comm = nullptr; /* S4U modeling of communication */
- double flops_amount = 0.0; /* Computation size */
- double bytes_amount = 0.0; /* Data size */
-
+ s4u::ExecPtr compute = nullptr; /* S4U modeling of computation */
+ s4u::CommPtr comm = nullptr; /* S4U modeling of communication */
+ double flops_amount = 0.0; /* Computation size */
+ double bytes_amount = 0.0; /* Data size */
/******* Parallel Tasks Only !!!! *******/
bool parallel_ = false;
if (flops_amount <= 0.0)
return MSG_OK;
- set_used();
try {
- s4u::ExecPtr e = s4u::this_actor::exec_init(flops_amount)
- ->set_priority(1 / priority_)
- ->set_bound(bound_)
- ->set_tracing_category(tracing_category_)
- ->start();
- compute = boost::static_pointer_cast<kernel::activity::ExecImpl>(e->get_impl());
+ set_used();
+ if (parallel_)
+ compute = s4u::this_actor::exec_init(hosts_, flops_parallel_amount, bytes_parallel_amount);
+ else
+ compute = s4u::this_actor::exec_init(flops_amount);
- e->wait();
+ compute->set_name(name_)
+ ->set_tracing_category(tracing_category_)
+ ->set_timeout(timeout_)
+ ->set_priority(1 / priority_)
+ ->set_bound(bound_)
+ ->wait();
set_not_used();
XBT_DEBUG("Execution task '%s' finished", get_cname());
*/
msg_error_t MSG_task_execute(msg_task_t task)
{
- return task->is_parallel() ? MSG_parallel_task_execute(task) : task->execute();
+ return task->execute(); /* task->execute() is called for sequential and parallel tasks alike */
+}
+
+/**
+ * @brief Executes a parallel task and waits for its termination.
+ *
+ * This function only forwards to task->execute() and does not set any timeout on the task itself.
+ * NOTE(review): a timeout recorded on the task earlier through set_timeout() presumably still applies
+ * to this call -- confirm against the body of execute().
+ *
+ * @param task a #msg_task_t to execute on the location on which the process is running.
+ *
+ * @return #MSG_OK if the task was successfully completed, #MSG_TASK_CANCELED or #MSG_HOST_FAILURE otherwise
+ */
+msg_error_t MSG_parallel_task_execute(msg_task_t task)
+{
+ return task->execute(); /* same call as the one made by MSG_task_execute() */
+}
+
+msg_error_t MSG_parallel_task_execute_with_timeout(msg_task_t task, double timeout)
+{
+ task->set_timeout(timeout); /* the timeout is recorded on the task rather than passed to execute() */
+ return task->execute(); /* NOTE(review): the timeout is not reset here after the call -- confirm this is intended */
}
+
/**
* @brief Sends a task on a mailbox.
*
xbt_assert((task != nullptr), "Cannot get information from a nullptr task");
if (task->compute) {
// Task in progress
- if (task->is_parallel())
- return task->compute->get_par_remaining_ratio();
- else
- return task->compute->get_seq_remaining_ratio();
+ return task->compute->get_remaining_ratio();
} else {
// Task not started (flops_amount is > 0.0) or finished (flops_amount is set to 0.0)
return task->flops_amount > 0.0 ? 1.0 : 0.0;