#include "xbt/synchro.h"
#include "xbt/queue.h" /* this module */
-
+#include "gras/virtu.h"
XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
typedef struct s_xbt_queue_ {
* @seealso #xbt_queue_push
*/
void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
+ double timeout = xbt_os_time() + delay;
xbt_mutex_lock(queue->mutex);
- if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
+ /* Loop on the predicate: a wake-up does not guarantee that room is still
+  * available (spurious wake-up, or another pusher got there first). */
+ while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
+ /* Only wait for what is left of the caller's budget, so repeated
+  * wake-ups cannot stretch the total wait beyond `delay`.
+  * NOTE(review): a negative delay times out immediately here; confirm
+  * whether callers expect "wait forever" for delay < 0. */
+ double remaining = timeout - xbt_os_time();
+ /* check if a timeout occurs */
+ if (remaining <= 0) {
+ xbt_mutex_unlock(queue->mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
- xbt_cond_timedwait(queue->not_full,queue->mutex, delay);
+ xbt_cond_timedwait(queue->not_full,queue->mutex, remaining);
}
- /* check if a timeout occurs */
- if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
- xbt_mutex_unlock(queue->mutex);
- THROW0(timeout_error,0,"Timeout");
- }
- else {
- xbt_dynar_push(queue->data,src);
- xbt_cond_signal(queue->not_empty);
- xbt_mutex_unlock(queue->mutex);
- }
+ /* Room is available and the mutex is held: enqueue and wake one consumer. */
+ xbt_dynar_push(queue->data,src);
+ xbt_cond_signal(queue->not_empty);
+ xbt_mutex_unlock(queue->mutex);
}
*
*/
void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
+ double timeout = xbt_os_time() + delay;
xbt_mutex_lock(queue->mutex);
- if (xbt_dynar_length(queue->data) == 0) {
+ /* Loop on the predicate: a wake-up does not guarantee that an element is
+  * still there (spurious wake-up, or another consumer took it first). */
+ while (xbt_dynar_length(queue->data) == 0) {
+ /* Only wait for what is left of the caller's budget, so repeated
+  * wake-ups cannot stretch the total wait beyond `delay`.
+  * NOTE(review): a negative delay times out immediately here; confirm
+  * whether callers expect "wait forever" for delay < 0. */
+ double remaining = timeout - xbt_os_time();
+ /* check if a timeout occurs */
+ if (remaining <= 0) {
+ xbt_mutex_unlock(queue->mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
DEBUG1("Queue %p empty. Waiting",queue);
- xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
+ xbt_cond_timedwait(queue->not_empty,queue->mutex,remaining);
}
- /* check if a timeout occurs */
- if (xbt_dynar_length(queue->data) == 0) {
- xbt_mutex_unlock(queue->mutex);
- THROW0(timeout_error,0,"Timeout");
- }
- else {
- xbt_dynar_pop(queue->data,dst);
- xbt_cond_signal(queue->not_full);
- xbt_mutex_unlock(queue->mutex);
- }
+ /* An element is available and the mutex is held: dequeue and wake one producer. */
+ xbt_dynar_pop(queue->data,dst);
+ xbt_cond_signal(queue->not_full);
+ xbt_mutex_unlock(queue->mutex);
}
/** @brief Unshift something to the message exchange queue, with a timeout.
* @seealso #xbt_queue_unshift
*/
void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
+ double timeout = xbt_os_time() + delay;
xbt_mutex_lock(queue->mutex);
- if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
+ /* Loop on the predicate: a wake-up does not guarantee that room is still
+  * available (spurious wake-up, or another producer got there first). */
+ while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
+ /* Only wait for what is left of the caller's budget, so repeated
+  * wake-ups cannot stretch the total wait beyond `delay`.
+  * NOTE(review): a negative delay times out immediately here; confirm
+  * whether callers expect "wait forever" for delay < 0. */
+ double remaining = timeout - xbt_os_time();
+ /* check if a timeout occurs */
+ if (remaining <= 0) {
+ xbt_mutex_unlock(queue->mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
- xbt_cond_timedwait(queue->not_full,queue->mutex,delay);
+ xbt_cond_timedwait(queue->not_full,queue->mutex,remaining);
}
- /* check if a timeout occurs */
-
- if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
-
- xbt_mutex_unlock(queue->mutex);
- THROW0(timeout_error,0,"Timeout");
- }
- else {
- xbt_dynar_unshift(queue->data,src);
- xbt_cond_signal(queue->not_empty);
- xbt_mutex_unlock(queue->mutex);
- }
+ /* Room is available and the mutex is held: insert at the head and wake one consumer. */
+ xbt_dynar_unshift(queue->data,src);
+ xbt_cond_signal(queue->not_empty);
+ xbt_mutex_unlock(queue->mutex);
}
*
*/
void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
+ double timeout = xbt_os_time() + delay;
xbt_mutex_lock(queue->mutex);
- if (xbt_dynar_length(queue->data) == 0) {
+ /* Loop on the predicate: a wake-up does not guarantee that an element is
+  * still there (spurious wake-up, or another consumer took it first). */
+ while (xbt_dynar_length(queue->data) == 0) {
+ /* Only wait for what is left of the caller's budget, so repeated
+  * wake-ups cannot stretch the total wait beyond `delay`. The deadline
+  * is tested before waiting (not after), so an element that arrived
+  * just as the deadline passed is still delivered.
+  * NOTE(review): a negative delay times out immediately here; confirm
+  * whether callers expect "wait forever" for delay < 0. */
+ double remaining = timeout - xbt_os_time();
+ /* check if a timeout occurs */
+ if (remaining <= 0) {
+ xbt_mutex_unlock(queue->mutex);
+ THROW0(timeout_error,0,"Timeout");
+ }
DEBUG1("Queue %p empty. Waiting",queue);
- xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
+ xbt_cond_timedwait(queue->not_empty,queue->mutex,remaining);
}
- /* check if a timeout occurs */
- if (xbt_dynar_length(queue->data) == 0) {
- xbt_mutex_unlock(queue->mutex);
- THROW0(timeout_error,0,"Timeout");
- }
- else {
- xbt_dynar_shift(queue->data,dst);
- xbt_cond_signal(queue->not_full);
- xbt_mutex_unlock(queue->mutex);
- }
+ /* An element is available and the mutex is held: take the head and wake one producer. */
+ xbt_dynar_shift(queue->data,dst);
+ xbt_cond_signal(queue->not_full);
+ xbt_mutex_unlock(queue->mutex);
}