Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make bprintf abort on error, and define bvprintf accordingly.
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h"          /* this module */
17 #include "gras/virtu.h"
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
19                                 "Message exchanging queue");
20
21 typedef struct s_xbt_queue_ {
22   int capacity;
23   xbt_dynar_t data;
24   xbt_mutex_t mutex;
25   xbt_cond_t not_full, not_empty;
26 } s_xbt_queue_t;
27
28 /** @brief Create a new message exchange queue.
29  *
30  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32  */
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 {
35   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
36   xbt_assert0(capacity >= 0, "Capacity cannot be negative");
37
38   res->capacity = capacity;
39   res->data = xbt_dynar_new(elm_size, NULL);
40   res->mutex = xbt_mutex_init();
41   res->not_full = xbt_cond_init();
42   res->not_empty = xbt_cond_init();
43   return res;
44 }
45
46 /** @brief Destroy a message exchange queue.
47  *
48  * Any remaining content is leaked.
49  */
50 void xbt_queue_free(xbt_queue_t * queue)
51 {
52
53   xbt_dynar_free(&((*queue)->data));
54   xbt_mutex_destroy((*queue)->mutex);
55   xbt_cond_destroy((*queue)->not_full);
56   xbt_cond_destroy((*queue)->not_empty);
57   free((*queue));
58   *queue = NULL;
59 }
60
61 /** @brief Get the queue size */
62 unsigned long xbt_queue_length(const xbt_queue_t queue)
63 {
64   unsigned long res;
65   xbt_mutex_acquire(queue->mutex);
66   res = xbt_dynar_length(queue->data);
67   xbt_mutex_release(queue->mutex);
68   return res;
69 }
70
71 /** @brief Push something to the message exchange queue.
72  *
73  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
74  *
75  * @see #xbt_dynar_push
76  */
77 void xbt_queue_push(xbt_queue_t queue, const void *src)
78 {
79   xbt_mutex_acquire(queue->mutex);
80   while (queue->capacity != 0
81          && queue->capacity == xbt_dynar_length(queue->data)) {
82     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,
83            queue->capacity);
84     xbt_cond_wait(queue->not_full, queue->mutex);
85   }
86   xbt_dynar_push(queue->data, src);
87   xbt_cond_signal(queue->not_empty);
88   xbt_mutex_release(queue->mutex);
89 }
90
91
92 /** @brief Pop something from the message exchange queue.
93  *
94  * This is blocking if the queue is empty.
95  *
96  * @see #xbt_dynar_pop
97  *
98  */
99 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
100 {
101   xbt_mutex_acquire(queue->mutex);
102   while (xbt_dynar_length(queue->data) == 0) {
103     DEBUG1("Queue %p empty. Waiting", queue);
104     xbt_cond_wait(queue->not_empty, queue->mutex);
105   }
106   xbt_dynar_pop(queue->data, dst);
107   xbt_cond_signal(queue->not_full);
108   xbt_mutex_release(queue->mutex);
109 }
110
111 /** @brief Unshift something to the message exchange queue.
112  *
113  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
114  *
115  * @see #xbt_dynar_unshift
116  */
117 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
118 {
119   xbt_mutex_acquire(queue->mutex);
120   while (queue->capacity != 0
121          && queue->capacity == xbt_dynar_length(queue->data)) {
122     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,
123            queue->capacity);
124     xbt_cond_wait(queue->not_full, queue->mutex);
125   }
126   xbt_dynar_unshift(queue->data, src);
127   xbt_cond_signal(queue->not_empty);
128   xbt_mutex_release(queue->mutex);
129 }
130
131
132 /** @brief Shift something from the message exchange queue.
133  *
134  * This is blocking if the queue is empty.
135  *
136  * @see #xbt_dynar_shift
137  *
138  */
139 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
140 {
141   xbt_mutex_acquire(queue->mutex);
142   while (xbt_dynar_length(queue->data) == 0) {
143     DEBUG1("Queue %p empty. Waiting", queue);
144     xbt_cond_wait(queue->not_empty, queue->mutex);
145   }
146   xbt_dynar_shift(queue->data, dst);
147   xbt_cond_signal(queue->not_full);
148   xbt_mutex_release(queue->mutex);
149 }
150
151
152
153
154 /** @brief Push something to the message exchange queue, with a timeout.
155  *
156  * @see #xbt_queue_push
157  */
158 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
159 {
160   double begin = xbt_time();
161   xbt_ex_t e;
162
163   xbt_mutex_acquire(queue->mutex);
164
165   if (delay == 0) {
166     if (queue->capacity != 0 &&
167         queue->capacity == xbt_dynar_length(queue->data)) {
168
169       xbt_mutex_release(queue->mutex);
170       THROW2(timeout_error, 0,
171              "Capacity of %p exceded (=%d), and delay = 0", queue,
172              queue->capacity);
173     }
174   } else {
175     while (queue->capacity != 0 &&
176            queue->capacity == xbt_dynar_length(queue->data) &&
177            (delay < 0 || (xbt_time() - begin) <= delay)) {
178
179       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,
180              queue->capacity);
181       TRY {
182         xbt_cond_timedwait(queue->not_full, queue->mutex,
183                            delay < 0 ? -1 : delay - (xbt_time() - begin));
184       }
185       CATCH(e) {
186         xbt_mutex_release(queue->mutex);
187         RETHROW;
188       }
189     }
190   }
191
192   xbt_dynar_push(queue->data, src);
193   xbt_cond_signal(queue->not_empty);
194   xbt_mutex_release(queue->mutex);
195 }
196
197
198 /** @brief Pop something from the message exchange queue, with a timeout.
199  *
200  * @see #xbt_queue_pop
201  *
202  */
203 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
204 {
205   double begin = xbt_time();
206   xbt_ex_t e;
207
208   xbt_mutex_acquire(queue->mutex);
209
210   if (delay == 0) {
211     if (xbt_dynar_length(queue->data) == 0) {
212       xbt_mutex_release(queue->mutex);
213       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
214     }
215   } else {
216     while ((xbt_dynar_length(queue->data) == 0) &&
217            (delay < 0 || (xbt_time() - begin) <= delay)) {
218       DEBUG1("Queue %p empty. Waiting", queue);
219       TRY {
220         xbt_cond_timedwait(queue->not_empty, queue->mutex,
221                            delay < 0 ? -1 : delay - (xbt_time() - begin));
222       }
223       CATCH(e) {
224         xbt_mutex_release(queue->mutex);
225         RETHROW;
226       }
227     }
228   }
229
230   xbt_dynar_pop(queue->data, dst);
231   xbt_cond_signal(queue->not_full);
232   xbt_mutex_release(queue->mutex);
233 }
234
235 /** @brief Unshift something to the message exchange queue, with a timeout.
236  *
237  * @see #xbt_queue_unshift
238  */
239 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
240                              double delay)
241 {
242   double begin = xbt_time();
243   xbt_ex_t e;
244
245   xbt_mutex_acquire(queue->mutex);
246
247   if (delay == 0) {
248     if (queue->capacity != 0 &&
249         queue->capacity == xbt_dynar_length(queue->data)) {
250
251       xbt_mutex_release(queue->mutex);
252       THROW2(timeout_error, 0,
253              "Capacity of %p exceded (=%d), and delay = 0", queue,
254              queue->capacity);
255     }
256   } else {
257     while (queue->capacity != 0 &&
258            queue->capacity == xbt_dynar_length(queue->data) &&
259            (delay < 0 || (xbt_time() - begin) <= delay)) {
260
261       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue,
262              queue->capacity);
263       TRY {
264         xbt_cond_timedwait(queue->not_full, queue->mutex,
265                            delay < 0 ? -1 : delay - (xbt_time() - begin));
266       }
267       CATCH(e) {
268         xbt_mutex_release(queue->mutex);
269         RETHROW;
270       }
271     }
272   }
273
274   xbt_dynar_unshift(queue->data, src);
275   xbt_cond_signal(queue->not_empty);
276   xbt_mutex_release(queue->mutex);
277 }
278
279
280 /** @brief Shift something from the message exchange queue, with a timeout.
281  *
282  * @see #xbt_queue_shift
283  *
284  */
285 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
286                            double delay)
287 {
288   double begin = xbt_time();
289   xbt_ex_t e;
290
291   xbt_mutex_acquire(queue->mutex);
292
293   if (delay == 0) {
294     if (xbt_dynar_length(queue->data) == 0) {
295       xbt_mutex_release(queue->mutex);
296       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
297     }
298   } else {
299     while ((xbt_dynar_length(queue->data) == 0) &&
300            (delay < 0 || (xbt_time() - begin) <= delay)) {
301       DEBUG1("Queue %p empty. Waiting", queue);
302       TRY {
303         xbt_cond_timedwait(queue->not_empty, queue->mutex,
304                            delay < 0 ? -1 : delay - (xbt_time() - begin));
305       }
306       CATCH(e) {
307         xbt_mutex_release(queue->mutex);
308         RETHROW;
309       }
310     }
311   }
312
313   if (xbt_dynar_length(queue->data) == 0) {
314     xbt_mutex_release(queue->mutex);
315     THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
316            delay);
317   }
318
319   xbt_dynar_shift(queue->data, dst);
320   xbt_cond_signal(queue->not_full);
321   xbt_mutex_release(queue->mutex);
322 }