X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/0c14e681436e252f680310c7696b3ba17a349706..0e9c0448c6566825b170b98ecff716b098bda10e:/src/msg/msg_task.c diff --git a/src/msg/msg_task.c b/src/msg/msg_task.c index a833cf50f9..aee793c7d1 100644 --- a/src/msg/msg_task.c +++ b/src/msg/msg_task.c @@ -1,10 +1,11 @@ -/* Copyright (c) 2004, 2005, 2006, 2007, 2008, 2009, 2010. The SimGrid Team. +/* Copyright (c) 2004-2013. The SimGrid Team. * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ #include "msg_private.h" +#include "simix/smx_private.h" #include "xbt/sysdep.h" #include "xbt/log.h" @@ -24,9 +25,9 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_task, msg, /********************************* Task **************************************/ /** \ingroup m_task_management - * \brief Creates a new #m_task_t. + * \brief Creates a new #msg_task_t. * - * A constructor for #m_task_t taking four arguments and returning the + * A constructor for #msg_task_t taking four arguments and returning the corresponding object. * \param name a name for the object. It is for user-level information and can be NULL. @@ -39,31 +40,35 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_task, msg, * \param data a pointer to any data may want to attach to the new object. It is for user-level information and can be NULL. It can be retrieved with the function \ref MSG_task_get_data. - * \see m_task_t + * \see msg_task_t * \return The new corresponding object. 
*/ -m_task_t MSG_task_create(const char *name, double compute_duration, +msg_task_t MSG_task_create(const char *name, double compute_duration, double message_size, void *data) { - m_task_t task = xbt_new(s_m_task_t, 1); + msg_task_t task = xbt_new(s_msg_task_t, 1); simdata_task_t simdata = xbt_new(s_simdata_task_t, 1); task->simdata = simdata; + /* Task structure */ task->name = xbt_strdup(name); task->data = data; /* Simulator Data */ - simdata->host_nb = 0; - simdata->computation_amount = compute_duration; + simdata->compute = NULL; + simdata->comm = NULL; simdata->message_size = message_size; - simdata->rate = -1.0; - simdata->priority = 1.0; - simdata->isused = 0; + simdata->computation_amount = compute_duration; simdata->sender = NULL; simdata->receiver = NULL; - simdata->compute = NULL; - simdata->comm = NULL; + simdata->source = NULL; + simdata->priority = 1.0; + simdata->bound = 0; + simdata->affinity_mask_db = xbt_dict_new_homogeneous(NULL); + simdata->rate = -1.0; + simdata->isused = 0; + simdata->host_nb = 0; simdata->host_list = NULL; simdata->comp_amount = NULL; simdata->comm_amount = NULL; @@ -74,11 +79,52 @@ m_task_t MSG_task_create(const char *name, double compute_duration, return task; } +/** \ingroup m_task_management + * \brief Creates a new #msg_task_t (a parallel one....). + * + * A constructor for #msg_task_t taking six arguments and returning the + corresponding object. + * \param name a name for the object. It is for user-level information + and can be NULL. + * \param host_nb the number of hosts involved in the parallel task. + * \param host_list an array of \p host_nb msg_host_t. + * \param computation_amount an array of \p host_nb + doubles. computation_amount[i] is the total number of operations + that have to be performed on host_list[i]. + * \param communication_amount an array of \p host_nb* \p host_nb doubles. + * \param data a pointer to any data the user may want to attach to the new + object. 
It is for user-level information and can be NULL. It can + be retrieved with the function \ref MSG_task_get_data. + * \see msg_task_t + * \return The new corresponding object. + */ +msg_task_t +MSG_parallel_task_create(const char *name, int host_nb, + const msg_host_t * host_list, + double *computation_amount, + double *communication_amount, void *data) +{ + msg_task_t task = MSG_task_create(name, 0, 0, data); + simdata_task_t simdata = task->simdata; + int i; + + /* Simulator Data specific to parallel tasks */ + simdata->host_nb = host_nb; + simdata->host_list = xbt_new0(smx_host_t, host_nb); + simdata->comp_amount = computation_amount; + simdata->comm_amount = communication_amount; + + for (i = 0; i < host_nb; i++) + simdata->host_list[i] = host_list[i]; + + return task; +} + /*************** Begin GPU ***************/ /** \ingroup m_task_management - * \brief Creates a new #m_gpu_task_t. + * \brief Creates a new #msg_gpu_task_t. - * A constructor for #m_gpu_task_t taking four arguments and returning + * A constructor for #msg_gpu_task_t taking four arguments and returning a pointer to the new created GPU task. * \param name a name for the object. It is for user-level information @@ -93,13 +139,13 @@ m_task_t MSG_task_create(const char *name, double compute_duration, * \param collect_latency time in seconds to transfer result from the GPU back to the CPU (host) when done - * \see m_gpu_task_t + * \see msg_gpu_task_t * \return The new corresponding object. 
*/ -m_gpu_task_t MSG_gpu_task_create(const char *name, double compute_duration, +msg_gpu_task_t MSG_gpu_task_create(const char *name, double compute_duration, double dispatch_latency, double collect_latency) { - m_gpu_task_t task = xbt_new(s_m_gpu_task_t, 1); + msg_gpu_task_t task = xbt_new(s_msg_gpu_task_t, 1); simdata_gpu_task_t simdata = xbt_new(s_simdata_gpu_task_t, 1); task->simdata = simdata; /* Task structure */ @@ -120,12 +166,12 @@ m_gpu_task_t MSG_gpu_task_create(const char *name, double compute_duration, /*************** End GPU ***************/ /** \ingroup m_task_management - * \brief Return the user data of a #m_task_t. + * \brief Return the user data of a #msg_task_t. * * This function checks whether \a task is a valid pointer or not and return the user data associated to \a task if it is possible. */ -void *MSG_task_get_data(m_task_t task) +void *MSG_task_get_data(msg_task_t task) { xbt_assert((task != NULL), "Invalid parameter"); @@ -133,12 +179,12 @@ void *MSG_task_get_data(m_task_t task) } /** \ingroup m_task_management - * \brief Sets the user data of a #m_task_t. + * \brief Sets the user data of a #msg_task_t. * * This function allows to associate a new pointer to the user data associated of \a task. */ -void MSG_task_set_data(m_task_t task, void *data) +void MSG_task_set_data(msg_task_t task, void *data) { xbt_assert((task != NULL), "Invalid parameter"); @@ -150,7 +196,7 @@ void MSG_task_set_data(m_task_t task, void *data) * \param callback a callback function */ void MSG_task_set_copy_callback(void (*callback) - (m_task_t task, m_process_t sender, m_process_t receiver)) { + (msg_task_t task, msg_process_t sender, msg_process_t receiver)) { msg_global->task_copy_callback = callback; @@ -163,53 +209,53 @@ void MSG_task_set_copy_callback(void (*callback) } /** \ingroup m_task_management - * \brief Return the sender of a #m_task_t. + * \brief Return the sender of a #msg_task_t. 
* - * This functions returns the #m_process_t which sent this task + * This function returns the #msg_process_t which sent this task */ -m_process_t MSG_task_get_sender(m_task_t task) +msg_process_t MSG_task_get_sender(msg_task_t task) { xbt_assert(task, "Invalid parameters"); return ((simdata_task_t) task->simdata)->sender; } /** \ingroup m_task_management - * \brief Return the source of a #m_task_t. + * \brief Return the source of a #msg_task_t. * - * This functions returns the #m_host_t from which this task was sent + * This function returns the #msg_host_t from which this task was sent */ -m_host_t MSG_task_get_source(m_task_t task) +msg_host_t MSG_task_get_source(msg_task_t task) { xbt_assert(task, "Invalid parameters"); return ((simdata_task_t) task->simdata)->source; } /** \ingroup m_task_management - * \brief Return the name of a #m_task_t. + * \brief Return the name of a #msg_task_t. * - * This functions returns the name of a #m_task_t as specified on creation + * This function returns the name of a #msg_task_t as specified on creation */ -const char *MSG_task_get_name(m_task_t task) +const char *MSG_task_get_name(msg_task_t task) { xbt_assert(task, "Invalid parameters"); return task->name; } /** \ingroup m_task_management - * \brief Sets the name of a #m_task_t. + * \brief Sets the name of a #msg_task_t. * * This functions allows to associate a name to a task */ -void MSG_task_set_name(m_task_t task, const char *name) +void MSG_task_set_name(msg_task_t task, const char *name) { xbt_assert(task, "Invalid parameters"); task->name = xbt_strdup(name); } /** \ingroup m_task_management - * \brief Destroy a #m_task_t. + * \brief Destroy a #msg_task_t. * - * Destructor for #m_task_t. Note that you should free user data, if any, \b + * Destructor for #msg_task_t. Note that you should free user data, if any, \b * before calling this function. * * Only the process that owns the task can destroy it. 
@@ -218,7 +264,7 @@ void MSG_task_set_name(m_task_t task, const char *name) * supposed to destroy it. The sender should not use it anymore. * If the task failed to be sent, the sender remains the owner of the task. */ -MSG_error_t MSG_task_destroy(m_task_t task) +msg_error_t MSG_task_destroy(msg_task_t task) { smx_action_t action = NULL; xbt_assert((task != NULL), "Invalid parameter"); @@ -240,6 +286,8 @@ MSG_error_t MSG_task_destroy(m_task_t task) /* parallel tasks only */ xbt_free(task->simdata->host_list); + xbt_dict_free(&task->simdata->affinity_mask_db); + /* free main structures */ xbt_free(task->simdata); xbt_free(task); @@ -249,13 +297,13 @@ MSG_error_t MSG_task_destroy(m_task_t task) /** \ingroup m_task_usage - * \brief Cancel a #m_task_t. + * \brief Cancel a #msg_task_t. * \param task the task to cancel. If it was executed or transfered, it stops the process that were working on it. */ -MSG_error_t MSG_task_cancel(m_task_t task) +msg_error_t MSG_task_cancel(msg_task_t task) { - xbt_assert((task != NULL), "Invalid parameter"); + xbt_assert((task != NULL), "Cannot cancel a NULL task"); if (task->simdata->compute) { simcall_host_execution_cancel(task->simdata->compute); @@ -268,10 +316,12 @@ MSG_error_t MSG_task_cancel(m_task_t task) } /** \ingroup m_task_management - * \brief Returns the computation amount needed to process a task #m_task_t. - * Once a task has been processed, this amount is thus set to 0... + * \brief Returns the computation amount needed to process a task #msg_task_t. + * + * Once a task has been processed, this amount is set to 0. If you want, you + * can reset this value with #MSG_task_set_compute_duration before restarting the task. 
*/ -double MSG_task_get_compute_duration(m_task_t task) +double MSG_task_get_compute_duration(msg_task_t task) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -281,10 +331,15 @@ double MSG_task_get_compute_duration(m_task_t task) /** \ingroup m_task_management - * \brief set the computation amount needed to process a task #m_task_t. + * \brief set the computation amount needed to process a task #msg_task_t. + * + * \warning If the computation is ongoing (already started and not finished), + * it is not modified by this call. And the termination of the ongoing task will + * set the computation_amount to zero, overriding any value set during the + * execution. */ -void MSG_task_set_compute_duration(m_task_t task, +void MSG_task_set_compute_duration(msg_task_t task, double computation_amount) { xbt_assert(task, "Invalid parameter"); @@ -293,10 +348,30 @@ void MSG_task_set_compute_duration(m_task_t task, } /** \ingroup m_task_management - * \brief Returns the remaining computation amount of a task #m_task_t. + * \brief set the amount of data attached to a task #msg_task_t. + * + * \warning If the transfer is ongoing (already started and not finished), + * it is not modified by this call. + */ + +void MSG_task_set_data_size(msg_task_t task, + double data_size) +{ + xbt_assert(task, "Invalid parameter"); + task->simdata->message_size = data_size; + +} + + + +/** \ingroup m_task_management + * \brief Returns the remaining computation amount of a task #msg_task_t. * + * If the task is ongoing, this call retrieves the remaining amount of work. + * If it is not ongoing, it returns the total amount of work that will be + * executed when the task starts. 
*/ -double MSG_task_get_remaining_computation(m_task_t task) +double MSG_task_get_remaining_computation(msg_task_t task) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -309,12 +384,12 @@ double MSG_task_get_remaining_computation(m_task_t task) } /** \ingroup m_task_management - * \brief Returns the total amount received by a task #m_task_t. + * \brief Returns the total amount received by a task #msg_task_t. * If the communication does not exist it will return 0. * So, if the communication has FINISHED or FAILED it returns * zero. */ -double MSG_task_get_remaining_communication(m_task_t task) +double MSG_task_get_remaining_communication(msg_task_t task) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -328,7 +403,7 @@ double MSG_task_get_remaining_communication(m_task_t task) * \brief Return 1 if communication task is limited by latency, 0 otherwise * */ -int MSG_task_is_latency_bounded(m_task_t task) +int MSG_task_is_latency_bounded(msg_task_t task) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -339,10 +414,10 @@ int MSG_task_is_latency_bounded(m_task_t task) #endif /** \ingroup m_task_management - * \brief Returns the size of the data attached to a task #m_task_t. + * \brief Returns the size of the data attached to a task #msg_task_t. * */ -double MSG_task_get_data_size(m_task_t task) +double MSG_task_get_data_size(msg_task_t task) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -358,7 +433,7 @@ double MSG_task_get_data_size(m_task_t task) * cpu power than the other ones. 
* */ -void MSG_task_set_priority(m_task_t task, double priority) +void MSG_task_set_priority(msg_task_t task, double priority) { xbt_assert((task != NULL) && (task->simdata != NULL), "Invalid parameter"); @@ -368,3 +443,111 @@ void MSG_task_set_priority(m_task_t task, double priority) simcall_host_execution_set_priority(task->simdata->compute, task->simdata->priority); } + + +/** \ingroup m_task_management + * \brief Changes the maximum CPU utilization of a computation task. + * Unit is flops/s. + * + * For VMs, there is a pitfall. Please see MSG_vm_set_bound(). + */ +void MSG_task_set_bound(msg_task_t task, double bound) +{ + xbt_assert(task, "Invalid parameter"); + xbt_assert(task->simdata, "Invalid parameter"); + + if (bound == 0) + XBT_INFO("bound == 0 means no capping (i.e., unlimited)."); + + task->simdata->bound = bound; + if (task->simdata->compute) + simcall_host_execution_set_bound(task->simdata->compute, + task->simdata->bound); +} + + +/** \ingroup m_task_management + * \brief Changes the CPU affinity of a computation task. + * + * When pinning the given task to the first CPU core of the given host, use + * 0x01 for the mask value. Each bit of the mask value corresponds to each CPU + * core. See taskset(1) on Linux. + * + * \param task a target task + * \param host the host having a multi-core CPU + * \param mask the bit mask of a new CPU affinity setting for the task + * + * + * Usage: + * 0. Define a host with multiple cores. + * + * + * 1. Pin a given task to the first CPU core of a host. + * MSG_task_set_affinity(task, pm0, 0x01); + * + * 2. Pin a given task to the third CPU core of a host. Turn on the third bit of the mask. + * MSG_task_set_affinity(task, pm0, 0x04); // 0x04 == 100B + * + * 3. Pin a given VM to the first CPU core of a host. + * MSG_vm_set_affinity(vm, pm0, 0x01); + * + * See examples/msg/cloud/multicore.c for more information. + * + * + * Note: + * 1. The current code does not allow an affinity of a task to multiple cores. 
+ * The mask value 0x03 (i.e., a given task will be executed on the first core + * or the second core) is not allowed. The mask value 0x01 or 0x02 works. See + * cpu_cas01.c for details. + * + * 2. It is recommended to first compare simulation results in both the Lazy + * and Full calculation modes (using --cfg=cpu/optim:Full or not). Fix + * cpu_cas01.c if you find wrong results in the Lazy mode. + * + */ +void MSG_task_set_affinity(msg_task_t task, msg_host_t host, unsigned long mask) +{ + xbt_assert(task, "Invalid parameter"); + xbt_assert(task->simdata, "Invalid parameter"); + + if (mask == 0) { + /* 0 means clear */ + { + /* We need remove_ext() not throwing exception. */ + void *ret = xbt_dict_get_or_null_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(msg_host_t)); + if (ret != NULL) + xbt_dict_remove_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host)); + } + } else + xbt_dict_set_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(host), (void *) mask, NULL); + + /* We set affinity data of this task. If the task is being executed, we + * actually change the affinity setting of the task. Otherwise, this change + * will be applied when the task is executed. */ + + if (!task->simdata->compute) { + /* task is not yet executed */ + XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host), MSG_task_get_name(task)); + return; + } + + { + smx_action_t compute = task->simdata->compute; + msg_host_t host_now = compute->execution.host; // simix_private.h is necessary + if (host_now != host) { + /* task is not yet executed on this host */ + XBT_INFO("set affinity(0x%04lx@%s) for %s (not active now)", mask, MSG_host_get_name(host), MSG_task_get_name(task)); + return; + } + + /* task is being executed on this host. so change the affinity now */ + { + /* check it works. remove me if it works. 
*/ + unsigned long affinity_mask = (unsigned long) xbt_dict_get_or_null_ext(task->simdata->affinity_mask_db, (char *) host, sizeof(msg_host_t)); + xbt_assert(affinity_mask == mask); + } + + XBT_INFO("set affinity(0x%04lx@%s) for %s", mask, MSG_host_get_name(host), MSG_task_get_name(task)); + simcall_host_execution_set_affinity(task->simdata->compute, host, mask); + } +}