Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
4585ae1673600c9edba98eb676a6c54d92cb1fc1
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h"          /* this module */
17 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
18                                 "Message exchanging queue");
19
20 typedef struct s_xbt_queue_ {
21   int capacity;
22   xbt_dynar_t data;
23   xbt_mutex_t mutex;
24   xbt_cond_t not_full, not_empty;
25 } s_xbt_queue_t;
26
27 /** @brief Create a new message exchange queue.
28  *
29  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
30  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
31  */
32 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
33 {
34   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
35   if (capacity<0)
36     capacity=0;
37
38   res->capacity = capacity;
39   res->data = xbt_dynar_new(elm_size, NULL);
40   res->mutex = xbt_mutex_init();
41   res->not_full = xbt_cond_init();
42   res->not_empty = xbt_cond_init();
43   return res;
44 }
45
46 /** @brief Destroy a message exchange queue.
47  *
48  * Any remaining content is leaked.
49  */
50 void xbt_queue_free(xbt_queue_t * queue)
51 {
52
53   xbt_dynar_free(&((*queue)->data));
54   xbt_mutex_destroy((*queue)->mutex);
55   xbt_cond_destroy((*queue)->not_full);
56   xbt_cond_destroy((*queue)->not_empty);
57   free(*queue);
58   *queue = NULL;
59 }
60
61 /** @brief Get the queue size */
62 unsigned long xbt_queue_length(const xbt_queue_t queue)
63 {
64   unsigned long res;
65   xbt_mutex_acquire(queue->mutex);
66   res = xbt_dynar_length(queue->data);
67   xbt_mutex_release(queue->mutex);
68   return res;
69 }
70
71 /** @brief Push something to the message exchange queue.
72  *
73  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
74  *
75  * @see #xbt_dynar_push
76  */
77 void xbt_queue_push(xbt_queue_t queue, const void *src)
78 {
79   xbt_mutex_acquire(queue->mutex);
80   while (queue->capacity != 0
81          && queue->capacity == xbt_dynar_length(queue->data)) {
82     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
83            queue->capacity);
84     xbt_cond_wait(queue->not_full, queue->mutex);
85   }
86   xbt_dynar_push(queue->data, src);
87   xbt_cond_signal(queue->not_empty);
88   xbt_mutex_release(queue->mutex);
89 }
90
91
92 /** @brief Pop something from the message exchange queue.
93  *
94  * This is blocking if the queue is empty.
95  *
96  * @see #xbt_dynar_pop
97  *
98  */
99 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
100 {
101   xbt_mutex_acquire(queue->mutex);
102   while (xbt_dynar_is_empty(queue->data)) {
103     XBT_DEBUG("Queue %p empty. Waiting", queue);
104     xbt_cond_wait(queue->not_empty, queue->mutex);
105   }
106   xbt_dynar_pop(queue->data, dst);
107   xbt_cond_signal(queue->not_full);
108   xbt_mutex_release(queue->mutex);
109 }
110
111 /** @brief Unshift something to the message exchange queue.
112  *
113  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
114  *
115  * @see #xbt_dynar_unshift
116  */
117 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
118 {
119   xbt_mutex_acquire(queue->mutex);
120   while (queue->capacity != 0
121          && queue->capacity == xbt_dynar_length(queue->data)) {
122     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
123            queue->capacity);
124     xbt_cond_wait(queue->not_full, queue->mutex);
125   }
126   xbt_dynar_unshift(queue->data, src);
127   xbt_cond_signal(queue->not_empty);
128   xbt_mutex_release(queue->mutex);
129 }
130
131
132 /** @brief Shift something from the message exchange queue.
133  *
134  * This is blocking if the queue is empty.
135  *
136  * @see #xbt_dynar_shift
137  *
138  */
139 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
140 {
141   xbt_mutex_acquire(queue->mutex);
142   while (xbt_dynar_is_empty(queue->data)) {
143     XBT_DEBUG("Queue %p empty. Waiting", queue);
144     xbt_cond_wait(queue->not_empty, queue->mutex);
145   }
146   xbt_dynar_shift(queue->data, dst);
147   xbt_cond_signal(queue->not_full);
148   xbt_mutex_release(queue->mutex);
149 }
150
151
152
153
154 /** @brief Push something to the message exchange queue, with a timeout.
155  *
156  * @see #xbt_queue_push
157  */
158 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
159 {
160   double begin = xbt_time();
161
162   xbt_mutex_acquire(queue->mutex);
163
164   if (delay == 0) {
165     if (queue->capacity != 0 &&
166         queue->capacity == xbt_dynar_length(queue->data)) {
167
168       xbt_mutex_release(queue->mutex);
169       THROWF(timeout_error, 0,
170              "Capacity of %p exceeded (=%d), and delay = 0", queue,
171              queue->capacity);
172     }
173   } else {
174     while (queue->capacity != 0 &&
175            queue->capacity == xbt_dynar_length(queue->data) &&
176            (delay < 0 || (xbt_time() - begin) <= delay)) {
177
178       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
179              queue->capacity);
180       TRY {
181         xbt_cond_timedwait(queue->not_full, queue->mutex,
182                            delay < 0 ? -1 : delay - (xbt_time() - begin));
183       }
184       CATCH_ANONYMOUS {
185         xbt_mutex_release(queue->mutex);
186         RETHROW;
187       }
188     }
189   }
190
191   xbt_dynar_push(queue->data, src);
192   xbt_cond_signal(queue->not_empty);
193   xbt_mutex_release(queue->mutex);
194 }
195
196
197 /** @brief Pop something from the message exchange queue, with a timeout.
198  *
199  * @see #xbt_queue_pop
200  *
201  */
202 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
203 {
204   double begin = xbt_time();
205
206   xbt_mutex_acquire(queue->mutex);
207
208   if (delay == 0) {
209     if (xbt_dynar_is_empty(queue->data)) {
210       xbt_mutex_release(queue->mutex);
211       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
212     }
213   } else {
214     while ((xbt_dynar_is_empty(queue->data)) &&
215            (delay < 0 || (xbt_time() - begin) <= delay)) {
216       XBT_DEBUG("Queue %p empty. Waiting", queue);
217       TRY {
218         xbt_cond_timedwait(queue->not_empty, queue->mutex,
219                            delay < 0 ? -1 : delay - (xbt_time() - begin));
220       }
221       CATCH_ANONYMOUS {
222         xbt_mutex_release(queue->mutex);
223         RETHROW;
224       }
225     }
226   }
227
228   xbt_dynar_pop(queue->data, dst);
229   xbt_cond_signal(queue->not_full);
230   xbt_mutex_release(queue->mutex);
231 }
232
233 /** @brief Unshift something to the message exchange queue, with a timeout.
234  *
235  * @see #xbt_queue_unshift
236  */
237 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
238                              double delay)
239 {
240   double begin = xbt_time();
241
242   xbt_mutex_acquire(queue->mutex);
243
244   if (delay == 0) {
245     if (queue->capacity != 0 &&
246         queue->capacity == xbt_dynar_length(queue->data)) {
247
248       xbt_mutex_release(queue->mutex);
249       THROWF(timeout_error, 0,
250              "Capacity of %p exceeded (=%d), and delay = 0", queue,
251              queue->capacity);
252     }
253   } else {
254     while (queue->capacity != 0 &&
255            queue->capacity == xbt_dynar_length(queue->data) &&
256            (delay < 0 || (xbt_time() - begin) <= delay)) {
257
258       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
259              queue->capacity);
260       TRY {
261         xbt_cond_timedwait(queue->not_full, queue->mutex,
262                            delay < 0 ? -1 : delay - (xbt_time() - begin));
263       }
264       CATCH_ANONYMOUS {
265         xbt_mutex_release(queue->mutex);
266         RETHROW;
267       }
268     }
269   }
270
271   xbt_dynar_unshift(queue->data, src);
272   xbt_cond_signal(queue->not_empty);
273   xbt_mutex_release(queue->mutex);
274 }
275
276
277 /** @brief Shift something from the message exchange queue, with a timeout.
278  *
279  * @see #xbt_queue_shift
280  *
281  */
282 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
283                            double delay)
284 {
285   double begin = xbt_time();
286
287   xbt_mutex_acquire(queue->mutex);
288
289   if (delay == 0) {
290     if (xbt_dynar_is_empty(queue->data)) {
291       xbt_mutex_release(queue->mutex);
292       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
293     }
294   } else {
295     while ((xbt_dynar_is_empty(queue->data)) &&
296            (delay < 0 || (xbt_time() - begin) <= delay)) {
297       XBT_DEBUG("Queue %p empty. Waiting", queue);
298       TRY {
299         xbt_cond_timedwait(queue->not_empty, queue->mutex,
300                            delay < 0 ? -1 : delay - (xbt_time() - begin));
301       }
302       CATCH_ANONYMOUS {
303         xbt_mutex_release(queue->mutex);
304         RETHROW;
305       }
306     }
307   }
308
309   if (xbt_dynar_is_empty(queue->data)) {
310     xbt_mutex_release(queue->mutex);
311     THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
312            delay);
313   }
314
315   xbt_dynar_shift(queue->data, dst);
316   xbt_cond_signal(queue->not_full);
317   xbt_mutex_release(queue->mutex);
318 }