X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/6713c5277547eab779ccbac88e16fb15ab0b39f9..3676269c65359df0174fd13b6c11687992437df8:/src/xbt/xbt_queue.c diff --git a/src/xbt/xbt_queue.c b/src/xbt/xbt_queue.c index d3bc436de0..6dc3c95f80 100644 --- a/src/xbt/xbt_queue.c +++ b/src/xbt/xbt_queue.c @@ -1,9 +1,8 @@ -/* $Id$ */ - /* A (synchronized) message queue. */ /* Popping an empty queue is blocking, as well as pushing a full one */ -/* Copyright (c) 2007 Martin Quinson. All rights reserved. */ +/* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team. + * All rights reserved. */ /* This program is free software; you can redistribute it and/or modify it * under the terms of the license (GNU LGPL) which comes with this package. */ @@ -11,288 +10,308 @@ #include "xbt/misc.h" #include "xbt/sysdep.h" #include "xbt/log.h" -//#include "xbt/ex.h" #include "xbt/dynar.h" -#include "xbt/synchro.h" -#include "xbt/queue.h" /* this module */ -#include "gras/virtu.h" -XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue"); +#include "xbt/queue.h" /* this module */ +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, + "Message exchanging queue"); typedef struct s_xbt_queue_ { - int capacity; - xbt_dynar_t data; - xbt_mutex_t mutex; - xbt_cond_t not_full, not_empty; + int capacity; + xbt_dynar_t data; + xbt_mutex_t mutex; + xbt_cond_t not_full, not_empty; } s_xbt_queue_t; /** @brief Create a new message exchange queue. - * + * * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data * @param elm_size size of each element stored in it (see #xbt_dynar_new) */ -xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) { - xbt_queue_t res = xbt_new0(s_xbt_queue_t,1); - xbt_assert0(capacity>=0,"Capacity cannot be negative"); - - res->capacity = capacity; - res->data = xbt_dynar_new(elm_size,NULL); - res->mutex = xbt_mutex_init(); - res->not_full = xbt_cond_init(); - res->not_empty = xbt_cond_init(); - return res; +xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size) +{ + xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1); + if (capacity<0) + capacity=0; + + res->capacity = capacity; + res->data = xbt_dynar_new(elm_size, NULL); + res->mutex = xbt_mutex_init(); + res->not_full = xbt_cond_init(); + res->not_empty = xbt_cond_init(); + return res; } + /** @brief Destroy a message exchange queue. * * Any remaining content is leaked. */ -void xbt_queue_free(xbt_queue_t *queue) { - - xbt_dynar_free(&( (*queue)->data )); - xbt_mutex_destroy( (*queue)->mutex ); - xbt_cond_destroy( (*queue)->not_full ); - xbt_cond_destroy( (*queue)->not_empty ); - free((*queue)); - *queue = NULL; +void xbt_queue_free(xbt_queue_t * queue) +{ + + xbt_dynar_free(&((*queue)->data)); + xbt_mutex_destroy((*queue)->mutex); + xbt_cond_destroy((*queue)->not_full); + xbt_cond_destroy((*queue)->not_empty); + free(*queue); + *queue = NULL; } /** @brief Get the queue size */ -unsigned long xbt_queue_length(const xbt_queue_t queue) { - unsigned long res; - xbt_mutex_acquire(queue->mutex); - res=xbt_dynar_length(queue->data); - xbt_mutex_release(queue->mutex); - return res; +unsigned long xbt_queue_length(const xbt_queue_t queue) +{ + unsigned long res; + xbt_mutex_acquire(queue->mutex); + res = xbt_dynar_length(queue->data); + xbt_mutex_release(queue->mutex); + return res; } /** @brief Push something to the message exchange queue. - * + * * This is blocking if the declared capacity is non-nul, and if this amount is reached. - * + * * @see #xbt_dynar_push */ -void xbt_queue_push(xbt_queue_t queue, const void *src) { - xbt_mutex_acquire(queue->mutex); - while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); - xbt_cond_wait(queue->not_full,queue->mutex); - } - xbt_dynar_push(queue->data,src); - xbt_cond_signal(queue->not_empty); - xbt_mutex_release(queue->mutex); +void xbt_queue_push(xbt_queue_t queue, const void *src) +{ + xbt_mutex_acquire(queue->mutex); + while (queue->capacity != 0 + && queue->capacity == xbt_dynar_length(queue->data)) { + XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue, + queue->capacity); + xbt_cond_wait(queue->not_full, queue->mutex); + } + xbt_dynar_push(queue->data, src); + xbt_cond_signal(queue->not_empty); + xbt_mutex_release(queue->mutex); } - + /** @brief Pop something from the message exchange queue. - * + * * This is blocking if the queue is empty. - * + * * @see #xbt_dynar_pop - * + * */ -void xbt_queue_pop(xbt_queue_t queue, void* const dst) { - xbt_mutex_acquire(queue->mutex); - while (xbt_dynar_length(queue->data) == 0) { - DEBUG1("Queue %p empty. Waiting",queue); - xbt_cond_wait(queue->not_empty,queue->mutex); - } - xbt_dynar_pop(queue->data,dst); - xbt_cond_signal(queue->not_full); - xbt_mutex_release(queue->mutex); +void xbt_queue_pop(xbt_queue_t queue, void *const dst) +{ + xbt_mutex_acquire(queue->mutex); + while (xbt_dynar_is_empty(queue->data)) { + XBT_DEBUG("Queue %p empty. Waiting", queue); + xbt_cond_wait(queue->not_empty, queue->mutex); + } + xbt_dynar_pop(queue->data, dst); + xbt_cond_signal(queue->not_full); + xbt_mutex_release(queue->mutex); } /** @brief Unshift something to the message exchange queue. - * + * * This is blocking if the declared capacity is non-nul, and if this amount is reached. - * + * * @see #xbt_dynar_unshift */ -void xbt_queue_unshift(xbt_queue_t queue, const void *src) { - xbt_mutex_acquire(queue->mutex); - while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) { - DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity); - xbt_cond_wait(queue->not_full,queue->mutex); - } - xbt_dynar_unshift(queue->data,src); - xbt_cond_signal(queue->not_empty); - xbt_mutex_release(queue->mutex); +void xbt_queue_unshift(xbt_queue_t queue, const void *src) +{ + xbt_mutex_acquire(queue->mutex); + while (queue->capacity != 0 + && queue->capacity == xbt_dynar_length(queue->data)) { + XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue, + queue->capacity); + xbt_cond_wait(queue->not_full, queue->mutex); + } + xbt_dynar_unshift(queue->data, src); + xbt_cond_signal(queue->not_empty); + xbt_mutex_release(queue->mutex); } - + /** @brief Shift something from the message exchange queue. - * + * * This is blocking if the queue is empty. - * + * * @see #xbt_dynar_shift - * + * */ -void xbt_queue_shift(xbt_queue_t queue, void* const dst) { - xbt_mutex_acquire(queue->mutex); - while (xbt_dynar_length(queue->data) == 0) { - DEBUG1("Queue %p empty. Waiting",queue); - xbt_cond_wait(queue->not_empty,queue->mutex); - } - xbt_dynar_shift(queue->data,dst); - xbt_cond_signal(queue->not_full); - xbt_mutex_release(queue->mutex); +void xbt_queue_shift(xbt_queue_t queue, void *const dst) +{ + xbt_mutex_acquire(queue->mutex); + while (xbt_dynar_is_empty(queue->data)) { + XBT_DEBUG("Queue %p empty. Waiting", queue); + xbt_cond_wait(queue->not_empty, queue->mutex); + } + xbt_dynar_shift(queue->data, dst); + xbt_cond_signal(queue->not_full); + xbt_mutex_release(queue->mutex); } /** @brief Push something to the message exchange queue, with a timeout. - * + * * @see #xbt_queue_push */ -void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) { - double timeout = xbt_time() + delay; - xbt_ex_t e; +void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay) +{ + double begin = xbt_time(); xbt_mutex_acquire(queue->mutex); if (delay == 0) { - if (queue->capacity != 0 && - queue->capacity == xbt_dynar_length(queue->data)) { + if (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data)) { xbt_mutex_release(queue->mutex); - THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", - queue,queue->capacity); + THROWF(timeout_error, 0, + "Capacity of %p exceeded (=%d), and delay = 0", queue, + queue->capacity); } } else { - while (queue->capacity != 0 && - queue->capacity == xbt_dynar_length(queue->data) && - (delay<0 || xbt_time() < timeout) ) { - - DEBUG2("Capacity of %p exceded (=%d). Waiting", - queue,queue->capacity); + while (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data) && + (delay < 0 || (xbt_time() - begin) <= delay)) { + + XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue, + queue->capacity); TRY { - xbt_cond_timedwait(queue->not_full,queue->mutex, - delay < 0 ? -1 : timeout - xbt_time()); - } CATCH(e) { - xbt_mutex_release(queue->mutex); - RETHROW; + xbt_cond_timedwait(queue->not_full, queue->mutex, + delay < 0 ? -1 : delay - (xbt_time() - begin)); + } + CATCH_ANONYMOUS { + xbt_mutex_release(queue->mutex); + RETHROW; } } } - xbt_dynar_push(queue->data,src); + xbt_dynar_push(queue->data, src); xbt_cond_signal(queue->not_empty); xbt_mutex_release(queue->mutex); } - + /** @brief Pop something from the message exchange queue, with a timeout. - * + * * @see #xbt_queue_pop - * + * */ -void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) { - double timeout = xbt_time() + delay; - xbt_ex_t e; +void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay) +{ + double begin = xbt_time(); xbt_mutex_acquire(queue->mutex); if (delay == 0) { - if (xbt_dynar_length(queue->data) == 0) { + if (xbt_dynar_is_empty(queue->data)) { xbt_mutex_release(queue->mutex); - THROW0(timeout_error,0,"Delay = 0, and queue is empty"); + THROWF(timeout_error, 0, "Delay = 0, and queue is empty"); } } else { - while ( (xbt_dynar_length(queue->data) == 0) && - (delay<0 || xbt_time() < timeout) ) { - DEBUG1("Queue %p empty. Waiting",queue); + while ((xbt_dynar_is_empty(queue->data)) && + (delay < 0 || (xbt_time() - begin) <= delay)) { + XBT_DEBUG("Queue %p empty. Waiting", queue); TRY { - xbt_cond_timedwait(queue->not_empty,queue->mutex, - delay<0 ? -1 : timeout - xbt_time()); - } CATCH(e) { - xbt_mutex_release(queue->mutex); - RETHROW; + xbt_cond_timedwait(queue->not_empty, queue->mutex, + delay < 0 ? -1 : delay - (xbt_time() - begin)); + } + CATCH_ANONYMOUS { + xbt_mutex_release(queue->mutex); + RETHROW; } } } - xbt_dynar_pop(queue->data,dst); + xbt_dynar_pop(queue->data, dst); xbt_cond_signal(queue->not_full); xbt_mutex_release(queue->mutex); } /** @brief Unshift something to the message exchange queue, with a timeout. - * + * * @see #xbt_queue_unshift */ -void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) { - double timeout = xbt_time() + delay; - xbt_ex_t e; +void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, + double delay) +{ + double begin = xbt_time(); xbt_mutex_acquire(queue->mutex); - if (delay==0) { - if (queue->capacity != 0 && - queue->capacity == xbt_dynar_length(queue->data)) { + if (delay == 0) { + if (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data)) { xbt_mutex_release(queue->mutex); - THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0", - queue,queue->capacity); + THROWF(timeout_error, 0, + "Capacity of %p exceeded (=%d), and delay = 0", queue, + queue->capacity); } } else { - while (queue->capacity != 0 && - queue->capacity == xbt_dynar_length(queue->data) && - (delay<0 || xbt_time() < timeout) ) { - - DEBUG2("Capacity of %p exceded (=%d). Waiting", - queue,queue->capacity); + while (queue->capacity != 0 && + queue->capacity == xbt_dynar_length(queue->data) && + (delay < 0 || (xbt_time() - begin) <= delay)) { + + XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue, + queue->capacity); TRY { - xbt_cond_timedwait(queue->not_full,queue->mutex, - delay < 0 ? -1 : timeout - xbt_time()); - } CATCH(e) { - xbt_mutex_release(queue->mutex); - RETHROW; + xbt_cond_timedwait(queue->not_full, queue->mutex, + delay < 0 ? -1 : delay - (xbt_time() - begin)); + } + CATCH_ANONYMOUS { + xbt_mutex_release(queue->mutex); + RETHROW; } } } - xbt_dynar_unshift(queue->data,src); + xbt_dynar_unshift(queue->data, src); xbt_cond_signal(queue->not_empty); xbt_mutex_release(queue->mutex); } - + /** @brief Shift something from the message exchange queue, with a timeout. - * + * * @see #xbt_queue_shift - * + * */ -void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) { - double timeout = xbt_time() + delay; - xbt_ex_t e; +void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, + double delay) +{ + double begin = xbt_time(); xbt_mutex_acquire(queue->mutex); if (delay == 0) { - if (xbt_dynar_length(queue->data) == 0) { + if (xbt_dynar_is_empty(queue->data)) { xbt_mutex_release(queue->mutex); - THROW0(timeout_error,0,"Delay = 0, and queue is empty"); + THROWF(timeout_error, 0, "Delay = 0, and queue is empty"); } } else { - while ( (xbt_dynar_length(queue->data) == 0) && - (delay<0 || xbt_time() < timeout) ) { - DEBUG1("Queue %p empty. Waiting",queue); + while ((xbt_dynar_is_empty(queue->data)) && + (delay < 0 || (xbt_time() - begin) <= delay)) { + XBT_DEBUG("Queue %p empty. Waiting", queue); TRY { - xbt_cond_timedwait(queue->not_empty,queue->mutex, - delay<0 ? -1 : timeout - xbt_time()); - } CATCH(e) { - xbt_mutex_release(queue->mutex); - RETHROW; + xbt_cond_timedwait(queue->not_empty, queue->mutex, + delay < 0 ? -1 : delay - (xbt_time() - begin)); + } + CATCH_ANONYMOUS { + xbt_mutex_release(queue->mutex); + RETHROW; } } } - if (xbt_dynar_length(queue->data) == 0) { - xbt_mutex_release(queue->mutex); - THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay); + if (xbt_dynar_is_empty(queue->data)) { + xbt_mutex_release(queue->mutex); + THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty", + delay); } - - xbt_dynar_shift(queue->data,dst); + + xbt_dynar_shift(queue->data, dst); xbt_cond_signal(queue->not_full); xbt_mutex_release(queue->mutex); }