#include "xbt/synchro.h"
#include "xbt/queue.h" /* this module */
-
+#include "gras/virtu.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
typedef struct s_xbt_queue_ {
}
/** @brief Get the queue size */
-unsigned long xbt_queue_length(xbt_queue_t queue) {
+unsigned long xbt_queue_length(const xbt_queue_t queue) {
unsigned long res;
- xbt_mutex_lock(queue->mutex);
+ xbt_mutex_acquire(queue->mutex);
res=xbt_dynar_length(queue->data);
- xbt_mutex_unlock(queue->mutex);
+ xbt_mutex_release(queue->mutex);
return res;
}
*
* This is blocking if the declared capacity is non-nul, and if this amount is reached.
*
- * @seealso #xbt_dynar_push
+ * @see #xbt_dynar_push
*/
void xbt_queue_push(xbt_queue_t queue, const void *src) {
- xbt_mutex_lock(queue->mutex);
+ xbt_mutex_acquire(queue->mutex);
while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
xbt_cond_wait(queue->not_full,queue->mutex);
}
xbt_dynar_push(queue->data,src);
xbt_cond_signal(queue->not_empty);
+ xbt_mutex_release(queue->mutex);
}
*
* This is blocking if the queue is empty.
*
- * @seealso #xbt_dynar_pop
+ * @see #xbt_dynar_pop
*
*/
void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
- xbt_mutex_lock(queue->mutex);
+ xbt_mutex_acquire(queue->mutex);
while (xbt_dynar_length(queue->data) == 0) {
DEBUG1("Queue %p empty. Waiting",queue);
xbt_cond_wait(queue->not_empty,queue->mutex);
}
xbt_dynar_pop(queue->data,dst);
xbt_cond_signal(queue->not_full);
+ xbt_mutex_release(queue->mutex);
}
/** @brief Unshift something to the message exchange queue.
*
* This is blocking if the declared capacity is non-nul, and if this amount is reached.
*
- * @seealso #xbt_dynar_unshift
+ * @see #xbt_dynar_unshift
*/
void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
- xbt_mutex_lock(queue->mutex);
+ xbt_mutex_acquire(queue->mutex);
while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
xbt_cond_wait(queue->not_full,queue->mutex);
}
xbt_dynar_unshift(queue->data,src);
xbt_cond_signal(queue->not_empty);
+ xbt_mutex_release(queue->mutex);
}
*
* This is blocking if the queue is empty.
*
- * @seealso #xbt_dynar_shift
+ * @see #xbt_dynar_shift
*
*/
void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
- xbt_mutex_lock(queue->mutex);
+ xbt_mutex_acquire(queue->mutex);
while (xbt_dynar_length(queue->data) == 0) {
DEBUG1("Queue %p empty. Waiting",queue);
xbt_cond_wait(queue->not_empty,queue->mutex);
}
xbt_dynar_shift(queue->data,dst);
xbt_cond_signal(queue->not_full);
+ xbt_mutex_release(queue->mutex);
+}
+
+
+
+
+/** @brief Push something to the message exchange queue, with a timeout.
+ *
+ * @see #xbt_queue_push
+ */
+void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
+ double timeout = xbt_time() + delay;
+ xbt_ex_t e;
+
+ xbt_mutex_acquire(queue->mutex);
+
+ if (delay == 0) {
+ if (queue->capacity != 0 &&
+ queue->capacity == xbt_dynar_length(queue->data)) {
+
+ xbt_mutex_release(queue->mutex);
+ THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
+ queue,queue->capacity);
+ }
+ } else {
+ while (queue->capacity != 0 &&
+ queue->capacity == xbt_dynar_length(queue->data) &&
+ (delay<0 || xbt_time() < timeout) ) {
+
+ DEBUG2("Capacity of %p exceded (=%d). Waiting",
+ queue,queue->capacity);
+ TRY {
+ xbt_cond_timedwait(queue->not_full,queue->mutex,
+ delay < 0 ? -1 : timeout - xbt_time());
+ } CATCH(e) {
+ xbt_mutex_release(queue->mutex);
+ RETHROW;
+ }
+ }
+ }
+
+ xbt_dynar_push(queue->data,src);
+ xbt_cond_signal(queue->not_empty);
+ xbt_mutex_release(queue->mutex);
+}
+
+
+/** @brief Pop something from the message exchange queue, with a timeout.
+ *
+ * @see #xbt_queue_pop
+ *
+ */
+void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
+ double timeout = xbt_time() + delay;
+ xbt_ex_t e;
+
+ xbt_mutex_acquire(queue->mutex);
+
+ if (delay == 0) {
+ if (xbt_dynar_length(queue->data) == 0) {
+ xbt_mutex_release(queue->mutex);
+ THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+ }
+ } else {
+ while ( (xbt_dynar_length(queue->data) == 0) &&
+ (delay<0 || xbt_time() < timeout) ) {
+ DEBUG1("Queue %p empty. Waiting",queue);
+ TRY {
+ xbt_cond_timedwait(queue->not_empty,queue->mutex,
+ delay<0 ? -1 : timeout - xbt_time());
+ } CATCH(e) {
+ xbt_mutex_release(queue->mutex);
+ RETHROW;
+ }
+ }
+ }
+
+ xbt_dynar_pop(queue->data,dst);
+ xbt_cond_signal(queue->not_full);
+ xbt_mutex_release(queue->mutex);
+}
+
+/** @brief Unshift something to the message exchange queue, with a timeout.
+ *
+ * @see #xbt_queue_unshift
+ */
+void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
+ double timeout = xbt_time() + delay;
+ xbt_ex_t e;
+
+ xbt_mutex_acquire(queue->mutex);
+
+ if (delay==0) {
+ if (queue->capacity != 0 &&
+ queue->capacity == xbt_dynar_length(queue->data)) {
+
+ xbt_mutex_release(queue->mutex);
+ THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
+ queue,queue->capacity);
+ }
+ } else {
+ while (queue->capacity != 0 &&
+ queue->capacity == xbt_dynar_length(queue->data) &&
+ (delay<0 || xbt_time() < timeout) ) {
+
+ DEBUG2("Capacity of %p exceded (=%d). Waiting",
+ queue,queue->capacity);
+ TRY {
+ xbt_cond_timedwait(queue->not_full,queue->mutex,
+ delay < 0 ? -1 : timeout - xbt_time());
+ } CATCH(e) {
+ xbt_mutex_release(queue->mutex);
+ RETHROW;
+ }
+ }
+ }
+
+ xbt_dynar_unshift(queue->data,src);
+ xbt_cond_signal(queue->not_empty);
+ xbt_mutex_release(queue->mutex);
+}
+
+
+/** @brief Shift something from the message exchange queue, with a timeout.
+ *
+ * @see #xbt_queue_shift
+ *
+ */
+void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
+ double timeout = xbt_time() + delay;
+ xbt_ex_t e;
+
+ xbt_mutex_acquire(queue->mutex);
+
+ if (delay == 0) {
+ if (xbt_dynar_length(queue->data) == 0) {
+ xbt_mutex_release(queue->mutex);
+ THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+ }
+ } else {
+ while ( (xbt_dynar_length(queue->data) == 0) &&
+ (delay<0 || xbt_time() < timeout) ) {
+ DEBUG1("Queue %p empty. Waiting",queue);
+ TRY {
+ xbt_cond_timedwait(queue->not_empty,queue->mutex,
+ delay<0 ? -1 : timeout - xbt_time());
+ } CATCH(e) {
+ xbt_mutex_release(queue->mutex);
+ RETHROW;
+ }
+ }
+ }
+
+ if (xbt_dynar_length(queue->data) == 0) {
+ xbt_mutex_release(queue->mutex);
+ THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
+ }
+
+ xbt_dynar_shift(queue->data,dst);
+ xbt_cond_signal(queue->not_full);
+ xbt_mutex_release(queue->mutex);
}