1 /* A (synchronized) message queue. */
2 /* Popping an empty queue is blocking, as well as pushing a full one */
4 /* Copyright (c) 2007-2014. The SimGrid Team.
5 * All rights reserved. */
7 /* This program is free software; you can redistribute it and/or modify it
8 * under the terms of the license (GNU LGPL) which comes with this package. */
11 #include "xbt/sysdep.h"
13 #include "xbt/dynar.h"
14 #include "xbt/synchro_core.h"
16 #include "xbt/queue.h" /* this module */
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
18 "Message exchanging queue");
20 typedef struct s_xbt_queue_ {
24 xbt_cond_t not_full, not_empty;
27 /** @brief Create a new message exchange queue.
29 * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
30 * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
38 res->capacity = capacity;
39 res->data = xbt_dynar_new(elm_size, NULL);
40 res->mutex = xbt_mutex_init();
41 res->not_full = xbt_cond_init();
42 res->not_empty = xbt_cond_init();
46 /** @brief Destroy a message exchange queue.
48 * Any remaining content is leaked.
50 void xbt_queue_free(xbt_queue_t * queue)
53 xbt_dynar_free(&((*queue)->data));
54 xbt_mutex_destroy((*queue)->mutex);
55 xbt_cond_destroy((*queue)->not_full);
56 xbt_cond_destroy((*queue)->not_empty);
61 /** @brief Get the queue size */
62 unsigned long xbt_queue_length(const xbt_queue_t queue)
65 xbt_mutex_acquire(queue->mutex);
66 res = xbt_dynar_length(queue->data);
67 xbt_mutex_release(queue->mutex);
71 /** @brief Push something to the message exchange queue.
73 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
75 * @see #xbt_dynar_push
77 void xbt_queue_push(xbt_queue_t queue, const void *src)
79 xbt_mutex_acquire(queue->mutex);
80 while (queue->capacity != 0
81 && queue->capacity == xbt_dynar_length(queue->data)) {
82 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
84 xbt_cond_wait(queue->not_full, queue->mutex);
86 xbt_dynar_push(queue->data, src);
87 xbt_cond_signal(queue->not_empty);
88 xbt_mutex_release(queue->mutex);
92 /** @brief Pop something from the message exchange queue.
94 * This is blocking if the queue is empty.
99 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
101 xbt_mutex_acquire(queue->mutex);
102 while (xbt_dynar_is_empty(queue->data)) {
103 XBT_DEBUG("Queue %p empty. Waiting", queue);
104 xbt_cond_wait(queue->not_empty, queue->mutex);
106 xbt_dynar_pop(queue->data, dst);
107 xbt_cond_signal(queue->not_full);
108 xbt_mutex_release(queue->mutex);
111 /** @brief Unshift something to the message exchange queue.
113 * This is blocking if the declared capacity is non-nul, and if this amount is reached.
115 * @see #xbt_dynar_unshift
117 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
119 xbt_mutex_acquire(queue->mutex);
120 while (queue->capacity != 0
121 && queue->capacity == xbt_dynar_length(queue->data)) {
122 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
124 xbt_cond_wait(queue->not_full, queue->mutex);
126 xbt_dynar_unshift(queue->data, src);
127 xbt_cond_signal(queue->not_empty);
128 xbt_mutex_release(queue->mutex);
132 /** @brief Shift something from the message exchange queue.
134 * This is blocking if the queue is empty.
136 * @see #xbt_dynar_shift
139 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
141 xbt_mutex_acquire(queue->mutex);
142 while (xbt_dynar_is_empty(queue->data)) {
143 XBT_DEBUG("Queue %p empty. Waiting", queue);
144 xbt_cond_wait(queue->not_empty, queue->mutex);
146 xbt_dynar_shift(queue->data, dst);
147 xbt_cond_signal(queue->not_full);
148 xbt_mutex_release(queue->mutex);
154 /** @brief Push something to the message exchange queue, with a timeout.
156 * @see #xbt_queue_push
158 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
160 double begin = xbt_time();
162 xbt_mutex_acquire(queue->mutex);
165 if (queue->capacity != 0 &&
166 queue->capacity == xbt_dynar_length(queue->data)) {
168 xbt_mutex_release(queue->mutex);
169 THROWF(timeout_error, 0,
170 "Capacity of %p exceeded (=%d), and delay = 0", queue,
174 while (queue->capacity != 0 &&
175 queue->capacity == xbt_dynar_length(queue->data) &&
176 (delay < 0 || (xbt_time() - begin) <= delay)) {
178 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
181 xbt_cond_timedwait(queue->not_full, queue->mutex,
182 delay < 0 ? -1 : delay - (xbt_time() - begin));
185 xbt_mutex_release(queue->mutex);
191 xbt_dynar_push(queue->data, src);
192 xbt_cond_signal(queue->not_empty);
193 xbt_mutex_release(queue->mutex);
197 /** @brief Pop something from the message exchange queue, with a timeout.
199 * @see #xbt_queue_pop
202 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
204 double begin = xbt_time();
206 xbt_mutex_acquire(queue->mutex);
209 if (xbt_dynar_is_empty(queue->data)) {
210 xbt_mutex_release(queue->mutex);
211 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
214 while ((xbt_dynar_is_empty(queue->data)) &&
215 (delay < 0 || (xbt_time() - begin) <= delay)) {
216 XBT_DEBUG("Queue %p empty. Waiting", queue);
218 xbt_cond_timedwait(queue->not_empty, queue->mutex,
219 delay < 0 ? -1 : delay - (xbt_time() - begin));
222 xbt_mutex_release(queue->mutex);
228 xbt_dynar_pop(queue->data, dst);
229 xbt_cond_signal(queue->not_full);
230 xbt_mutex_release(queue->mutex);
233 /** @brief Unshift something to the message exchange queue, with a timeout.
235 * @see #xbt_queue_unshift
237 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
240 double begin = xbt_time();
242 xbt_mutex_acquire(queue->mutex);
245 if (queue->capacity != 0 &&
246 queue->capacity == xbt_dynar_length(queue->data)) {
248 xbt_mutex_release(queue->mutex);
249 THROWF(timeout_error, 0,
250 "Capacity of %p exceeded (=%d), and delay = 0", queue,
254 while (queue->capacity != 0 &&
255 queue->capacity == xbt_dynar_length(queue->data) &&
256 (delay < 0 || (xbt_time() - begin) <= delay)) {
258 XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
261 xbt_cond_timedwait(queue->not_full, queue->mutex,
262 delay < 0 ? -1 : delay - (xbt_time() - begin));
265 xbt_mutex_release(queue->mutex);
271 xbt_dynar_unshift(queue->data, src);
272 xbt_cond_signal(queue->not_empty);
273 xbt_mutex_release(queue->mutex);
277 /** @brief Shift something from the message exchange queue, with a timeout.
279 * @see #xbt_queue_shift
282 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
285 double begin = xbt_time();
287 xbt_mutex_acquire(queue->mutex);
290 if (xbt_dynar_is_empty(queue->data)) {
291 xbt_mutex_release(queue->mutex);
292 THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
295 while ((xbt_dynar_is_empty(queue->data)) &&
296 (delay < 0 || (xbt_time() - begin) <= delay)) {
297 XBT_DEBUG("Queue %p empty. Waiting", queue);
299 xbt_cond_timedwait(queue->not_empty, queue->mutex,
300 delay < 0 ? -1 : delay - (xbt_time() - begin));
303 xbt_mutex_release(queue->mutex);
309 if (xbt_dynar_is_empty(queue->data)) {
310 xbt_mutex_release(queue->mutex);
311 THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
315 xbt_dynar_shift(queue->data, dst);
316 xbt_cond_signal(queue->not_full);
317 xbt_mutex_release(queue->mutex);