Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bounded receive
authorJonathan Rouzaud-Cornabas <jonathan.rouzaud-cornabas@ens-lyon.fr>
Tue, 12 Feb 2013 13:39:05 +0000 (14:39 +0100)
committerJonathan Rouzaud-Cornabas <jonathan.rouzaud-cornabas@ens-lyon.fr>
Tue, 12 Feb 2013 13:39:05 +0000 (14:39 +0100)
include/msg/msg.h
src/msg/msg_gos.c
src/msg/msg_mailbox.c
src/msg/msg_mailbox.h

index 330ed56..20690c0 100644 (file)
@@ -195,6 +195,8 @@ XBT_PUBLIC(msg_error_t) MSG_task_destroy(msg_task_t task);
 
 XBT_PUBLIC(msg_error_t) MSG_task_receive_from_host(msg_task_t * task, const char *alias,
                                        msg_host_t host);
+XBT_PUBLIC(msg_error_t) MSG_task_receive_from_host_bounded(msg_task_t * task, const char *alias,
+                                       msg_host_t host, double rate);
 
 XBT_PUBLIC(msg_error_t) MSG_task_execute(msg_task_t task);
 XBT_PUBLIC(msg_error_t) MSG_parallel_task_execute(msg_task_t task);
@@ -226,6 +228,20 @@ XBT_PUBLIC(msg_error_t)
     MSG_task_receive(msg_task_t * task, const char *alias);
 #define MSG_task_recv(t,a) MSG_task_receive(t,a)
 
+
+
+XBT_PUBLIC(msg_error_t)
+    MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout,
+                     msg_host_t host, double rate);
+
+XBT_PUBLIC(msg_error_t)
+    MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias,
+                              double timeout, double rate);
+
+XBT_PUBLIC(msg_error_t)
+    MSG_task_receive_bounded(msg_task_t * task, const char *alias,double rate);
+#define MSG_task_recv_bounded(t,a,r) MSG_task_receive_bounded(t,a,r)
+
 XBT_PUBLIC(msg_comm_t) MSG_task_isend(msg_task_t task, const char *alias);
 XBT_PUBLIC(msg_comm_t) MSG_task_isend_bounded(msg_task_t task, const char *alias, double maxrate);
 XBT_PUBLIC(msg_comm_t) MSG_task_isend_with_matching(msg_task_t task,
@@ -236,6 +252,7 @@ XBT_PUBLIC(msg_comm_t) MSG_task_isend_with_matching(msg_task_t task,
 XBT_PUBLIC(void) MSG_task_dsend(msg_task_t task, const char *alias, void_f_pvoid_t cleanup);
 XBT_PUBLIC(void) MSG_task_dsend_bounded(msg_task_t task, const char *alias, void_f_pvoid_t cleanup, double maxrate);
 XBT_PUBLIC(msg_comm_t) MSG_task_irecv(msg_task_t * task, const char *alias);
+XBT_PUBLIC(msg_comm_t) MSG_task_irecv_bounded(msg_task_t * task, const char *alias, double rate);
 XBT_PUBLIC(int) MSG_comm_test(msg_comm_t comm);
 XBT_PUBLIC(int) MSG_comm_testany(xbt_dynar_t comms);
 XBT_PUBLIC(void) MSG_comm_destroy(msg_comm_t comm);
index d5ba36e..7eb2d79 100644 (file)
@@ -186,6 +186,26 @@ MSG_task_receive_from_host(msg_task_t * task, const char *alias,
   return MSG_task_receive_ext(task, alias, -1, host);
 }
 
+/** msg_task_usage
+ *\brief Deprecated function that used to receive a task from a mailbox from a specific host
+ *\brief at a given rate
+ *
+ * \param task a memory location for storing a #msg_task_t.
+ * \param alias name of the mailbox to receive the task from
+ * \param host a #msg_host_t host from where the task was sent
+ * \param rate limit the reception to rate bandwidth
+ *
+ * \return Returns
+ * #MSG_OK if the task was successfully received,
+ * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
+ */
+msg_error_t
+MSG_task_receive_from_host_bounded(msg_task_t * task, const char *alias,
+                           msg_host_t host, double rate)
+{
+  return MSG_task_receive_ext_bounded(task, alias, -1, host, rate);
+}
+
 /** \ingroup msg_task_usage
  * \brief Receives a task from a mailbox.
  *
@@ -205,6 +225,22 @@ msg_error_t MSG_task_receive(msg_task_t * task, const char *alias)
   return MSG_task_receive_with_timeout(task, alias, -1);
 }
 
+/** \ingroup msg_task_usage
+ * \brief Receives a task from a mailbox at a given rate.
+ *
+ * \param task a memory location for storing a #msg_task_t.
+ * \param alias name of the mailbox to receive the task from
+ *  \param rate limit the reception to rate bandwidth
+ *
+ * \return Returns
+ * #MSG_OK if the task was successfully received,
+ * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
+ */
+msg_error_t MSG_task_receive_bounded(msg_task_t * task, const char *alias, double rate)
+{
+  return MSG_task_receive_with_timeout_bounded(task, alias, -1, rate);
+}
+
 /** \ingroup msg_task_usage
  * \brief Receives a task from a mailbox with a given timeout.
  *
@@ -228,6 +264,25 @@ MSG_task_receive_with_timeout(msg_task_t * task, const char *alias,
   return MSG_task_receive_ext(task, alias, timeout, NULL);
 }
 
+/** \ingroup msg_task_usage
+ * \brief Receives a task from a mailbox with a given timeout and at a given rate.
+ *
+ * \param task a memory location for storing a #msg_task_t.
+ * \param alias name of the mailbox to receive the task from
+ * \param timeout is the maximum wait time for completion (if -1, this call is the same as #MSG_task_receive)
+ *  \param rate limit the reception to rate bandwidth
+ *
+ * \return Returns
+ * #MSG_OK if the task was successfully received,
+ * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
+ */
+msg_error_t
+MSG_task_receive_with_timeout_bounded(msg_task_t * task, const char *alias,
+                              double timeout,double rate)
+{
+  return MSG_task_receive_ext_bounded(task, alias, timeout, NULL,rate);
+}
+
 /** \ingroup msg_task_usage
  * \brief Receives a task from a mailbox from a specific host with a given timeout.
  *
@@ -256,6 +311,31 @@ MSG_task_receive_ext(msg_task_t * task, const char *alias, double timeout,
                                   host, timeout);
 }
 
+/** \ingroup msg_task_usage
+ * \brief Receives a task from a mailbox from a specific host with a given timeout
+ *  and at a given rate.
+ *
+ * \param task a memory location for storing a #msg_task_t.
+ * \param alias name of the mailbox to receive the task from
+ * \param timeout is the maximum wait time for completion (provide -1 for no timeout)
+ * \param host a #msg_host_t host from where the task was sent
+ * \param rate limit the reception to rate bandwidth
+ *
+ * \return Returns
+ * #MSG_OK if the task was successfully received,
+* #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE, or #MSG_TIMEOUT otherwise.
+ */
+msg_error_t
+MSG_task_receive_ext_bounded(msg_task_t * task, const char *alias, double timeout,
+                     msg_host_t host, double rate)
+{
+  XBT_DEBUG
+      ("MSG_task_receive_ext: Trying to receive a message on mailbox '%s'",
+       alias);
+  return MSG_mailbox_get_task_ext_bounded(MSG_mailbox_get_by_alias(alias), task,
+                                  host, timeout, rate);
+}
+
 /** \ingroup msg_task_usage
  * \brief Sends a task on a mailbox.
  *
@@ -500,6 +580,40 @@ msg_comm_t MSG_task_irecv(msg_task_t *task, const char *name)
   return comm;
 }
 
+/** \ingroup msg_task_usage
+ * \brief Starts listening for receiving a task from an asynchronous communication
+ * at a given rate.
+ *
+ * \param task a memory location for storing a #msg_task_t. has to be valid until the end of the communication.
+ * \param name of the mailbox to receive the task on
+ * \param rate limit the bandwidth to the given rate
+ * \return the msg_comm_t communication created
+ */
+msg_comm_t MSG_task_irecv_bounded(msg_task_t *task, const char *name, double rate)
+{
+
+
+  smx_rdv_t rdv = MSG_mailbox_get_by_alias(name);
+  simcall_comm_change_rate_first_action(rdv,rate);
+  /* FIXME: these functions are not traceable */
+
+  /* Sanity check */
+  xbt_assert(task, "Null pointer for the task storage");
+
+  if (*task)
+    XBT_CRITICAL
+        ("MSG_task_irecv() was asked to write in a non empty task struct.");
+
+  /* Try to receive it by calling SIMIX network layer */
+  msg_comm_t comm = xbt_new0(s_msg_comm_t, 1);
+  comm->task_sent = NULL;
+  comm->task_received = task;
+  comm->status = MSG_OK;
+  comm->s_comm = simcall_comm_irecv(rdv, task, NULL, NULL, NULL);
+
+  return comm;
+}
+
 /** \ingroup msg_task_usage
  * \brief Checks whether a communication is done, and if yes, finalizes it.
  * \param comm the communication to test
index 942f332..e4e633f 100644 (file)
@@ -44,6 +44,10 @@ MSG_mailbox_get_count_host_waiting_tasks(msg_mailbox_t mailbox,
   return simcall_rdv_comm_count_by_host(mailbox, host);
 }
 
+double MSG_set_rate_before_read(msg_mailbox_t mailbox, double newrate) {
+       return simcall_comm_change_rate_first_action(mailbox,newrate);
+}
+
 msg_mailbox_t MSG_mailbox_get_by_alias(const char *alias)
 {
 
@@ -138,6 +142,30 @@ MSG_mailbox_get_task_ext(msg_mailbox_t mailbox, msg_task_t * task,
   MSG_RETURN(ret);
 }
 
+
+
+/** \ingroup msg_mailbox_management
+ * \brief Get a task from a mailbox on a given host at a given rate
+ *
+ * \param mailbox The mailbox where the task was sent
+ * \param task a memory location for storing a #msg_task_t.
+ * \param host a #msg_host_t host from where the task was sent
+ * \param timeout a timeout
+ * \param rate a bandwidth rate
+
+ * \return Returns
+ * #MSG_OK if the task was successfully received,
+ * #MSG_HOST_FAILURE, or #MSG_TRANSFER_FAILURE otherwise.
+ */
+msg_error_t
+MSG_mailbox_get_task_ext_bounded(msg_mailbox_t mailbox, msg_task_t * task,
+                         msg_host_t host, double timeout, double rate)
+{
+       MSG_set_rate_before_read(mailbox,rate);
+       MSG_RETURN(MSG_mailbox_get_task_ext(mailbox,task,host,timeout));
+}
+
+
 msg_error_t
 MSG_mailbox_put_with_timeout(msg_mailbox_t mailbox, msg_task_t task,
                              double timeout)
index 9f3f379..ac387d6 100644 (file)
@@ -40,6 +40,13 @@ void MSG_mailbox_free(void *mailbox);
  */
 void MSG_mailbox_free(void *mailbox);
 
+/* \brief MSG_set_rate_before_read - set a rate before receiving a task
+ *
+ * \param mailbox  The mailbox to release.
+ * \param rate    The new rate
+ */
+double MSG_set_rate_before_read(msg_mailbox_t mailbox, double rate);
+
 /* \brief MSG_mailbox_get_by_alias - get a mailbox from its alias.
  *
  * The function MSG_mailbox_get_by_alias returns the mailbox associated with