Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
model-checker : move functions about snapshot comparison in a separate file mc_compare.c
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h"          /* this module */
17 #include "gras/virtu.h"
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
19                                 "Message exchanging queue");
20
21 typedef struct s_xbt_queue_ {
22   int capacity;
23   xbt_dynar_t data;
24   xbt_mutex_t mutex;
25   xbt_cond_t not_full, not_empty;
26 } s_xbt_queue_t;
27
28 /** @brief Create a new message exchange queue.
29  *
30  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32  */
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 {
35   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
36   if (capacity<0)
37     capacity=0;
38
39   res->capacity = capacity;
40   res->data = xbt_dynar_new(elm_size, NULL);
41   res->mutex = xbt_mutex_init();
42   res->not_full = xbt_cond_init();
43   res->not_empty = xbt_cond_init();
44   return res;
45 }
46
47 /** @brief Destroy a message exchange queue.
48  *
49  * Any remaining content is leaked.
50  */
51 void xbt_queue_free(xbt_queue_t * queue)
52 {
53
54   xbt_dynar_free(&((*queue)->data));
55   xbt_mutex_destroy((*queue)->mutex);
56   xbt_cond_destroy((*queue)->not_full);
57   xbt_cond_destroy((*queue)->not_empty);
58   free(*queue);
59   *queue = NULL;
60 }
61
62 /** @brief Get the queue size */
63 unsigned long xbt_queue_length(const xbt_queue_t queue)
64 {
65   unsigned long res;
66   xbt_mutex_acquire(queue->mutex);
67   res = xbt_dynar_length(queue->data);
68   xbt_mutex_release(queue->mutex);
69   return res;
70 }
71
72 /** @brief Push something to the message exchange queue.
73  *
74  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
75  *
76  * @see #xbt_dynar_push
77  */
78 void xbt_queue_push(xbt_queue_t queue, const void *src)
79 {
80   xbt_mutex_acquire(queue->mutex);
81   while (queue->capacity != 0
82          && queue->capacity == xbt_dynar_length(queue->data)) {
83     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
84            queue->capacity);
85     xbt_cond_wait(queue->not_full, queue->mutex);
86   }
87   xbt_dynar_push(queue->data, src);
88   xbt_cond_signal(queue->not_empty);
89   xbt_mutex_release(queue->mutex);
90 }
91
92
93 /** @brief Pop something from the message exchange queue.
94  *
95  * This is blocking if the queue is empty.
96  *
97  * @see #xbt_dynar_pop
98  *
99  */
100 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
101 {
102   xbt_mutex_acquire(queue->mutex);
103   while (xbt_dynar_is_empty(queue->data)) {
104     XBT_DEBUG("Queue %p empty. Waiting", queue);
105     xbt_cond_wait(queue->not_empty, queue->mutex);
106   }
107   xbt_dynar_pop(queue->data, dst);
108   xbt_cond_signal(queue->not_full);
109   xbt_mutex_release(queue->mutex);
110 }
111
112 /** @brief Unshift something to the message exchange queue.
113  *
114  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
115  *
116  * @see #xbt_dynar_unshift
117  */
118 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
119 {
120   xbt_mutex_acquire(queue->mutex);
121   while (queue->capacity != 0
122          && queue->capacity == xbt_dynar_length(queue->data)) {
123     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
124            queue->capacity);
125     xbt_cond_wait(queue->not_full, queue->mutex);
126   }
127   xbt_dynar_unshift(queue->data, src);
128   xbt_cond_signal(queue->not_empty);
129   xbt_mutex_release(queue->mutex);
130 }
131
132
133 /** @brief Shift something from the message exchange queue.
134  *
135  * This is blocking if the queue is empty.
136  *
137  * @see #xbt_dynar_shift
138  *
139  */
140 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
141 {
142   xbt_mutex_acquire(queue->mutex);
143   while (xbt_dynar_is_empty(queue->data)) {
144     XBT_DEBUG("Queue %p empty. Waiting", queue);
145     xbt_cond_wait(queue->not_empty, queue->mutex);
146   }
147   xbt_dynar_shift(queue->data, dst);
148   xbt_cond_signal(queue->not_full);
149   xbt_mutex_release(queue->mutex);
150 }
151
152
153
154
155 /** @brief Push something to the message exchange queue, with a timeout.
156  *
157  * @see #xbt_queue_push
158  */
159 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
160 {
161   double begin = xbt_time();
162
163   xbt_mutex_acquire(queue->mutex);
164
165   if (delay == 0) {
166     if (queue->capacity != 0 &&
167         queue->capacity == xbt_dynar_length(queue->data)) {
168
169       xbt_mutex_release(queue->mutex);
170       THROWF(timeout_error, 0,
171              "Capacity of %p exceeded (=%d), and delay = 0", queue,
172              queue->capacity);
173     }
174   } else {
175     while (queue->capacity != 0 &&
176            queue->capacity == xbt_dynar_length(queue->data) &&
177            (delay < 0 || (xbt_time() - begin) <= delay)) {
178
179       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
180              queue->capacity);
181       TRY {
182         xbt_cond_timedwait(queue->not_full, queue->mutex,
183                            delay < 0 ? -1 : delay - (xbt_time() - begin));
184       }
185       CATCH_ANONYMOUS {
186         xbt_mutex_release(queue->mutex);
187         RETHROW;
188       }
189     }
190   }
191
192   xbt_dynar_push(queue->data, src);
193   xbt_cond_signal(queue->not_empty);
194   xbt_mutex_release(queue->mutex);
195 }
196
197
198 /** @brief Pop something from the message exchange queue, with a timeout.
199  *
200  * @see #xbt_queue_pop
201  *
202  */
203 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
204 {
205   double begin = xbt_time();
206
207   xbt_mutex_acquire(queue->mutex);
208
209   if (delay == 0) {
210     if (xbt_dynar_is_empty(queue->data)) {
211       xbt_mutex_release(queue->mutex);
212       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
213     }
214   } else {
215     while ((xbt_dynar_is_empty(queue->data)) &&
216            (delay < 0 || (xbt_time() - begin) <= delay)) {
217       XBT_DEBUG("Queue %p empty. Waiting", queue);
218       TRY {
219         xbt_cond_timedwait(queue->not_empty, queue->mutex,
220                            delay < 0 ? -1 : delay - (xbt_time() - begin));
221       }
222       CATCH_ANONYMOUS {
223         xbt_mutex_release(queue->mutex);
224         RETHROW;
225       }
226     }
227   }
228
229   xbt_dynar_pop(queue->data, dst);
230   xbt_cond_signal(queue->not_full);
231   xbt_mutex_release(queue->mutex);
232 }
233
234 /** @brief Unshift something to the message exchange queue, with a timeout.
235  *
236  * @see #xbt_queue_unshift
237  */
238 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
239                              double delay)
240 {
241   double begin = xbt_time();
242
243   xbt_mutex_acquire(queue->mutex);
244
245   if (delay == 0) {
246     if (queue->capacity != 0 &&
247         queue->capacity == xbt_dynar_length(queue->data)) {
248
249       xbt_mutex_release(queue->mutex);
250       THROWF(timeout_error, 0,
251              "Capacity of %p exceeded (=%d), and delay = 0", queue,
252              queue->capacity);
253     }
254   } else {
255     while (queue->capacity != 0 &&
256            queue->capacity == xbt_dynar_length(queue->data) &&
257            (delay < 0 || (xbt_time() - begin) <= delay)) {
258
259       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
260              queue->capacity);
261       TRY {
262         xbt_cond_timedwait(queue->not_full, queue->mutex,
263                            delay < 0 ? -1 : delay - (xbt_time() - begin));
264       }
265       CATCH_ANONYMOUS {
266         xbt_mutex_release(queue->mutex);
267         RETHROW;
268       }
269     }
270   }
271
272   xbt_dynar_unshift(queue->data, src);
273   xbt_cond_signal(queue->not_empty);
274   xbt_mutex_release(queue->mutex);
275 }
276
277
278 /** @brief Shift something from the message exchange queue, with a timeout.
279  *
280  * @see #xbt_queue_shift
281  *
282  */
283 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
284                            double delay)
285 {
286   double begin = xbt_time();
287
288   xbt_mutex_acquire(queue->mutex);
289
290   if (delay == 0) {
291     if (xbt_dynar_is_empty(queue->data)) {
292       xbt_mutex_release(queue->mutex);
293       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
294     }
295   } else {
296     while ((xbt_dynar_is_empty(queue->data)) &&
297            (delay < 0 || (xbt_time() - begin) <= delay)) {
298       XBT_DEBUG("Queue %p empty. Waiting", queue);
299       TRY {
300         xbt_cond_timedwait(queue->not_empty, queue->mutex,
301                            delay < 0 ? -1 : delay - (xbt_time() - begin));
302       }
303       CATCH_ANONYMOUS {
304         xbt_mutex_release(queue->mutex);
305         RETHROW;
306       }
307     }
308   }
309
310   if (xbt_dynar_is_empty(queue->data)) {
311     xbt_mutex_release(queue->mutex);
312     THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
313            delay);
314   }
315
316   xbt_dynar_shift(queue->data, dst);
317   xbt_cond_signal(queue->not_full);
318   xbt_mutex_release(queue->mutex);
319 }