Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
ba6315e89e32fb590e1ad5662cf4dc7ad3ceeb42
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h"          /* this module */
17 #include "gras/virtu.h"
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
19                                 "Message exchanging queue");
20
21 typedef struct s_xbt_queue_ {
22   int capacity;
23   xbt_dynar_t data;
24   xbt_mutex_t mutex;
25   xbt_cond_t not_full, not_empty;
26 } s_xbt_queue_t;
27
28 /** @brief Create a new message exchange queue.
29  *
30  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32  */
33 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
34 {
35   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
36   if (capacity<0)
37     capacity=0;
38
39   res->capacity = capacity;
40   res->data = xbt_dynar_new(elm_size, NULL);
41   res->mutex = xbt_mutex_init();
42   res->not_full = xbt_cond_init();
43   res->not_empty = xbt_cond_init();
44   return res;
45 }
46
47 /** @brief Destroy a message exchange queue.
48  *
49  * Any remaining content is leaked.
50  */
51 void xbt_queue_free(xbt_queue_t * queue)
52 {
53
54   xbt_dynar_free(&((*queue)->data));
55   xbt_mutex_destroy((*queue)->mutex);
56   xbt_cond_destroy((*queue)->not_full);
57   xbt_cond_destroy((*queue)->not_empty);
58   free((*queue));
59   *queue = NULL;
60 }
61
62 /** @brief Get the queue size */
63 unsigned long xbt_queue_length(const xbt_queue_t queue)
64 {
65   unsigned long res;
66   xbt_mutex_acquire(queue->mutex);
67   res = xbt_dynar_length(queue->data);
68   xbt_mutex_release(queue->mutex);
69   return res;
70 }
71
72 /** @brief Push something to the message exchange queue.
73  *
74  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
75  *
76  * @see #xbt_dynar_push
77  */
78 void xbt_queue_push(xbt_queue_t queue, const void *src)
79 {
80   xbt_mutex_acquire(queue->mutex);
81   while (queue->capacity != 0
82          && queue->capacity == xbt_dynar_length(queue->data)) {
83     XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
84            queue->capacity);
85     xbt_cond_wait(queue->not_full, queue->mutex);
86   }
87   xbt_dynar_push(queue->data, src);
88   xbt_cond_signal(queue->not_empty);
89   xbt_mutex_release(queue->mutex);
90 }
91
92
93 /** @brief Pop something from the message exchange queue.
94  *
95  * This is blocking if the queue is empty.
96  *
97  * @see #xbt_dynar_pop
98  *
99  */
100 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
101 {
102   xbt_mutex_acquire(queue->mutex);
103   while (xbt_dynar_length(queue->data) == 0) {
104     XBT_DEBUG("Queue %p empty. Waiting", queue);
105     xbt_cond_wait(queue->not_empty, queue->mutex);
106   }
107   xbt_dynar_pop(queue->data, dst);
108   xbt_cond_signal(queue->not_full);
109   xbt_mutex_release(queue->mutex);
110 }
111
112 /** @brief Unshift something to the message exchange queue.
113  *
114  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
115  *
116  * @see #xbt_dynar_unshift
117  */
118 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
119 {
120   xbt_mutex_acquire(queue->mutex);
121   while (queue->capacity != 0
122          && queue->capacity == xbt_dynar_length(queue->data)) {
123     XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
124            queue->capacity);
125     xbt_cond_wait(queue->not_full, queue->mutex);
126   }
127   xbt_dynar_unshift(queue->data, src);
128   xbt_cond_signal(queue->not_empty);
129   xbt_mutex_release(queue->mutex);
130 }
131
132
133 /** @brief Shift something from the message exchange queue.
134  *
135  * This is blocking if the queue is empty.
136  *
137  * @see #xbt_dynar_shift
138  *
139  */
140 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
141 {
142   xbt_mutex_acquire(queue->mutex);
143   while (xbt_dynar_length(queue->data) == 0) {
144     XBT_DEBUG("Queue %p empty. Waiting", queue);
145     xbt_cond_wait(queue->not_empty, queue->mutex);
146   }
147   xbt_dynar_shift(queue->data, dst);
148   xbt_cond_signal(queue->not_full);
149   xbt_mutex_release(queue->mutex);
150 }
151
152
153
154
155 /** @brief Push something to the message exchange queue, with a timeout.
156  *
157  * @see #xbt_queue_push
158  */
159 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
160 {
161   double begin = xbt_time();
162   xbt_ex_t e;
163
164   xbt_mutex_acquire(queue->mutex);
165
166   if (delay == 0) {
167     if (queue->capacity != 0 &&
168         queue->capacity == xbt_dynar_length(queue->data)) {
169
170       xbt_mutex_release(queue->mutex);
171       THROW2(timeout_error, 0,
172              "Capacity of %p exceded (=%d), and delay = 0", queue,
173              queue->capacity);
174     }
175   } else {
176     while (queue->capacity != 0 &&
177            queue->capacity == xbt_dynar_length(queue->data) &&
178            (delay < 0 || (xbt_time() - begin) <= delay)) {
179
180       XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
181              queue->capacity);
182       TRY {
183         xbt_cond_timedwait(queue->not_full, queue->mutex,
184                            delay < 0 ? -1 : delay - (xbt_time() - begin));
185       }
186       CATCH(e) {
187         xbt_mutex_release(queue->mutex);
188         RETHROW;
189       }
190     }
191   }
192
193   xbt_dynar_push(queue->data, src);
194   xbt_cond_signal(queue->not_empty);
195   xbt_mutex_release(queue->mutex);
196 }
197
198
199 /** @brief Pop something from the message exchange queue, with a timeout.
200  *
201  * @see #xbt_queue_pop
202  *
203  */
204 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
205 {
206   double begin = xbt_time();
207   xbt_ex_t e;
208
209   xbt_mutex_acquire(queue->mutex);
210
211   if (delay == 0) {
212     if (xbt_dynar_length(queue->data) == 0) {
213       xbt_mutex_release(queue->mutex);
214       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
215     }
216   } else {
217     while ((xbt_dynar_length(queue->data) == 0) &&
218            (delay < 0 || (xbt_time() - begin) <= delay)) {
219       XBT_DEBUG("Queue %p empty. Waiting", queue);
220       TRY {
221         xbt_cond_timedwait(queue->not_empty, queue->mutex,
222                            delay < 0 ? -1 : delay - (xbt_time() - begin));
223       }
224       CATCH(e) {
225         xbt_mutex_release(queue->mutex);
226         RETHROW;
227       }
228     }
229   }
230
231   xbt_dynar_pop(queue->data, dst);
232   xbt_cond_signal(queue->not_full);
233   xbt_mutex_release(queue->mutex);
234 }
235
236 /** @brief Unshift something to the message exchange queue, with a timeout.
237  *
238  * @see #xbt_queue_unshift
239  */
240 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
241                              double delay)
242 {
243   double begin = xbt_time();
244   xbt_ex_t e;
245
246   xbt_mutex_acquire(queue->mutex);
247
248   if (delay == 0) {
249     if (queue->capacity != 0 &&
250         queue->capacity == xbt_dynar_length(queue->data)) {
251
252       xbt_mutex_release(queue->mutex);
253       THROW2(timeout_error, 0,
254              "Capacity of %p exceded (=%d), and delay = 0", queue,
255              queue->capacity);
256     }
257   } else {
258     while (queue->capacity != 0 &&
259            queue->capacity == xbt_dynar_length(queue->data) &&
260            (delay < 0 || (xbt_time() - begin) <= delay)) {
261
262       XBT_DEBUG("Capacity of %p exceded (=%d). Waiting", queue,
263              queue->capacity);
264       TRY {
265         xbt_cond_timedwait(queue->not_full, queue->mutex,
266                            delay < 0 ? -1 : delay - (xbt_time() - begin));
267       }
268       CATCH(e) {
269         xbt_mutex_release(queue->mutex);
270         RETHROW;
271       }
272     }
273   }
274
275   xbt_dynar_unshift(queue->data, src);
276   xbt_cond_signal(queue->not_empty);
277   xbt_mutex_release(queue->mutex);
278 }
279
280
281 /** @brief Shift something from the message exchange queue, with a timeout.
282  *
283  * @see #xbt_queue_shift
284  *
285  */
286 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
287                            double delay)
288 {
289   double begin = xbt_time();
290   xbt_ex_t e;
291
292   xbt_mutex_acquire(queue->mutex);
293
294   if (delay == 0) {
295     if (xbt_dynar_length(queue->data) == 0) {
296       xbt_mutex_release(queue->mutex);
297       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
298     }
299   } else {
300     while ((xbt_dynar_length(queue->data) == 0) &&
301            (delay < 0 || (xbt_time() - begin) <= delay)) {
302       XBT_DEBUG("Queue %p empty. Waiting", queue);
303       TRY {
304         xbt_cond_timedwait(queue->not_empty, queue->mutex,
305                            delay < 0 ? -1 : delay - (xbt_time() - begin));
306       }
307       CATCH(e) {
308         xbt_mutex_release(queue->mutex);
309         RETHROW;
310       }
311     }
312   }
313
314   if (xbt_dynar_length(queue->data) == 0) {
315     xbt_mutex_release(queue->mutex);
316     THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
317            delay);
318   }
319
320   xbt_dynar_shift(queue->data, dst);
321   xbt_cond_signal(queue->not_full);
322   xbt_mutex_release(queue->mutex);
323 }