1 /* A (synchronized) message queue. */
2 /* Popping an empty queue is blocking, as well as pushing a full one */
4 /* Copyright (c) 2007 Martin Quinson. All rights reserved. */
6 /* This program is free software; you can redistribute it and/or modify it
7 * under the terms of the license (GNU LGPL) which comes with this package. */
10 #include "xbt/sysdep.h"
12 #include "xbt/dynar.h"
14 #include "xbt/synchro.h"
15 #include "xbt/queue.h" /* this module */
16 #include "gras/virtu.h"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
19 typedef struct s_xbt_queue_ {
23 xbt_cond_t not_full, not_empty;
26 /** @brief Create a new message exchange queue.
28 * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
29 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
31 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
33 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
34 xbt_assert0(capacity >= 0, "Capacity cannot be negative");
36 res->capacity = capacity;
37 res->data = xbt_dynar_new(elm_size, NULL);
38 res->mutex = xbt_mutex_init();
39 res->not_full = xbt_cond_init();
40 res->not_empty = xbt_cond_init();
44 /** @brief Destroy a message exchange queue.
46 * Any remaining content is leaked.
48 void xbt_queue_free(xbt_queue_t * queue)
51 xbt_dynar_free(&((*queue)->data));
52 xbt_mutex_destroy((*queue)->mutex);
53 xbt_cond_destroy((*queue)->not_full);
54 xbt_cond_destroy((*queue)->not_empty);
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(const xbt_queue_t queue)
63 xbt_mutex_acquire(queue->mutex);
64 res = xbt_dynar_length(queue->data);
65 xbt_mutex_release(queue->mutex);
69 /** @brief Push something to the message exchange queue.
71 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
73 * @see #xbt_dynar_push
75 void xbt_queue_push(xbt_queue_t queue, const void *src)
77 xbt_mutex_acquire(queue->mutex);
78 while (queue->capacity != 0
79 && queue->capacity == xbt_dynar_length(queue->data)) {
80 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
81 xbt_cond_wait(queue->not_full, queue->mutex);
83 xbt_dynar_push(queue->data, src);
84 xbt_cond_signal(queue->not_empty);
85 xbt_mutex_release(queue->mutex);
89 /** @brief Pop something from the message exchange queue.
91 * This is blocking if the queue is empty.
96 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
98 xbt_mutex_acquire(queue->mutex);
99 while (xbt_dynar_length(queue->data) == 0) {
100 DEBUG1("Queue %p empty. Waiting", queue);
101 xbt_cond_wait(queue->not_empty, queue->mutex);
103 xbt_dynar_pop(queue->data, dst);
104 xbt_cond_signal(queue->not_full);
105 xbt_mutex_release(queue->mutex);
108 /** @brief Unshift something to the message exchange queue.
110 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
112 * @see #xbt_dynar_unshift
114 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
116 xbt_mutex_acquire(queue->mutex);
117 while (queue->capacity != 0
118 && queue->capacity == xbt_dynar_length(queue->data)) {
119 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
120 xbt_cond_wait(queue->not_full, queue->mutex);
122 xbt_dynar_unshift(queue->data, src);
123 xbt_cond_signal(queue->not_empty);
124 xbt_mutex_release(queue->mutex);
128 /** @brief Shift something from the message exchange queue.
130 * This is blocking if the queue is empty.
132 * @see #xbt_dynar_shift
135 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
137 xbt_mutex_acquire(queue->mutex);
138 while (xbt_dynar_length(queue->data) == 0) {
139 DEBUG1("Queue %p empty. Waiting", queue);
140 xbt_cond_wait(queue->not_empty, queue->mutex);
142 xbt_dynar_shift(queue->data, dst);
143 xbt_cond_signal(queue->not_full);
144 xbt_mutex_release(queue->mutex);
150 /** @brief Push something to the message exchange queue, with a timeout.
152 * @see #xbt_queue_push
154 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
156 double begin = xbt_time();
159 xbt_mutex_acquire(queue->mutex);
162 if (queue->capacity != 0 &&
163 queue->capacity == xbt_dynar_length(queue->data)) {
165 xbt_mutex_release(queue->mutex);
166 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
167 queue, queue->capacity);
170 while (queue->capacity != 0 &&
171 queue->capacity == xbt_dynar_length(queue->data) &&
172 (delay < 0 || (xbt_time() - begin) <= delay)) {
174 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
176 xbt_cond_timedwait(queue->not_full, queue->mutex,
177 delay < 0 ? -1 : delay - (xbt_time() - begin));
180 xbt_mutex_release(queue->mutex);
186 xbt_dynar_push(queue->data, src);
187 xbt_cond_signal(queue->not_empty);
188 xbt_mutex_release(queue->mutex);
192 /** @brief Pop something from the message exchange queue, with a timeout.
194 * @see #xbt_queue_pop
197 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
199 double begin = xbt_time();
202 xbt_mutex_acquire(queue->mutex);
205 if (xbt_dynar_length(queue->data) == 0) {
206 xbt_mutex_release(queue->mutex);
207 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
210 while ((xbt_dynar_length(queue->data) == 0) &&
211 (delay < 0 || (xbt_time() - begin) <= delay)) {
212 DEBUG1("Queue %p empty. Waiting", queue);
214 xbt_cond_timedwait(queue->not_empty, queue->mutex,
215 delay < 0 ? -1 : delay - (xbt_time() - begin));
218 xbt_mutex_release(queue->mutex);
224 xbt_dynar_pop(queue->data, dst);
225 xbt_cond_signal(queue->not_full);
226 xbt_mutex_release(queue->mutex);
229 /** @brief Unshift something to the message exchange queue, with a timeout.
231 * @see #xbt_queue_unshift
233 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
235 double begin = xbt_time();
238 xbt_mutex_acquire(queue->mutex);
241 if (queue->capacity != 0 &&
242 queue->capacity == xbt_dynar_length(queue->data)) {
244 xbt_mutex_release(queue->mutex);
245 THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
246 queue, queue->capacity);
249 while (queue->capacity != 0 &&
250 queue->capacity == xbt_dynar_length(queue->data) &&
251 (delay < 0 || (xbt_time() - begin) <= delay)) {
253 DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
255 xbt_cond_timedwait(queue->not_full, queue->mutex,
256 delay < 0 ? -1 : delay - (xbt_time() - begin));
259 xbt_mutex_release(queue->mutex);
265 xbt_dynar_unshift(queue->data, src);
266 xbt_cond_signal(queue->not_empty);
267 xbt_mutex_release(queue->mutex);
271 /** @brief Shift something from the message exchange queue, with a timeout.
273 * @see #xbt_queue_shift
276 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
278 double begin = xbt_time();
281 xbt_mutex_acquire(queue->mutex);
284 if (xbt_dynar_length(queue->data) == 0) {
285 xbt_mutex_release(queue->mutex);
286 THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
289 while ((xbt_dynar_length(queue->data) == 0) &&
290 (delay < 0 || (xbt_time() - begin) <= delay)) {
291 DEBUG1("Queue %p empty. Waiting", queue);
293 xbt_cond_timedwait(queue->not_empty, queue->mutex,
294 delay < 0 ? -1 : delay - (xbt_time() - begin));
297 xbt_mutex_release(queue->mutex);
303 if (xbt_dynar_length(queue->data) == 0) {
304 xbt_mutex_release(queue->mutex);
305 THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
309 xbt_dynar_shift(queue->data, dst);
310 xbt_cond_signal(queue->not_full);
311 xbt_mutex_release(queue->mutex);