3 /* A (synchronized) message queue. */
4 /* Popping from an empty queue blocks, as does pushing to a full one */
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved. */
8 /* This program is free software; you can redistribute it and/or modify it
9 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "xbt/sysdep.h"
14 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
17 #include "xbt/queue.h" /* this module */
18 #include "gras/virtu.h"
/* Logging setup for this file: the DEBUG1/DEBUG2 calls below presumably log
 * to the "xbt_queue" category created here as a child of "xbt" -- TODO
 * confirm against the xbt log macro definitions. */
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
/* Queue descriptor.
 * NOTE(review): only part of this definition is visible in this excerpt
 * (original listing lines 22-24 and 26 are missing). The functions below
 * also access members named capacity, data and mutex, so they are
 * presumably declared on the missing lines -- confirm against the full file. */
21 typedef struct s_xbt_queue_ {
/* Condition variables: producers (push/unshift) wait on not_full,
 * consumers (pop/shift) wait on not_empty. */
25 xbt_cond_t not_full, not_empty;
28 /** @brief Create a new message exchange queue.
30 * @param capacity the capacity of the queue. If non-zero, any attempt to push an item that would take the size of the queue over this number blocks until someone else pops some data
31 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
/* Builds a new queue: allocates the descriptor with xbt_new0, then sets up
 * the capacity, the dynar that stores the elements, the mutex and the two
 * condition variables.
 *
 * capacity: must be >= 0 (enforced by the assertion below).
 * elm_size: forwarded unchanged to xbt_dynar_new.
 *
 * NOTE(review): the end of this function (original listing lines 42-43,
 * presumably the return of res and the closing brace) is not visible in
 * this excerpt -- confirm against the full file. */
33 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
34 xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
/* NOTE(review): the descriptor is allocated before this check runs; what
 * happens to it when the assertion fails depends on xbt_assert0 -- confirm. */
35 xbt_assert0(capacity>=0,"Capacity cannot be negative");
/* 0 is accepted: the push/unshift functions only wait when capacity != 0,
 * so a capacity of 0 never makes them block. */
37 res->capacity = capacity;
/* NULL second argument: presumably "no per-element free function" -- TODO
 * confirm against the xbt_dynar_new documentation. */
38 res->data = xbt_dynar_new(elm_size,NULL);
39 res->mutex = xbt_mutex_init();
/* Waited on by push/unshift, signalled by pop/shift. */
40 res->not_full = xbt_cond_init();
/* Waited on by pop/shift, signalled by push/unshift. */
41 res->not_empty = xbt_cond_init();
44 /** @brief Destroy a message exchange queue.
46 * Any remaining content is leaked.
/* Tears down the members of *queue that are visible here: the element
 * dynar, the mutex and both condition variables.
 *
 * queue: address of the queue handle; it is dereferenced without any
 *        visible NULL check.
 *
 * NOTE(review): original listing lines 49 and 54-56 are missing from this
 * excerpt, so whether *queue is NULL-checked, freed or reset afterwards
 * cannot be determined from here -- confirm against the full file. */
48 void xbt_queue_free(xbt_queue_t *queue) {
50 xbt_dynar_free(&( (*queue)->data ));
/* No lock is taken before destroying the synchronization objects, so the
 * caller presumably has to make sure no other thread still uses the queue. */
51 xbt_mutex_destroy( (*queue)->mutex );
52 xbt_cond_destroy( (*queue)->not_full );
53 xbt_cond_destroy( (*queue)->not_empty );
58 /** @brief Get the queue size */
/* Reads the number of elements currently stored in the queue's dynar while
 * holding the queue mutex.
 *
 * NOTE(review): the declaration of res and the return statement (original
 * listing lines 60 and 64-65) are not visible in this excerpt; presumably
 * res is returned -- confirm against the full file. */
59 unsigned long xbt_queue_length(const xbt_queue_t queue) {
61 xbt_mutex_acquire(queue->mutex);
/* Snapshot taken under the lock; the value may be outdated as soon as the
 * mutex is released on the next line. */
62 res=xbt_dynar_length(queue->data);
63 xbt_mutex_release(queue->mutex);
67 /** @brief Push something to the message exchange queue.
69 * This blocks if the declared capacity is non-zero and that capacity has been reached.
71 * @see #xbt_dynar_push
/* Adds the element designated by src to the queue with xbt_dynar_push,
 * waiting first for as long as the queue is bounded (capacity != 0) and
 * holds exactly capacity elements.
 *
 * queue: queue to push to.
 * src:   forwarded unchanged to xbt_dynar_push.
 *
 * NOTE(review): original listing lines 78 and 82 (presumably the closing
 * braces of the loop and of the function) are not visible in this excerpt. */
73 void xbt_queue_push(xbt_queue_t queue, const void *src) {
74 xbt_mutex_acquire(queue->mutex);
/* A loop rather than a single test: the "full" condition is re-evaluated
 * after every return from xbt_cond_wait. */
75 while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
76 DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
/* The mutex is handed to the wait; presumably it is released while waiting
 * and re-acquired before returning -- TODO confirm in xbt/synchro. */
77 xbt_cond_wait(queue->not_full,queue->mutex);
79 xbt_dynar_push(queue->data,src);
/* Notify a consumer that may be waiting on not_empty in pop/shift. */
80 xbt_cond_signal(queue->not_empty);
81 xbt_mutex_release(queue->mutex);
85 /** @brief Pop something from the message exchange queue.
87 * This is blocking if the queue is empty.
/* Removes one element from the queue with xbt_dynar_pop, waiting first for
 * as long as the queue is empty.
 *
 * queue: queue to pop from.
 * dst:   forwarded unchanged to xbt_dynar_pop; presumably the location
 *        receiving the removed element -- TODO confirm in xbt/dynar.
 *
 * NOTE(review): original listing lines 97 and 101 (presumably the closing
 * braces of the loop and of the function) are not visible in this excerpt. */
92 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
93 xbt_mutex_acquire(queue->mutex);
/* A loop rather than a single test: emptiness is re-evaluated after every
 * return from xbt_cond_wait. */
94 while (xbt_dynar_length(queue->data) == 0) {
95 DEBUG1("Queue %p empty. Waiting",queue);
/* The mutex is handed to the wait; presumably it is released while waiting
 * and re-acquired before returning -- TODO confirm in xbt/synchro. */
96 xbt_cond_wait(queue->not_empty,queue->mutex);
98 xbt_dynar_pop(queue->data,dst);
/* Notify a producer that may be waiting on not_full in push/unshift. */
99 xbt_cond_signal(queue->not_full);
100 xbt_mutex_release(queue->mutex);
103 /** @brief Unshift something to the message exchange queue.
105 * This blocks if the declared capacity is non-zero and that capacity has been reached.
107 * @see #xbt_dynar_unshift
/* Adds the element designated by src to the queue with xbt_dynar_unshift,
 * waiting first for as long as the queue is bounded (capacity != 0) and
 * holds exactly capacity elements. Same structure as xbt_queue_push, only
 * the dynar insertion call differs.
 *
 * queue: queue to insert into.
 * src:   forwarded unchanged to xbt_dynar_unshift.
 *
 * NOTE(review): original listing lines 114 and 118 (presumably the closing
 * braces of the loop and of the function) are not visible in this excerpt. */
109 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
110 xbt_mutex_acquire(queue->mutex);
/* The "full" condition is re-evaluated after every return from the wait. */
111 while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
112 DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
/* The mutex is handed to the wait; presumably it is released while waiting
 * and re-acquired before returning -- TODO confirm in xbt/synchro. */
113 xbt_cond_wait(queue->not_full,queue->mutex);
115 xbt_dynar_unshift(queue->data,src);
/* Notify a consumer that may be waiting on not_empty in pop/shift. */
116 xbt_cond_signal(queue->not_empty);
117 xbt_mutex_release(queue->mutex);
121 /** @brief Shift something from the message exchange queue.
123 * This is blocking if the queue is empty.
125 * @see #xbt_dynar_shift
/* Removes one element from the queue with xbt_dynar_shift, waiting first
 * for as long as the queue is empty. Same structure as xbt_queue_pop, only
 * the dynar removal call differs.
 *
 * queue: queue to remove from.
 * dst:   forwarded unchanged to xbt_dynar_shift; presumably the location
 *        receiving the removed element -- TODO confirm in xbt/dynar.
 *
 * NOTE(review): original listing lines 133 and 137 (presumably the closing
 * braces of the loop and of the function) are not visible in this excerpt. */
128 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
129 xbt_mutex_acquire(queue->mutex);
/* Emptiness is re-evaluated after every return from the wait. */
130 while (xbt_dynar_length(queue->data) == 0) {
131 DEBUG1("Queue %p empty. Waiting",queue);
/* The mutex is handed to the wait; presumably it is released while waiting
 * and re-acquired before returning -- TODO confirm in xbt/synchro. */
132 xbt_cond_wait(queue->not_empty,queue->mutex);
134 xbt_dynar_shift(queue->data,dst);
/* Notify a producer that may be waiting on not_full in push/unshift. */
135 xbt_cond_signal(queue->not_full);
136 xbt_mutex_release(queue->mutex);
142 /** @brief Push something to the message exchange queue, with a timeout.
144 * @see #xbt_queue_push
/* Timed variant of xbt_queue_push.
 *
 * queue: queue to push to.
 * src:   forwarded unchanged to xbt_dynar_push.
 * delay: as far as the visible code shows, a negative value removes the
 *        time bound (the loop ignores the clock and -1 is passed to
 *        xbt_cond_timedwait); otherwise waiting is bounded by the time
 *        elapsed since entry, measured with xbt_time().
 *
 * Throws timeout_error through THROW2 in the first test below.
 *
 * NOTE(review): many original lines of this function are missing from this
 * excerpt (listing lines 148-149, 151-152, 155, 159-160, 164, 167, 170,
 * 172-176, 180). The statements below are therefore NOT a complete picture
 * of the control flow; confirm every remark against the full file. */
146 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
/* Reference time used to compute the elapsed and remaining delay. */
147 double begin = xbt_time();
150 xbt_mutex_acquire(queue->mutex);
/* NOTE(review): the error message says "delay = 0", so this test is
 * presumably nested in a delay == 0 check located on a missing line. */
153 if (queue->capacity != 0 &&
154 queue->capacity == xbt_dynar_length(queue->data)) {
/* The mutex is released before throwing so it is not left locked. */
156 xbt_mutex_release(queue->mutex);
157 THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
158 queue,queue->capacity);
/* Wait while the queue is bounded and full, and the delay (if any) has not
 * elapsed. */
161 while (queue->capacity != 0 &&
162 queue->capacity == xbt_dynar_length(queue->data) &&
163 (delay<0 || (xbt_time() - begin) <= delay) ) {
165 DEBUG2("Capacity of %p exceded (=%d). Waiting",
166 queue,queue->capacity);
/* Third argument: -1 for an unbounded wait, else the remaining time.
 * NOTE(review): the remaining time can become slightly negative if the
 * clock advances between the loop test and this call; how
 * xbt_cond_timedwait treats negative values other than -1 is not visible. */
168 xbt_cond_timedwait(queue->not_full,queue->mutex,
169 delay < 0 ? -1 : delay - (xbt_time()-begin));
/* NOTE(review): this release is presumably part of an error-handling path
 * around the timed wait (the lines before and after it are missing). */
171 xbt_mutex_release(queue->mutex);
/* NOTE(review): unlike xbt_queue_shift_timed, no re-check of the "full"
 * condition is visible between the wait loop and this push; if the loop can
 * end because the delay elapsed, the element would be pushed beyond
 * capacity -- confirm against the full file. */
177 xbt_dynar_push(queue->data,src);
/* Notify a consumer that may be waiting on not_empty. */
178 xbt_cond_signal(queue->not_empty);
179 xbt_mutex_release(queue->mutex);
183 /** @brief Pop something from the message exchange queue, with a timeout.
185 * @see #xbt_queue_pop
/* Timed variant of xbt_queue_pop.
 *
 * queue: queue to pop from.
 * dst:   forwarded unchanged to xbt_dynar_pop; presumably the location
 *        receiving the removed element -- TODO confirm in xbt/dynar.
 * delay: as far as the visible code shows, a negative value removes the
 *        time bound (the loop ignores the clock and -1 is passed to
 *        xbt_cond_timedwait); otherwise waiting is bounded by the time
 *        elapsed since entry, measured with xbt_time().
 *
 * Throws timeout_error through THROW0 in the first test below.
 *
 * NOTE(review): many original lines of this function are missing from this
 * excerpt (listing lines 190-191, 193-194, 198-199, 203, 206, 208-212,
 * 216). The statements below are therefore NOT a complete picture of the
 * control flow; confirm every remark against the full file. */
188 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
/* Reference time used to compute the elapsed and remaining delay. */
189 double begin = xbt_time();
192 xbt_mutex_acquire(queue->mutex);
/* NOTE(review): the error message says "Delay = 0", so this test is
 * presumably nested in a delay == 0 check located on a missing line. */
195 if (xbt_dynar_length(queue->data) == 0) {
/* The mutex is released before throwing so it is not left locked. */
196 xbt_mutex_release(queue->mutex);
197 THROW0(timeout_error,0,"Delay = 0, and queue is empty");
/* Wait while the queue is empty and the delay (if any) has not elapsed. */
200 while ( (xbt_dynar_length(queue->data) == 0) &&
201 (delay<0 || (xbt_time() - begin) <= delay) ) {
202 DEBUG1("Queue %p empty. Waiting",queue);
/* Third argument: -1 for an unbounded wait, else the remaining time.
 * NOTE(review): the remaining time can become slightly negative if the
 * clock advances between the loop test and this call; how
 * xbt_cond_timedwait treats negative values other than -1 is not visible. */
204 xbt_cond_timedwait(queue->not_empty,queue->mutex,
205 delay<0 ? -1 : delay - (xbt_time()-begin));
/* NOTE(review): this release is presumably part of an error-handling path
 * around the timed wait (the lines before and after it are missing). */
207 xbt_mutex_release(queue->mutex);
/* NOTE(review): unlike xbt_queue_shift_timed, no re-check that the queue is
 * non-empty is visible between the wait loop and this pop; if the loop can
 * end because the delay elapsed, the pop would run on an empty dynar --
 * confirm against the full file. */
213 xbt_dynar_pop(queue->data,dst);
/* Notify a producer that may be waiting on not_full. */
214 xbt_cond_signal(queue->not_full);
215 xbt_mutex_release(queue->mutex);
218 /** @brief Unshift something to the message exchange queue, with a timeout.
220 * @see #xbt_queue_unshift
/* Timed variant of xbt_queue_unshift. Same visible structure as
 * xbt_queue_push_timed, only the dynar insertion call differs.
 *
 * queue: queue to insert into.
 * src:   forwarded unchanged to xbt_dynar_unshift.
 * delay: as far as the visible code shows, a negative value removes the
 *        time bound (the loop ignores the clock and -1 is passed to
 *        xbt_cond_timedwait); otherwise waiting is bounded by the time
 *        elapsed since entry, measured with xbt_time().
 *
 * Throws timeout_error through THROW2 in the first test below.
 *
 * NOTE(review): many original lines of this function are missing from this
 * excerpt (listing lines 224-225, 227-228, 231, 235-236, 240, 243, 246,
 * 248-252, 256). The statements below are therefore NOT a complete picture
 * of the control flow; confirm every remark against the full file. */
222 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
/* Reference time used to compute the elapsed and remaining delay. */
223 double begin = xbt_time();
226 xbt_mutex_acquire(queue->mutex);
/* NOTE(review): the error message says "delay = 0", so this test is
 * presumably nested in a delay == 0 check located on a missing line. */
229 if (queue->capacity != 0 &&
230 queue->capacity == xbt_dynar_length(queue->data)) {
/* The mutex is released before throwing so it is not left locked. */
232 xbt_mutex_release(queue->mutex);
233 THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
234 queue,queue->capacity);
/* Wait while the queue is bounded and full, and the delay (if any) has not
 * elapsed. */
237 while (queue->capacity != 0 &&
238 queue->capacity == xbt_dynar_length(queue->data) &&
239 (delay<0 || (xbt_time() - begin) <= delay) ) {
241 DEBUG2("Capacity of %p exceded (=%d). Waiting",
242 queue,queue->capacity);
/* Third argument: -1 for an unbounded wait, else the remaining time.
 * NOTE(review): the remaining time can become slightly negative if the
 * clock advances between the loop test and this call; how
 * xbt_cond_timedwait treats negative values other than -1 is not visible. */
244 xbt_cond_timedwait(queue->not_full,queue->mutex,
245 delay < 0 ? -1 : delay - (xbt_time()-begin));
/* NOTE(review): this release is presumably part of an error-handling path
 * around the timed wait (the lines before and after it are missing). */
247 xbt_mutex_release(queue->mutex);
/* NOTE(review): unlike xbt_queue_shift_timed, no re-check of the "full"
 * condition is visible between the wait loop and this insertion; if the
 * loop can end because the delay elapsed, the element would be inserted
 * beyond capacity -- confirm against the full file. */
253 xbt_dynar_unshift(queue->data,src);
/* Notify a consumer that may be waiting on not_empty. */
254 xbt_cond_signal(queue->not_empty);
255 xbt_mutex_release(queue->mutex);
259 /** @brief Shift something from the message exchange queue, with a timeout.
261 * @see #xbt_queue_shift
/* Timed variant of xbt_queue_shift.
 *
 * queue: queue to remove from.
 * dst:   forwarded unchanged to xbt_dynar_shift; presumably the location
 *        receiving the removed element -- TODO confirm in xbt/dynar.
 * delay: as far as the visible code shows, a negative value removes the
 *        time bound (the loop ignores the clock and -1 is passed to
 *        xbt_cond_timedwait); otherwise waiting is bounded by the time
 *        elapsed since entry, measured with xbt_time().
 *
 * Throws timeout_error through THROW0 / THROW1 in the tests below.
 *
 * NOTE(review): many original lines of this function are missing from this
 * excerpt (listing lines 266-267, 269-270, 274-275, 279, 282, 284-288,
 * 292-293, 297). The statements below are therefore NOT a complete picture
 * of the control flow; confirm every remark against the full file. */
264 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
/* Reference time used to compute the elapsed and remaining delay. */
265 double begin = xbt_time();
268 xbt_mutex_acquire(queue->mutex);
/* NOTE(review): the error message says "Delay = 0", so this test is
 * presumably nested in a delay == 0 check located on a missing line. */
271 if (xbt_dynar_length(queue->data) == 0) {
/* The mutex is released before throwing so it is not left locked. */
272 xbt_mutex_release(queue->mutex);
273 THROW0(timeout_error,0,"Delay = 0, and queue is empty");
/* Wait while the queue is empty and the delay (if any) has not elapsed. */
276 while ( (xbt_dynar_length(queue->data) == 0) &&
277 (delay<0 || (xbt_time() - begin) <= delay) ) {
278 DEBUG1("Queue %p empty. Waiting",queue);
/* Third argument: -1 for an unbounded wait, else the remaining time.
 * NOTE(review): the remaining time can become slightly negative if the
 * clock advances between the loop test and this call; how
 * xbt_cond_timedwait treats negative values other than -1 is not visible. */
280 xbt_cond_timedwait(queue->not_empty,queue->mutex,
281 delay<0 ? -1 : delay - (xbt_time()-begin));
/* NOTE(review): this release is presumably part of an error-handling path
 * around the timed wait (the lines before and after it are missing). */
283 xbt_mutex_release(queue->mutex);
/* Emptiness is tested again here, before the shift below; the message shows
 * this covers the case where the delay elapsed with the queue still empty.
 * The mutex is released before throwing. */
289 if (xbt_dynar_length(queue->data) == 0) {
290 xbt_mutex_release(queue->mutex);
291 THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
294 xbt_dynar_shift(queue->data,dst);
/* Notify a producer that may be waiting on not_full. */
295 xbt_cond_signal(queue->not_full);
296 xbt_mutex_release(queue->mutex);