3 /* A (synchronized) message queue. */
4 /* Popping an empty queue is blocking, as well as pushing a full one */
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved. */
8 /* This program is free software; you can redistribute it and/or modify it
9 * under the terms of the license (GNU LGPL) which comes with this package. */
12 #include "xbt/sysdep.h"
14 #include "xbt/dynar.h"
16 #include "xbt/synchro.h"
17 #include "xbt/queue.h" /* this module */
18 #include "gras/virtu.h"
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
21 typedef struct s_xbt_queue_ {
25 xbt_cond_t not_full, not_empty;
28 /** @brief Create a new message exchange queue.
30 * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
35 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
36 xbt_assert0(capacity >= 0, "Capacity cannot be negative");
38 res->capacity = capacity;
39 res->data = xbt_dynar_new(elm_size, NULL);
40 res->mutex = xbt_mutex_init();
41 res->not_full = xbt_cond_init();
42 res->not_empty = xbt_cond_init();
46 /** @brief Destroy a message exchange queue.
48 * Any remaining content is leaked.
50 void xbt_queue_free(xbt_queue_t * queue)
53 xbt_dynar_free(&((*queue)->data));
54 xbt_mutex_destroy((*queue)->mutex);
55 xbt_cond_destroy((*queue)->not_full);
56 xbt_cond_destroy((*queue)->not_empty);
61 /** @brief Get the queue size */
62 unsigned long xbt_queue_length(const xbt_queue_t queue)
65 xbt_mutex_acquire(queue->mutex);
66 res = xbt_dynar_length(queue->data);
67 xbt_mutex_release(queue->mutex);
71 /** @brief Push something to the message exchange queue.
73 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
75 * @see #xbt_dynar_push
77 void xbt_queue_push(xbt_queue_t queue, const void *src)
79 xbt_mutex_acquire(queue->mutex);
80 while (queue->capacity != 0
81 && queue->capacity == xbt_dynar_length(queue->data)) {
82 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
83 xbt_cond_wait(queue->not_full, queue->mutex);
85 xbt_dynar_push(queue->data, src);
86 xbt_cond_signal(queue->not_empty);
87 xbt_mutex_release(queue->mutex);
91 /** @brief Pop something from the message exchange queue.
93 * This is blocking if the queue is empty.
98 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
100 xbt_mutex_acquire(queue->mutex);
101 while (xbt_dynar_length(queue->data) == 0) {
102 DEBUG1("Queue %p empty. Waiting", queue);
103 xbt_cond_wait(queue->not_empty, queue->mutex);
105 xbt_dynar_pop(queue->data, dst);
106 xbt_cond_signal(queue->not_full);
107 xbt_mutex_release(queue->mutex);
110 /** @brief Unshift something to the message exchange queue.
112 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
114 * @see #xbt_dynar_unshift
116 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
118 xbt_mutex_acquire(queue->mutex);
119 while (queue->capacity != 0
120 && queue->capacity == xbt_dynar_length(queue->data)) {
121 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
122 xbt_cond_wait(queue->not_full, queue->mutex);
124 xbt_dynar_unshift(queue->data, src);
125 xbt_cond_signal(queue->not_empty);
126 xbt_mutex_release(queue->mutex);
130 /** @brief Shift something from the message exchange queue.
132 * This is blocking if the queue is empty.
134 * @see #xbt_dynar_shift
137 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
139 xbt_mutex_acquire(queue->mutex);
140 while (xbt_dynar_length(queue->data) == 0) {
141 DEBUG1("Queue %p empty. Waiting", queue);
142 xbt_cond_wait(queue->not_empty, queue->mutex);
144 xbt_dynar_shift(queue->data, dst);
145 xbt_cond_signal(queue->not_full);
146 xbt_mutex_release(queue->mutex);
152 /** @brief Push something to the message exchange queue, with a timeout.
154 * @see #xbt_queue_push
156 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
158 double begin = xbt_time();
161 xbt_mutex_acquire(queue->mutex);
164 if (queue->capacity != 0 &&
165 queue->capacity == xbt_dynar_length(queue->data)) {
167 xbt_mutex_release(queue->mutex);
168 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
169 queue, queue->capacity);
172 while (queue->capacity != 0 &&
173 queue->capacity == xbt_dynar_length(queue->data) &&
174 (delay < 0 || (xbt_time() - begin) <= delay)) {
176 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
178 xbt_cond_timedwait(queue->not_full, queue->mutex,
179 delay < 0 ? -1 : delay - (xbt_time() - begin));
182 xbt_mutex_release(queue->mutex);
188 xbt_dynar_push(queue->data, src);
189 xbt_cond_signal(queue->not_empty);
190 xbt_mutex_release(queue->mutex);
194 /** @brief Pop something from the message exchange queue, with a timeout.
196 * @see #xbt_queue_pop
199 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
201 double begin = xbt_time();
204 xbt_mutex_acquire(queue->mutex);
207 if (xbt_dynar_length(queue->data) == 0) {
208 xbt_mutex_release(queue->mutex);
209 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
212 while ((xbt_dynar_length(queue->data) == 0) &&
213 (delay < 0 || (xbt_time() - begin) <= delay)) {
214 DEBUG1("Queue %p empty. Waiting", queue);
216 xbt_cond_timedwait(queue->not_empty, queue->mutex,
217 delay < 0 ? -1 : delay - (xbt_time() - begin));
220 xbt_mutex_release(queue->mutex);
226 xbt_dynar_pop(queue->data, dst);
227 xbt_cond_signal(queue->not_full);
228 xbt_mutex_release(queue->mutex);
231 /** @brief Unshift something to the message exchange queue, with a timeout.
233 * @see #xbt_queue_unshift
235 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
237 double begin = xbt_time();
240 xbt_mutex_acquire(queue->mutex);
243 if (queue->capacity != 0 &&
244 queue->capacity == xbt_dynar_length(queue->data)) {
246 xbt_mutex_release(queue->mutex);
247 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
248 queue, queue->capacity);
251 while (queue->capacity != 0 &&
252 queue->capacity == xbt_dynar_length(queue->data) &&
253 (delay < 0 || (xbt_time() - begin) <= delay)) {
255 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
257 xbt_cond_timedwait(queue->not_full, queue->mutex,
258 delay < 0 ? -1 : delay - (xbt_time() - begin));
261 xbt_mutex_release(queue->mutex);
267 xbt_dynar_unshift(queue->data, src);
268 xbt_cond_signal(queue->not_empty);
269 xbt_mutex_release(queue->mutex);
273 /** @brief Shift something from the message exchange queue, with a timeout.
275 * @see #xbt_queue_shift
278 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
280 double begin = xbt_time();
283 xbt_mutex_acquire(queue->mutex);
286 if (xbt_dynar_length(queue->data) == 0) {
287 xbt_mutex_release(queue->mutex);
288 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
291 while ((xbt_dynar_length(queue->data) == 0) &&
292 (delay < 0 || (xbt_time() - begin) <= delay)) {
293 DEBUG1("Queue %p empty. Waiting", queue);
295 xbt_cond_timedwait(queue->not_empty, queue->mutex,
296 delay < 0 ? -1 : delay - (xbt_time() - begin));
299 xbt_mutex_release(queue->mutex);
305 if (xbt_dynar_length(queue->data) == 0) {
306 xbt_mutex_release(queue->mutex);
307 THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
311 xbt_dynar_shift(queue->data, dst);
312 xbt_cond_signal(queue->not_full);
313 xbt_mutex_release(queue->mutex);