X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/aed08a7a06b25674081adf610f369530e8c1e3bb..34c6518ab6cdb0b402cca01ac045561cba9bd009:/src/xbt/xbt_queue.c diff --git a/src/xbt/xbt_queue.c b/src/xbt/xbt_queue.c index a6f6278b13..39643f41c3 100644 --- a/src/xbt/xbt_queue.c +++ b/src/xbt/xbt_queue.c @@ -11,7 +11,6 @@ #include "xbt/misc.h" #include "xbt/sysdep.h" #include "xbt/log.h" -//#include "xbt/ex.h" #include "xbt/dynar.h" #include "xbt/synchro.h" @@ -57,11 +56,11 @@ void xbt_queue_free(xbt_queue_t *queue) { } /** @brief Get the queue size */ -unsigned long xbt_queue_length(xbt_queue_t queue) { +unsigned long xbt_queue_length(const xbt_queue_t queue) { unsigned long res; - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); res=xbt_dynar_length(queue->data); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); return res; } @@ -72,14 +71,14 @@ unsigned long xbt_queue_length(xbt_queue_t queue) { * @see #xbt_dynar_push */ void xbt_queue_push(xbt_queue_t queue, const void *src) { - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); xbt_cond_wait(queue->not_full,queue->mutex); } xbt_dynar_push(queue->data,src); xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } @@ -91,14 +90,14 @@ void xbt_queue_push(xbt_queue_t queue, const void *src) { * */ void xbt_queue_pop(xbt_queue_t queue, void* const dst) { - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); while (xbt_dynar_length(queue->data) == 0) { DEBUG1("Queue %p empty. Waiting",queue); xbt_cond_wait(queue->not_empty,queue->mutex); } xbt_dynar_pop(queue->data,dst); xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } /** @brief Unshift something to the message exchange queue. @@ -108,14 +107,14 @@ void xbt_queue_pop(xbt_queue_t queue, void* const dst) { * @see #xbt_dynar_unshift */ void xbt_queue_unshift(xbt_queue_t queue, const void *src) { - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); xbt_cond_wait(queue->not_full,queue->mutex); } xbt_dynar_unshift(queue->data,src); xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } @@ -127,14 +126,14 @@ void xbt_queue_unshift(xbt_queue_t queue, const void *src) { * */ void xbt_queue_shift(xbt_queue_t queue, void* const dst) { - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); while (xbt_dynar_length(queue->data) == 0) { DEBUG1("Queue %p empty. Waiting",queue); xbt_cond_wait(queue->not_empty,queue->mutex); } xbt_dynar_shift(queue->data,dst); xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } @@ -145,31 +144,31 @@ void xbt_queue_shift(xbt_queue_t queue, void* const dst) { * @see #xbt_queue_push */ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) { - double timeout = xbt_time() + delay; + double begin = xbt_time(); xbt_ex_t e; - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); if (delay == 0) { if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", queue,queue->capacity); } } else { while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data) && - (delay<0 || xbt_time() < timeout) ) { + (delay<0 || (xbt_time() - begin) <= delay) ) { DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,queue->capacity); TRY { xbt_cond_timedwait(queue->not_full,queue->mutex, - delay < 0 ? -1 : timeout - xbt_time()); + delay < 0 ? -1 : delay - (xbt_time()-begin)); } CATCH(e) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); RETHROW; } } @@ -177,7 +176,7 @@ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) { xbt_dynar_push(queue->data,src); xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } @@ -187,25 +186,25 @@ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) { * */ void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) { - double timeout = xbt_time() + delay; + double begin = xbt_time(); xbt_ex_t e; - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); if (delay == 0) { if (xbt_dynar_length(queue->data) == 0) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); THROW0(timeout_error,0,"Delay = 0, and queue is empty"); } } else { while ( (xbt_dynar_length(queue->data) == 0) && - (delay<0 || xbt_time() < timeout) ) { + (delay<0 || (xbt_time() - begin) <= delay) ) { DEBUG1("Queue %p empty. Waiting",queue); TRY { xbt_cond_timedwait(queue->not_empty,queue->mutex, - delay<0 ? -1 : timeout - xbt_time()); + delay<0 ? -1 : delay - (xbt_time()-begin)); } CATCH(e) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); RETHROW; } } @@ -213,7 +212,7 @@ void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) { xbt_dynar_pop(queue->data,dst); xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } /** @brief Unshift something to the message exchange queue, with a timeout. @@ -221,31 +220,31 @@ void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) { * @see #xbt_queue_unshift */ void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) { - double timeout = xbt_time() + delay; + double begin = xbt_time(); xbt_ex_t e; - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); if (delay==0) { if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", queue,queue->capacity); } } else { while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data) && - (delay<0 || xbt_time() < timeout) ) { + (delay<0 || (xbt_time() - begin) <= delay) ) { DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,queue->capacity); TRY { xbt_cond_timedwait(queue->not_full,queue->mutex, - delay < 0 ? -1 : timeout - xbt_time()); + delay < 0 ? -1 : delay - (xbt_time()-begin)); } CATCH(e) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); RETHROW; } } @@ -253,7 +252,7 @@ void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) { xbt_dynar_unshift(queue->data,src); xbt_cond_signal(queue->not_empty); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); } @@ -263,31 +262,36 @@ void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) { * */ void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) { - double timeout = xbt_time() + delay; + double begin = xbt_time(); xbt_ex_t e; - xbt_mutex_lock(queue->mutex); + xbt_mutex_acquire(queue->mutex); if (delay == 0) { if (xbt_dynar_length(queue->data) == 0) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); THROW0(timeout_error,0,"Delay = 0, and queue is empty"); } } else { while ( (xbt_dynar_length(queue->data) == 0) && - (delay<0 || xbt_time() < timeout) ) { + (delay<0 || (xbt_time() - begin) <= delay) ) { DEBUG1("Queue %p empty. Waiting",queue); TRY { xbt_cond_timedwait(queue->not_empty,queue->mutex, - delay<0 ? -1 : timeout - xbt_time()); + delay<0 ? -1 : delay - (xbt_time()-begin)); } CATCH(e) { - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); RETHROW; } } } + if (xbt_dynar_length(queue->data) == 0) { + xbt_mutex_release(queue->mutex); + THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay); + } + xbt_dynar_shift(queue->data,dst); xbt_cond_signal(queue->not_full); - xbt_mutex_unlock(queue->mutex); + xbt_mutex_release(queue->mutex); }