From: Jonathan Rouzaud-Cornabas Date: Tue, 12 Feb 2013 13:39:29 +0000 (+0100) Subject: Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid X-Git-Tag: v3_9_90~494^2~18 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/2e9105988280d1e88b5b496d6e2eed4c8d541883?hp=-c Merge branch 'master' of git+ssh://scm.gforge.inria.fr//gitroot/simgrid/simgrid --- 2e9105988280d1e88b5b496d6e2eed4c8d541883 diff --combined include/simgrid/simix.h index 5a0dfde3fd,a856c0bc37..599746406a --- a/include/simgrid/simix.h +++ b/include/simgrid/simix.h @@@ -357,8 -357,8 +357,8 @@@ XBT_PUBLIC(void *) simcall_process_get_ XBT_PUBLIC(void) simcall_process_set_data(smx_process_t process, void *data); XBT_PUBLIC(smx_host_t) simcall_process_get_host(smx_process_t process); XBT_PUBLIC(const char *) simcall_process_get_name(smx_process_t process); - int simcall_process_get_PID(smx_process_t process); - int simcall_process_get_PPID(smx_process_t process); + XBT_PUBLIC(int) simcall_process_get_PID(smx_process_t process); + XBT_PUBLIC(int) simcall_process_get_PPID(smx_process_t process); XBT_PUBLIC(int) simcall_process_is_suspended(smx_process_t process); XBT_PUBLIC(xbt_dict_t) simcall_process_get_properties(smx_process_t host); XBT_PUBLIC(void) simcall_process_set_kill_time(smx_process_t process, double kill_time); @@@ -409,7 -409,6 +409,7 @@@ XBT_PUBLIC(smx_action_t) simcall_comm_i XBT_PUBLIC(void) simcall_comm_destroy(smx_action_t comm); XBT_PUBLIC(smx_action_t) simcall_comm_iprobe(smx_rdv_t rdv, int src, int tag, int (*match_fun)(void *, void *, smx_action_t), void *data); +XBT_PUBLIC(double) simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate); XBT_PUBLIC(void) simcall_comm_cancel(smx_action_t comm); /* FIXME: waitany is going to be a vararg function, and should take a timeout */ diff --combined src/msg/msg_gos.c index 7eb2d791e2,e37e0d7002..aeb6ad343b --- a/src/msg/msg_gos.c +++ b/src/msg/msg_gos.c @@@ -125,6 -125,7 +125,7 @@@ msg_error_t MSG_parallel_task_execute(m */ msg_error_t MSG_process_sleep(double nb_sec) { + xbt_ex_t e; msg_error_t status = MSG_OK; /*msg_process_t proc = MSG_process_self();*/ @@@ -140,7 -141,19 +141,19 @@@ proc->simdata->waiting_action = NULL;*/ - simcall_process_sleep(nb_sec); + TRY { + simcall_process_sleep(nb_sec); + } + CATCH(e) { + switch (e.category) { + case cancel_error: + status = MSG_TASK_CANCELED; + break; + default: + RETHROW; + } + xbt_ex_free(e); + } #ifdef HAVE_TRACING TRACE_msg_process_sleep_out(MSG_process_self()); @@@ -186,26 -199,6 +199,26 @@@ MSG_task_receive_from_host(msg_task_t return MSG_task_receive_ext(task, alias, -1, host); } +/** msg_task_usage + *\brief Deprecated function that used to receive a task from a mailbox from a specific host + *\brief at a given rate + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param host a #msg_host_t host from where the task was sent + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise. + */ +msg_error_t +MSG_task_receive_from_host_bounded(msg_task_t * task, const char *alias, + msg_host_t host, double rate) +{ + return MSG_task_receive_ext_bounded(task, alias, -1, host, rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox. * @@@ -225,22 -218,6 +238,22 @@@ msg_error_t MSG_task_receive(msg_task_ return MSG_task_receive_with_timeout(task, alias, -1); } +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise. + */ +msg_error_t MSG_task_receive_bounded(msg_task_t * task, const char *alias, double rate) +{ + return MSG_task_receive_with_timeout_bounded(task, alias, -1, rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox with a given timeout. * @@@ -264,25 -241,6 +277,25 @@@ MSG_task_receive_with_timeout(msg_task_ return MSG_task_receive_ext(task, alias, timeout, NULL); } +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox with a given timeout and at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive) + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise. + */ +msg_error_t +MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias, + double timeout,double rate) +{ + return MSG_task_receive_ext_bounded(task, alias, timeout, NULL,rate); +} + /** \ingroup msg_task_usage * \brief Receives a task from a mailbox from a specific host with a given timeout. * @@@ -304,38 -262,28 +317,53 @@@ msg_error_ MSG_task_receive_ext(msg_task_t * task, const char *alias, double timeout, msg_host_t host) { + xbt_ex_t e; + msg_error_t ret = MSG_OK; XBT_DEBUG ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", alias); - return MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, - host, timeout); + TRY { + ret = MSG_mailbox_get_task_ext(MSG_mailbox_get_by_alias(alias), task, + host, timeout); + } + CATCH(e) { + switch (e.category) { + case cancel_error: /* may be thrown by MSG_mailbox_get_by_alias */ + ret = MSG_HOST_FAILURE; + break; + default: + RETHROW; + } + xbt_ex_free(e); + } + return ret; } +/** \ingroup msg_task_usage + * \brief Receives a task from a mailbox from a specific host with a given timeout + * and at a given rate. + * + * \param task a memory location for storing a #msg_task_t. + * \param alias name of the mailbox to receive the task from + * \param timeout is the maximum wait time for completion (provide -1 for no timeout) + * \param host a #msg_host_t host from where the task was sent + * \param rate limit the reception to rate bandwidth + * + * \return Returns + * #MSG_OK if the task was successfully received, +* #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise. + */ +msg_error_t +MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout, + msg_host_t host, double rate) +{ + XBT_DEBUG + ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'", + alias); + return MSG_mailbox_get_task_ext_bounded(MSG_mailbox_get_by_alias(alias), task, + host, timeout, rate); +} + /** \ingroup msg_task_usage * \brief Sends a task on a mailbox. * @@@ -580,40 -528,6 +608,40 @@@ msg_comm_t MSG_task_irecv(msg_task_t *t return comm; } +/** \ingroup msg_task_usage + * \brief Starts listening for receiving a task from an asynchronous communication + * at a given rate. + * + * \param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication. + * \param name of the mailbox to receive the task on + * \param rate limit the bandwidth to the given rate + * \return the msg_comm_t communication created + */ +msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rate) +{ + + + smx_rdv_t rdv = MSG_mailbox_get_by_alias(name); + simcall_comm_change_rate_first_action(rdv,rate); + /* FIXME: these functions are not traceable */ + + /* Sanity check */ + xbt_assert(task, "Null pointer for the task storage"); + + if (*task) + XBT_CRITICAL + ("MSG_task_irecv() was asked to write in a non empty task struct."); + + /* Try to receive it by calling SIMIX network layer */ + msg_comm_t comm = xbt_new0(s_msg_comm_t, 1); + comm->task_sent = NULL; + comm->task_received = task; + comm->status = MSG_OK; + comm->s_comm = simcall_comm_irecv(rdv, task, NULL, NULL, NULL); + + return comm; +} + /** \ingroup msg_task_usage * \brief Checks whether a communication is done, and if yes, finalizes it. * \param comm the communication to test diff --combined src/msg/msg_mailbox.c index e4e633f6f6,1abbf0ee2d..15d9c3ff82 --- a/src/msg/msg_mailbox.c +++ b/src/msg/msg_mailbox.c @@@ -44,10 -44,6 +44,10 @@@ MSG_mailbox_get_count_host_waiting_task return simcall_rdv_comm_count_by_host(mailbox, host); } +double MSG_set_rate_before_read(msg_mailbox_t mailbox, double newrate) { + return simcall_comm_change_rate_first_action(mailbox,newrate); +} + msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias) { @@@ -120,6 -116,9 +120,9 @@@ MSG_mailbox_get_task_ext(msg_mailbox_t } CATCH(e) { switch (e.category) { + case cancel_error: + ret = MSG_HOST_FAILURE; + break; case network_error: ret = MSG_TRANSFER_FAILURE; break; @@@ -142,30 -141,6 +145,30 @@@ MSG_RETURN(ret); } + + +/** \ingroup msg_mailbox_management + * \brief Get a task from a mailbox on a given host at a given rate + * + * \param mailbox The mailbox where the task was sent + * \param task a memory location for storing a #msg_task_t. + * \param host a #msg_host_t host from where the task was sent + * \param timeout a timeout + * \param rate a bandwidth rate + + * \return Returns + * #MSG_OK if the task was successfully received, + * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise. + */ +msg_error_t +MSG_mailbox_get_task_ext_bounded(msg_mailbox_t mailbox, msg_task_t * task, + msg_host_t host, double timeout, double rate) +{ + MSG_set_rate_before_read(mailbox,rate); + MSG_RETURN(MSG_mailbox_get_task_ext(mailbox,task,host,timeout)); +} + + msg_error_t MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, msg_task_t task, double timeout) @@@ -211,6 -186,9 +214,9 @@@ CATCH(e) { switch (e.category) { + case cancel_error: + ret = MSG_HOST_FAILURE; + break; case network_error: ret = MSG_TRANSFER_FAILURE; break; diff --combined src/simix/smx_user.c index 3d17a0522d,f01364295c..650f46b336 --- a/src/simix/smx_user.c +++ b/src/simix/smx_user.c @@@ -607,7 -607,7 +607,7 @@@ smx_rdv_t simcall_rdv_get_by_name(cons { xbt_assert(name != NULL, "Invalid parameter for simcall_rdv_get_by_name (name is NULL)"); - /* FIXME: this is a horrible lost of performance, so we hack it out by + /* FIXME: this is a horrible loss of performance, so we hack it out by * skipping the simcall (for now). It works in parallel, it won't work on * distributed but probably we will change MSG for that. */ @@@ -735,29 -735,6 +735,29 @@@ smx_action_t simcall_comm_irecv(smx_rdv } +/** + * \ingroup simix_comm_management + */ +double simcall_comm_change_rate_first_action(smx_rdv_t rdv, double newrate) +{ + xbt_assert(rdv, "No rendez-vous point defined for change_rate_first_action"); + + smx_action_t action; + xbt_fifo_item_t item; + + item = xbt_fifo_get_first_item(rdv->comm_fifo); + if (item != NULL) { + action = (smx_action_t) xbt_fifo_get_item_content(item); + if (action->comm.rate > newrate) { + action->comm.rate = newrate; + return newrate; + } else + return action->comm.rate; + } else + return -1.0; +} + + /** * \ingroup simix_comm_management */