X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/7a8cd62135619ad52e05ae1c929ef07e166e4260..5cc7b86de46ab1595542f8e00219d45837fe82c0:/src/msg/msg_gos.c diff --git a/src/msg/msg_gos.c b/src/msg/msg_gos.c index d5ba36ef22..e8112ed93a 100644 --- a/src/msg/msg_gos.c +++ b/src/msg/msg_gos.c @@ -23,7 +23,15 @@ XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_gos, msg, */ msg_error_t MSG_task_execute(msg_task_t task) { - return MSG_parallel_task_execute(task); + /* TODO: add this to other locations */ + msg_host_t host = MSG_process_get_host(MSG_process_self()); + MSG_host_add_task(host, task); + + msg_error_t ret = MSG_parallel_task_execute(task); + + MSG_host_del_task(host, task); + + return ret; } /** \ingroup msg_task_usage @@ -77,7 +85,8 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) simdata->compute = simcall_host_execute(task->name, p_simdata->m_host, simdata->computation_amount, - simdata->priority); + simdata->priority, + simdata->bound); } #ifdef HAVE_TRACING @@ -125,6 +134,7 @@ msg_error_t MSG_parallel_task_execute(msg_task_t task) */ msg_error_t MSG_process_sleep(double nb_sec) { + xbt_ex_t e; msg_error_t status = MSG_OK; /*msg_process_t proc = MSG_process_self();*/ @@ -140,7 +150,19 @@ msg_error_t MSG_process_sleep(double nb_sec) proc->simdata->waiting_action = NULL;*/ - simcall_process_sleep(nb_sec); + TRY { + simcall_process_sleep(nb_sec); + } + CATCH(e) { + switch (e.category) { + case cancel_error: + status = MSG_TASK_CANCELED; + break; + default: + RETHROW; + } + xbt_ex_free(e); + } #ifdef HAVE_TRACING TRACE_msg_process_sleep_out(MSG_process_self()); @@ -186,6 +208,26 @@ MSG_task_receive_from_host(msg_task_t * task, const char *alias, return MSG_task_receive_ext(task, alias, -1, host); } +/** msg_task_usage + *\brief Deprecated function that used to receive a task from a mailbox from a specific host + *\brief at a given rate + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param host a #msg_host_t host from where the task was sent + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise. + */ +msg_error_t +MSG_task_receive_from_host_bounded(msg_task_t * task, const char *alias, + msg_host_t host, double rate) +{ + return MSG_task_receive_ext_bounded(task, alias, -1, host, rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox. * @@ -205,6 +247,22 @@ msg_error_t MSG_task_receive(msg_task_t * task, const char *alias) return MSG_task_receive_with_timeout(task, alias, -1); } +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise. + */ +msg_error_t MSG_task_receive_bounded(msg_task_t * task, const char *alias, double rate) +{ + return MSG_task_receive_with_timeout_bounded(task, alias, -1, rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox with a given timeout. * @@ -228,6 +286,25 @@ MSG_task_receive_with_timeout(msg_task_t * task, const char *alias, return MSG_task_receive_ext(task, alias, timeout, NULL); } +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox with a given timeout and at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive) + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise. + */ +msg_error_t +MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias, + double timeout,double rate) +{ + return MSG_task_receive_ext_bounded(task, alias, timeout, NULL,rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox from a specific host with a given timeout. * @@ -249,11 +326,51 @@ msg_error_t MSG_task_receive_ext(msg_task_t * task, const char *alias, double timeout, msg_host_t host) { + xbt_ex_t e; + msg_error_t ret = MSG_OK; XBT_DEBUG ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias); - return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, - host, timeout); + TRY { + ret = MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, + host, timeout); + } + CATCH(e) { + switch (e.category) { + case cancel_error: /* may be thrown by MSG_mailbox_get_by_alias */ + ret = MSG_HOST_FAILURE; + break; + default: + RETHROW; + } + xbt_ex_free(e); + } + return ret; +} + +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox from a specific host with a given timeout + * and at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param timeout is the maximum wait time for completion (provide -1 for no timeout) + * \param host a #msg_host_t host from where the task was sent + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, +* #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise. + */ +msg_error_t +MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout, + msg_host_t host, double rate) +{ + XBT_DEBUG + ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", + alias); + return MSG_mailbox_get_task_ext_bounded(MSG_mailbox_get_by_alias(alias), task, + host, timeout, rate); } /** \ingroup msg_task_usage @@ -500,6 +617,40 @@ msg_comm_t MSG_task_irecv(msg_task_t *task, const char *name) return comm; } +/** \ingroup msg_task_usage + * \brief Starts listening for receiving a task from an asynchronous communication + * at a given rate. + * + * \param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication. + * \param name of the mailbox to receive the task on + * \param rate limit the bandwidth to the given rate + * \return the msg_comm_t communication created + */ +msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rate) +{ + + + smx_rdv_t rdv = MSG_mailbox_get_by_alias(name); + + /* FIXME: these functions are not traceable */ + + /* Sanity check */ + xbt_assert(task, "Null pointer for the task storage"); + + if (*task) + XBT_CRITICAL + ("MSG_task_irecv() was asked to write in a non empty task struct."); + + /* Try to receive it by calling SIMIX network layer */ + msg_comm_t comm = xbt_new0(s_msg_comm_t, 1); + comm->task_sent = NULL; + comm->task_received = task; + comm->status = MSG_OK; + comm->s_comm = simcall_comm_irecv_bounded(rdv, task, NULL, NULL, NULL, rate); + + return comm; +} + /** \ingroup msg_task_usage * \brief Checks whether a communication is done, and if yes, finalizes it. * \param comm the communication to test