Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
b609da3b3bc88b0e2cdeaba6d9a0141c14860624
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
5
6 /* This program is free software; you can redistribute it and/or modify it
7  * under the terms of the license (GNU LGPL) which comes with this package. */
8
9 #include "xbt/misc.h"
10 #include "xbt/sysdep.h"
11 #include "xbt/log.h"
12 #include "xbt/dynar.h"
13
14 #include "xbt/synchro.h"
15 #include "xbt/queue.h"          /* this module */
16 #include "gras/virtu.h"
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
18
19 typedef struct s_xbt_queue_ {
20   int capacity;
21   xbt_dynar_t data;
22   xbt_mutex_t mutex;
23   xbt_cond_t not_full, not_empty;
24 } s_xbt_queue_t;
25
26 /** @brief Create a new message exchange queue.
27  *
28  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
29  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
30  */
31 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
32 {
33   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
34   xbt_assert0(capacity >= 0, "Capacity cannot be negative");
35
36   res->capacity = capacity;
37   res->data = xbt_dynar_new(elm_size, NULL);
38   res->mutex = xbt_mutex_init();
39   res->not_full = xbt_cond_init();
40   res->not_empty = xbt_cond_init();
41   return res;
42 }
43
44 /** @brief Destroy a message exchange queue.
45  *
46  * Any remaining content is leaked.
47  */
48 void xbt_queue_free(xbt_queue_t * queue)
49 {
50
51   xbt_dynar_free(&((*queue)->data));
52   xbt_mutex_destroy((*queue)->mutex);
53   xbt_cond_destroy((*queue)->not_full);
54   xbt_cond_destroy((*queue)->not_empty);
55   free((*queue));
56   *queue = NULL;
57 }
58
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(const xbt_queue_t queue)
61 {
62   unsigned long res;
63   xbt_mutex_acquire(queue->mutex);
64   res = xbt_dynar_length(queue->data);
65   xbt_mutex_release(queue->mutex);
66   return res;
67 }
68
69 /** @brief Push something to the message exchange queue.
70  *
71  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
72  *
73  * @see #xbt_dynar_push
74  */
75 void xbt_queue_push(xbt_queue_t queue, const void *src)
76 {
77   xbt_mutex_acquire(queue->mutex);
78   while (queue->capacity != 0
79          && queue->capacity == xbt_dynar_length(queue->data)) {
80     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
81     xbt_cond_wait(queue->not_full, queue->mutex);
82   }
83   xbt_dynar_push(queue->data, src);
84   xbt_cond_signal(queue->not_empty);
85   xbt_mutex_release(queue->mutex);
86 }
87
88
89 /** @brief Pop something from the message exchange queue.
90  *
91  * This is blocking if the queue is empty.
92  *
93  * @see #xbt_dynar_pop
94  *
95  */
96 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
97 {
98   xbt_mutex_acquire(queue->mutex);
99   while (xbt_dynar_length(queue->data) == 0) {
100     DEBUG1("Queue %p empty. Waiting", queue);
101     xbt_cond_wait(queue->not_empty, queue->mutex);
102   }
103   xbt_dynar_pop(queue->data, dst);
104   xbt_cond_signal(queue->not_full);
105   xbt_mutex_release(queue->mutex);
106 }
107
108 /** @brief Unshift something to the message exchange queue.
109  *
110  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
111  *
112  * @see #xbt_dynar_unshift
113  */
114 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
115 {
116   xbt_mutex_acquire(queue->mutex);
117   while (queue->capacity != 0
118          && queue->capacity == xbt_dynar_length(queue->data)) {
119     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
120     xbt_cond_wait(queue->not_full, queue->mutex);
121   }
122   xbt_dynar_unshift(queue->data, src);
123   xbt_cond_signal(queue->not_empty);
124   xbt_mutex_release(queue->mutex);
125 }
126
127
128 /** @brief Shift something from the message exchange queue.
129  *
130  * This is blocking if the queue is empty.
131  *
132  * @see #xbt_dynar_shift
133  *
134  */
135 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
136 {
137   xbt_mutex_acquire(queue->mutex);
138   while (xbt_dynar_length(queue->data) == 0) {
139     DEBUG1("Queue %p empty. Waiting", queue);
140     xbt_cond_wait(queue->not_empty, queue->mutex);
141   }
142   xbt_dynar_shift(queue->data, dst);
143   xbt_cond_signal(queue->not_full);
144   xbt_mutex_release(queue->mutex);
145 }
146
147
148
149
150 /** @brief Push something to the message exchange queue, with a timeout.
151  *
152  * @see #xbt_queue_push
153  */
154 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
155 {
156   double begin = xbt_time();
157   xbt_ex_t e;
158
159   xbt_mutex_acquire(queue->mutex);
160
161   if (delay == 0) {
162     if (queue->capacity != 0 &&
163         queue->capacity == xbt_dynar_length(queue->data)) {
164
165       xbt_mutex_release(queue->mutex);
166       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
167              queue, queue->capacity);
168     }
169   } else {
170     while (queue->capacity != 0 &&
171            queue->capacity == xbt_dynar_length(queue->data) &&
172            (delay < 0 || (xbt_time() - begin) <= delay)) {
173
174       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
175       TRY {
176         xbt_cond_timedwait(queue->not_full, queue->mutex,
177                            delay < 0 ? -1 : delay - (xbt_time() - begin));
178       }
179       CATCH(e) {
180         xbt_mutex_release(queue->mutex);
181         RETHROW;
182       }
183     }
184   }
185
186   xbt_dynar_push(queue->data, src);
187   xbt_cond_signal(queue->not_empty);
188   xbt_mutex_release(queue->mutex);
189 }
190
191
192 /** @brief Pop something from the message exchange queue, with a timeout.
193  *
194  * @see #xbt_queue_pop
195  *
196  */
197 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
198 {
199   double begin = xbt_time();
200   xbt_ex_t e;
201
202   xbt_mutex_acquire(queue->mutex);
203
204   if (delay == 0) {
205     if (xbt_dynar_length(queue->data) == 0) {
206       xbt_mutex_release(queue->mutex);
207       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
208     }
209   } else {
210     while ((xbt_dynar_length(queue->data) == 0) &&
211            (delay < 0 || (xbt_time() - begin) <= delay)) {
212       DEBUG1("Queue %p empty. Waiting", queue);
213       TRY {
214         xbt_cond_timedwait(queue->not_empty, queue->mutex,
215                            delay < 0 ? -1 : delay - (xbt_time() - begin));
216       }
217       CATCH(e) {
218         xbt_mutex_release(queue->mutex);
219         RETHROW;
220       }
221     }
222   }
223
224   xbt_dynar_pop(queue->data, dst);
225   xbt_cond_signal(queue->not_full);
226   xbt_mutex_release(queue->mutex);
227 }
228
229 /** @brief Unshift something to the message exchange queue, with a timeout.
230  *
231  * @see #xbt_queue_unshift
232  */
233 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
234 {
235   double begin = xbt_time();
236   xbt_ex_t e;
237
238   xbt_mutex_acquire(queue->mutex);
239
240   if (delay == 0) {
241     if (queue->capacity != 0 &&
242         queue->capacity == xbt_dynar_length(queue->data)) {
243
244       xbt_mutex_release(queue->mutex);
245       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
246              queue, queue->capacity);
247     }
248   } else {
249     while (queue->capacity != 0 &&
250            queue->capacity == xbt_dynar_length(queue->data) &&
251            (delay < 0 || (xbt_time() - begin) <= delay)) {
252
253       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
254       TRY {
255         xbt_cond_timedwait(queue->not_full, queue->mutex,
256                            delay < 0 ? -1 : delay - (xbt_time() - begin));
257       }
258       CATCH(e) {
259         xbt_mutex_release(queue->mutex);
260         RETHROW;
261       }
262     }
263   }
264
265   xbt_dynar_unshift(queue->data, src);
266   xbt_cond_signal(queue->not_empty);
267   xbt_mutex_release(queue->mutex);
268 }
269
270
271 /** @brief Shift something from the message exchange queue, with a timeout.
272  *
273  * @see #xbt_queue_shift
274  *
275  */
276 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
277 {
278   double begin = xbt_time();
279   xbt_ex_t e;
280
281   xbt_mutex_acquire(queue->mutex);
282
283   if (delay == 0) {
284     if (xbt_dynar_length(queue->data) == 0) {
285       xbt_mutex_release(queue->mutex);
286       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
287     }
288   } else {
289     while ((xbt_dynar_length(queue->data) == 0) &&
290            (delay < 0 || (xbt_time() - begin) <= delay)) {
291       DEBUG1("Queue %p empty. Waiting", queue);
292       TRY {
293         xbt_cond_timedwait(queue->not_empty, queue->mutex,
294                            delay < 0 ? -1 : delay - (xbt_time() - begin));
295       }
296       CATCH(e) {
297         xbt_mutex_release(queue->mutex);
298         RETHROW;
299       }
300     }
301   }
302
303   if (xbt_dynar_length(queue->data) == 0) {
304     xbt_mutex_release(queue->mutex);
305     THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
306            delay);
307   }
308
309   xbt_dynar_shift(queue->data, dst);
310   xbt_cond_signal(queue->not_full);
311   xbt_mutex_release(queue->mutex);
312 }