X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/f24ae5fb9223f69bf15e5418cdf856f111995e03..84079cc1149b1d8c588a029f2bde6c863b5ce740:/src/xbt/xbt_queue.c diff --git a/src/xbt/xbt_queue.c b/src/xbt/xbt_queue.c index 50a3d300bd..8321246017 100644 --- a/src/xbt/xbt_queue.c +++ b/src/xbt/xbt_queue.c @@ -16,7 +16,7 @@ #include "xbt/synchro.h" #include "xbt/queue.h" /* this module */ - +#include "gras/virtu.h" XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue"); typedef struct s_xbt_queue_ { @@ -57,7 +57,7 @@ void xbt_queue_free(xbt_queue_t *queue) { } /** @brief Get the queue size */ -unsigned long xbt_queue_length(xbt_queue_t queue) { +unsigned long xbt_queue_length(const xbt_queue_t queue) { unsigned long res; xbt_mutex_lock(queue->mutex); res=xbt_dynar_length(queue->data); @@ -69,7 +69,7 @@ unsigned long xbt_queue_length(xbt_queue_t queue) { * * This is blocking if the declared capacity is non-nul, and if this amount is reached. * - * @seealso #xbt_dynar_push + * @see #xbt_dynar_push */ void xbt_queue_push(xbt_queue_t queue, const void *src) { xbt_mutex_lock(queue->mutex); @@ -87,7 +87,7 @@ void xbt_queue_push(xbt_queue_t queue, const void *src) { * * This is blocking if the queue is empty. * - * @seealso #xbt_dynar_pop + * @see #xbt_dynar_pop * */ void xbt_queue_pop(xbt_queue_t queue, void* const dst) { @@ -105,7 +105,7 @@ void xbt_queue_pop(xbt_queue_t queue, void* const dst) { * * This is blocking if the declared capacity is non-nul, and if this amount is reached. * - * @seealso #xbt_dynar_unshift + * @see #xbt_dynar_unshift */ void xbt_queue_unshift(xbt_queue_t queue, const void *src) { xbt_mutex_lock(queue->mutex); @@ -123,7 +123,7 @@ void xbt_queue_unshift(xbt_queue_t queue, const void *src) { * * This is blocking if the queue is empty. * - * @seealso #xbt_dynar_shift + * @see #xbt_dynar_shift * */ void xbt_queue_shift(xbt_queue_t queue, void* const dst) { @@ -142,94 +142,152 @@ void xbt_queue_shift(xbt_queue_t queue, void* const dst) { /** @brief Push something to the message exchange queue, with a timeout. * - * @seealso #xbt_queue_push + * @see #xbt_queue_push */ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) { - xbt_mutex_lock(queue->mutex); - if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); - xbt_cond_timedwait(queue->not_full,queue->mutex, delay); - } - /* check if a timeout occurs */ - if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - xbt_mutex_unlock(queue->mutex); - THROW0(timeout_error,0,"Timeout"); - } - else { - xbt_dynar_push(queue->data,src); - xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); - } + double timeout = xbt_time() + delay; + xbt_ex_t e; + + xbt_mutex_lock(queue->mutex); + + if (delay == 0) { + if (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data)) { + + xbt_mutex_unlock(queue->mutex); + THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", + queue,queue->capacity); + } + } else { + while (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data) && + (delay<0 || xbt_time() < timeout) ) { + + DEBUG2("Capacity of %p exceded (=%d). Waiting", + queue,queue->capacity); + TRY { + xbt_cond_timedwait(queue->not_full,queue->mutex, + delay < 0 ? -1 : timeout - xbt_time()); + } CATCH(e) { + xbt_mutex_unlock(queue->mutex); + RETHROW; + } + } + } + + xbt_dynar_push(queue->data,src); + xbt_cond_signal(queue->not_empty); + xbt_mutex_unlock(queue->mutex); } /** @brief Pop something from the message exchange queue, with a timeout. * - * @seealso #xbt_queue_pop + * @see #xbt_queue_pop * */ void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) { - xbt_mutex_lock(queue->mutex); - if (xbt_dynar_length(queue->data) == 0) { + double timeout = xbt_time() + delay; + xbt_ex_t e; + + xbt_mutex_lock(queue->mutex); + + if (delay == 0) { + if (xbt_dynar_length(queue->data) == 0) { + xbt_mutex_unlock(queue->mutex); + THROW0(timeout_error,0,"Delay = 0, and queue is empty"); + } + } else { + while ( (xbt_dynar_length(queue->data) == 0) && + (delay<0 || xbt_time() < timeout) ) { DEBUG1("Queue %p empty. Waiting",queue); - xbt_cond_timedwait(queue->not_empty,queue->mutex,delay); - } - /* check if a timeout occurs */ - if (xbt_dynar_length(queue->data) == 0) { - xbt_mutex_unlock(queue->mutex); - THROW0(timeout_error,0,"Timeout"); - } - else { - xbt_dynar_pop(queue->data,dst); - xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); - } + TRY { + xbt_cond_timedwait(queue->not_empty,queue->mutex, + delay<0 ? -1 : timeout - xbt_time()); + } CATCH(e) { + xbt_mutex_unlock(queue->mutex); + RETHROW; + } + } + } + + xbt_dynar_pop(queue->data,dst); + xbt_cond_signal(queue->not_full); + xbt_mutex_unlock(queue->mutex); } /** @brief Unshift something to the message exchange queue, with a timeout. * - * @seealso #xbt_queue_unshift + * @see #xbt_queue_unshift */ void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) { - xbt_mutex_lock(queue->mutex); - if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); - xbt_cond_timedwait(queue->not_full,queue->mutex,delay); - } - /* check if a timeout occurs */ - - if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - - xbt_mutex_unlock(queue->mutex); - THROW0(timeout_error,0,"Timeout"); - } - else { - xbt_dynar_unshift(queue->data,src); - xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); - } + double timeout = xbt_time() + delay; + xbt_ex_t e; + + xbt_mutex_lock(queue->mutex); + + if (delay==0) { + if (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data)) { + + xbt_mutex_unlock(queue->mutex); + THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", + queue,queue->capacity); + } + } else { + while (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data) && + (delay<0 || xbt_time() < timeout) ) { + + DEBUG2("Capacity of %p exceded (=%d). Waiting", + queue,queue->capacity); + TRY { + xbt_cond_timedwait(queue->not_full,queue->mutex, + delay < 0 ? -1 : timeout - xbt_time()); + } CATCH(e) { + xbt_mutex_unlock(queue->mutex); + RETHROW; + } + } + } + + xbt_dynar_unshift(queue->data,src); + xbt_cond_signal(queue->not_empty); + xbt_mutex_unlock(queue->mutex); } /** @brief Shift something from the message exchange queue, with a timeout. * - * @seealso #xbt_queue_shift + * @see #xbt_queue_shift * */ void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) { - xbt_mutex_lock(queue->mutex); - if (xbt_dynar_length(queue->data) == 0) { + double timeout = xbt_time() + delay; + xbt_ex_t e; + + xbt_mutex_lock(queue->mutex); + + if (delay == 0) { + if (xbt_dynar_length(queue->data) == 0) { + xbt_mutex_unlock(queue->mutex); + THROW0(timeout_error,0,"Delay = 0, and queue is empty"); + } + } else { + while ( (xbt_dynar_length(queue->data) == 0) && + (delay<0 || xbt_time() < timeout) ) { DEBUG1("Queue %p empty. Waiting",queue); - xbt_cond_timedwait(queue->not_empty,queue->mutex,delay); - } - /* check if a timeout occurs */ - if (xbt_dynar_length(queue->data) == 0) { - xbt_mutex_unlock(queue->mutex); - THROW0(timeout_error,0,"Timeout"); - } - else { - xbt_dynar_shift(queue->data,dst); - xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); - } + TRY { + xbt_cond_timedwait(queue->not_empty,queue->mutex, + delay<0 ? -1 : timeout - xbt_time()); + } CATCH(e) { + xbt_mutex_unlock(queue->mutex); + RETHROW; + } + } + } + + xbt_dynar_shift(queue->data,dst); + xbt_cond_signal(queue->not_full); + xbt_mutex_unlock(queue->mutex); }