Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
f7c70edff4abc80b5161292593810e5d92b332fe
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 #include "xbt/dynar.h"
15
16 #include "xbt/synchro.h"
17 #include "xbt/queue.h"          /* this module */
18 #include "gras/virtu.h"
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
20
21 typedef struct s_xbt_queue_ {
22   int capacity;
23   xbt_dynar_t data;
24   xbt_mutex_t mutex;
25   xbt_cond_t not_full, not_empty;
26 } s_xbt_queue_t;
27
28 /** @brief Create a new message exchange queue.
29  *
30  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32  */
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 {
35   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
36   xbt_assert0(capacity >= 0, "Capacity cannot be negative");
37
38   res->capacity = capacity;
39   res->data = xbt_dynar_new(elm_size, NULL);
40   res->mutex = xbt_mutex_init();
41   res->not_full = xbt_cond_init();
42   res->not_empty = xbt_cond_init();
43   return res;
44 }
45
46 /** @brief Destroy a message exchange queue.
47  *
48  * Any remaining content is leaked.
49  */
50 void xbt_queue_free(xbt_queue_t * queue)
51 {
52
53   xbt_dynar_free(&((*queue)->data));
54   xbt_mutex_destroy((*queue)->mutex);
55   xbt_cond_destroy((*queue)->not_full);
56   xbt_cond_destroy((*queue)->not_empty);
57   free((*queue));
58   *queue = NULL;
59 }
60
61 /** @brief Get the queue size */
62 unsigned long xbt_queue_length(const xbt_queue_t queue)
63 {
64   unsigned long res;
65   xbt_mutex_acquire(queue->mutex);
66   res = xbt_dynar_length(queue->data);
67   xbt_mutex_release(queue->mutex);
68   return res;
69 }
70
71 /** @brief Push something to the message exchange queue.
72  *
73  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
74  *
75  * @see #xbt_dynar_push
76  */
77 void xbt_queue_push(xbt_queue_t queue, const void *src)
78 {
79   xbt_mutex_acquire(queue->mutex);
80   while (queue->capacity != 0
81          && queue->capacity == xbt_dynar_length(queue->data)) {
82     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
83     xbt_cond_wait(queue->not_full, queue->mutex);
84   }
85   xbt_dynar_push(queue->data, src);
86   xbt_cond_signal(queue->not_empty);
87   xbt_mutex_release(queue->mutex);
88 }
89
90
91 /** @brief Pop something from the message exchange queue.
92  *
93  * This is blocking if the queue is empty.
94  *
95  * @see #xbt_dynar_pop
96  *
97  */
98 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
99 {
100   xbt_mutex_acquire(queue->mutex);
101   while (xbt_dynar_length(queue->data) == 0) {
102     DEBUG1("Queue %p empty. Waiting", queue);
103     xbt_cond_wait(queue->not_empty, queue->mutex);
104   }
105   xbt_dynar_pop(queue->data, dst);
106   xbt_cond_signal(queue->not_full);
107   xbt_mutex_release(queue->mutex);
108 }
109
110 /** @brief Unshift something to the message exchange queue.
111  *
112  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
113  *
114  * @see #xbt_dynar_unshift
115  */
116 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
117 {
118   xbt_mutex_acquire(queue->mutex);
119   while (queue->capacity != 0
120          && queue->capacity == xbt_dynar_length(queue->data)) {
121     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
122     xbt_cond_wait(queue->not_full, queue->mutex);
123   }
124   xbt_dynar_unshift(queue->data, src);
125   xbt_cond_signal(queue->not_empty);
126   xbt_mutex_release(queue->mutex);
127 }
128
129
130 /** @brief Shift something from the message exchange queue.
131  *
132  * This is blocking if the queue is empty.
133  *
134  * @see #xbt_dynar_shift
135  *
136  */
137 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
138 {
139   xbt_mutex_acquire(queue->mutex);
140   while (xbt_dynar_length(queue->data) == 0) {
141     DEBUG1("Queue %p empty. Waiting", queue);
142     xbt_cond_wait(queue->not_empty, queue->mutex);
143   }
144   xbt_dynar_shift(queue->data, dst);
145   xbt_cond_signal(queue->not_full);
146   xbt_mutex_release(queue->mutex);
147 }
148
149
150
151
152 /** @brief Push something to the message exchange queue, with a timeout.
153  *
154  * @see #xbt_queue_push
155  */
156 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
157 {
158   double begin = xbt_time();
159   xbt_ex_t e;
160
161   xbt_mutex_acquire(queue->mutex);
162
163   if (delay == 0) {
164     if (queue->capacity != 0 &&
165         queue->capacity == xbt_dynar_length(queue->data)) {
166
167       xbt_mutex_release(queue->mutex);
168       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
169              queue, queue->capacity);
170     }
171   } else {
172     while (queue->capacity != 0 &&
173            queue->capacity == xbt_dynar_length(queue->data) &&
174            (delay < 0 || (xbt_time() - begin) <= delay)) {
175
176       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
177       TRY {
178         xbt_cond_timedwait(queue->not_full, queue->mutex,
179                            delay < 0 ? -1 : delay - (xbt_time() - begin));
180       }
181       CATCH(e) {
182         xbt_mutex_release(queue->mutex);
183         RETHROW;
184       }
185     }
186   }
187
188   xbt_dynar_push(queue->data, src);
189   xbt_cond_signal(queue->not_empty);
190   xbt_mutex_release(queue->mutex);
191 }
192
193
194 /** @brief Pop something from the message exchange queue, with a timeout.
195  *
196  * @see #xbt_queue_pop
197  *
198  */
199 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
200 {
201   double begin = xbt_time();
202   xbt_ex_t e;
203
204   xbt_mutex_acquire(queue->mutex);
205
206   if (delay == 0) {
207     if (xbt_dynar_length(queue->data) == 0) {
208       xbt_mutex_release(queue->mutex);
209       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
210     }
211   } else {
212     while ((xbt_dynar_length(queue->data) == 0) &&
213            (delay < 0 || (xbt_time() - begin) <= delay)) {
214       DEBUG1("Queue %p empty. Waiting", queue);
215       TRY {
216         xbt_cond_timedwait(queue->not_empty, queue->mutex,
217                            delay < 0 ? -1 : delay - (xbt_time() - begin));
218       }
219       CATCH(e) {
220         xbt_mutex_release(queue->mutex);
221         RETHROW;
222       }
223     }
224   }
225
226   xbt_dynar_pop(queue->data, dst);
227   xbt_cond_signal(queue->not_full);
228   xbt_mutex_release(queue->mutex);
229 }
230
231 /** @brief Unshift something to the message exchange queue, with a timeout.
232  *
233  * @see #xbt_queue_unshift
234  */
235 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
236 {
237   double begin = xbt_time();
238   xbt_ex_t e;
239
240   xbt_mutex_acquire(queue->mutex);
241
242   if (delay == 0) {
243     if (queue->capacity != 0 &&
244         queue->capacity == xbt_dynar_length(queue->data)) {
245
246       xbt_mutex_release(queue->mutex);
247       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
248              queue, queue->capacity);
249     }
250   } else {
251     while (queue->capacity != 0 &&
252            queue->capacity == xbt_dynar_length(queue->data) &&
253            (delay < 0 || (xbt_time() - begin) <= delay)) {
254
255       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
256       TRY {
257         xbt_cond_timedwait(queue->not_full, queue->mutex,
258                            delay < 0 ? -1 : delay - (xbt_time() - begin));
259       }
260       CATCH(e) {
261         xbt_mutex_release(queue->mutex);
262         RETHROW;
263       }
264     }
265   }
266
267   xbt_dynar_unshift(queue->data, src);
268   xbt_cond_signal(queue->not_empty);
269   xbt_mutex_release(queue->mutex);
270 }
271
272
273 /** @brief Shift something from the message exchange queue, with a timeout.
274  *
275  * @see #xbt_queue_shift
276  *
277  */
278 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
279 {
280   double begin = xbt_time();
281   xbt_ex_t e;
282
283   xbt_mutex_acquire(queue->mutex);
284
285   if (delay == 0) {
286     if (xbt_dynar_length(queue->data) == 0) {
287       xbt_mutex_release(queue->mutex);
288       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
289     }
290   } else {
291     while ((xbt_dynar_length(queue->data) == 0) &&
292            (delay < 0 || (xbt_time() - begin) <= delay)) {
293       DEBUG1("Queue %p empty. Waiting", queue);
294       TRY {
295         xbt_cond_timedwait(queue->not_empty, queue->mutex,
296                            delay < 0 ? -1 : delay - (xbt_time() - begin));
297       }
298       CATCH(e) {
299         xbt_mutex_release(queue->mutex);
300         RETHROW;
301       }
302     }
303   }
304
305   if (xbt_dynar_length(queue->data) == 0) {
306     xbt_mutex_release(queue->mutex);
307     THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
308            delay);
309   }
310
311   xbt_dynar_shift(queue->data, dst);
312   xbt_cond_signal(queue->not_full);
313   xbt_mutex_release(queue->mutex);
314 }