Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
change mmalloc.h into a public header
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/synchro.h"
16 #include "xbt/queue.h"          /* this module */
17 #include "gras/virtu.h"
18 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt, "Message exchanging queue");
19
20 typedef struct s_xbt_queue_ {
21   int capacity;
22   xbt_dynar_t data;
23   xbt_mutex_t mutex;
24   xbt_cond_t not_full, not_empty;
25 } s_xbt_queue_t;
26
27 /** @brief Create a new message exchange queue.
28  *
29  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
30  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
31  */
32 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
33 {
34   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
35   xbt_assert0(capacity >= 0, "Capacity cannot be negative");
36
37   res->capacity = capacity;
38   res->data = xbt_dynar_new(elm_size, NULL);
39   res->mutex = xbt_mutex_init();
40   res->not_full = xbt_cond_init();
41   res->not_empty = xbt_cond_init();
42   return res;
43 }
44
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t * queue)
50 {
51
52   xbt_dynar_free(&((*queue)->data));
53   xbt_mutex_destroy((*queue)->mutex);
54   xbt_cond_destroy((*queue)->not_full);
55   xbt_cond_destroy((*queue)->not_empty);
56   free((*queue));
57   *queue = NULL;
58 }
59
60 /** @brief Get the queue size */
61 unsigned long xbt_queue_length(const xbt_queue_t queue)
62 {
63   unsigned long res;
64   xbt_mutex_acquire(queue->mutex);
65   res = xbt_dynar_length(queue->data);
66   xbt_mutex_release(queue->mutex);
67   return res;
68 }
69
70 /** @brief Push something to the message exchange queue.
71  *
72  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
73  *
74  * @see #xbt_dynar_push
75  */
76 void xbt_queue_push(xbt_queue_t queue, const void *src)
77 {
78   xbt_mutex_acquire(queue->mutex);
79   while (queue->capacity != 0
80          && queue->capacity == xbt_dynar_length(queue->data)) {
81     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
82     xbt_cond_wait(queue->not_full, queue->mutex);
83   }
84   xbt_dynar_push(queue->data, src);
85   xbt_cond_signal(queue->not_empty);
86   xbt_mutex_release(queue->mutex);
87 }
88
89
90 /** @brief Pop something from the message exchange queue.
91  *
92  * This is blocking if the queue is empty.
93  *
94  * @see #xbt_dynar_pop
95  *
96  */
97 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
98 {
99   xbt_mutex_acquire(queue->mutex);
100   while (xbt_dynar_length(queue->data) == 0) {
101     DEBUG1("Queue %p empty. Waiting", queue);
102     xbt_cond_wait(queue->not_empty, queue->mutex);
103   }
104   xbt_dynar_pop(queue->data, dst);
105   xbt_cond_signal(queue->not_full);
106   xbt_mutex_release(queue->mutex);
107 }
108
109 /** @brief Unshift something to the message exchange queue.
110  *
111  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
112  *
113  * @see #xbt_dynar_unshift
114  */
115 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
116 {
117   xbt_mutex_acquire(queue->mutex);
118   while (queue->capacity != 0
119          && queue->capacity == xbt_dynar_length(queue->data)) {
120     DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
121     xbt_cond_wait(queue->not_full, queue->mutex);
122   }
123   xbt_dynar_unshift(queue->data, src);
124   xbt_cond_signal(queue->not_empty);
125   xbt_mutex_release(queue->mutex);
126 }
127
128
129 /** @brief Shift something from the message exchange queue.
130  *
131  * This is blocking if the queue is empty.
132  *
133  * @see #xbt_dynar_shift
134  *
135  */
136 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
137 {
138   xbt_mutex_acquire(queue->mutex);
139   while (xbt_dynar_length(queue->data) == 0) {
140     DEBUG1("Queue %p empty. Waiting", queue);
141     xbt_cond_wait(queue->not_empty, queue->mutex);
142   }
143   xbt_dynar_shift(queue->data, dst);
144   xbt_cond_signal(queue->not_full);
145   xbt_mutex_release(queue->mutex);
146 }
147
148
149
150
151 /** @brief Push something to the message exchange queue, with a timeout.
152  *
153  * @see #xbt_queue_push
154  */
155 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
156 {
157   double begin = xbt_time();
158   xbt_ex_t e;
159
160   xbt_mutex_acquire(queue->mutex);
161
162   if (delay == 0) {
163     if (queue->capacity != 0 &&
164         queue->capacity == xbt_dynar_length(queue->data)) {
165
166       xbt_mutex_release(queue->mutex);
167       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
168              queue, queue->capacity);
169     }
170   } else {
171     while (queue->capacity != 0 &&
172            queue->capacity == xbt_dynar_length(queue->data) &&
173            (delay < 0 || (xbt_time() - begin) <= delay)) {
174
175       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
176       TRY {
177         xbt_cond_timedwait(queue->not_full, queue->mutex,
178                            delay < 0 ? -1 : delay - (xbt_time() - begin));
179       }
180       CATCH(e) {
181         xbt_mutex_release(queue->mutex);
182         RETHROW;
183       }
184     }
185   }
186
187   xbt_dynar_push(queue->data, src);
188   xbt_cond_signal(queue->not_empty);
189   xbt_mutex_release(queue->mutex);
190 }
191
192
193 /** @brief Pop something from the message exchange queue, with a timeout.
194  *
195  * @see #xbt_queue_pop
196  *
197  */
198 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
199 {
200   double begin = xbt_time();
201   xbt_ex_t e;
202
203   xbt_mutex_acquire(queue->mutex);
204
205   if (delay == 0) {
206     if (xbt_dynar_length(queue->data) == 0) {
207       xbt_mutex_release(queue->mutex);
208       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
209     }
210   } else {
211     while ((xbt_dynar_length(queue->data) == 0) &&
212            (delay < 0 || (xbt_time() - begin) <= delay)) {
213       DEBUG1("Queue %p empty. Waiting", queue);
214       TRY {
215         xbt_cond_timedwait(queue->not_empty, queue->mutex,
216                            delay < 0 ? -1 : delay - (xbt_time() - begin));
217       }
218       CATCH(e) {
219         xbt_mutex_release(queue->mutex);
220         RETHROW;
221       }
222     }
223   }
224
225   xbt_dynar_pop(queue->data, dst);
226   xbt_cond_signal(queue->not_full);
227   xbt_mutex_release(queue->mutex);
228 }
229
230 /** @brief Unshift something to the message exchange queue, with a timeout.
231  *
232  * @see #xbt_queue_unshift
233  */
234 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src, double delay)
235 {
236   double begin = xbt_time();
237   xbt_ex_t e;
238
239   xbt_mutex_acquire(queue->mutex);
240
241   if (delay == 0) {
242     if (queue->capacity != 0 &&
243         queue->capacity == xbt_dynar_length(queue->data)) {
244
245       xbt_mutex_release(queue->mutex);
246       THROW2(timeout_error, 0, "Capacity of %p exceded (=%d), and delay = 0",
247              queue, queue->capacity);
248     }
249   } else {
250     while (queue->capacity != 0 &&
251            queue->capacity == xbt_dynar_length(queue->data) &&
252            (delay < 0 || (xbt_time() - begin) <= delay)) {
253
254       DEBUG2("Capacity of %p exceded (=%d). Waiting", queue, queue->capacity);
255       TRY {
256         xbt_cond_timedwait(queue->not_full, queue->mutex,
257                            delay < 0 ? -1 : delay - (xbt_time() - begin));
258       }
259       CATCH(e) {
260         xbt_mutex_release(queue->mutex);
261         RETHROW;
262       }
263     }
264   }
265
266   xbt_dynar_unshift(queue->data, src);
267   xbt_cond_signal(queue->not_empty);
268   xbt_mutex_release(queue->mutex);
269 }
270
271
272 /** @brief Shift something from the message exchange queue, with a timeout.
273  *
274  * @see #xbt_queue_shift
275  *
276  */
277 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst, double delay)
278 {
279   double begin = xbt_time();
280   xbt_ex_t e;
281
282   xbt_mutex_acquire(queue->mutex);
283
284   if (delay == 0) {
285     if (xbt_dynar_length(queue->data) == 0) {
286       xbt_mutex_release(queue->mutex);
287       THROW0(timeout_error, 0, "Delay = 0, and queue is empty");
288     }
289   } else {
290     while ((xbt_dynar_length(queue->data) == 0) &&
291            (delay < 0 || (xbt_time() - begin) <= delay)) {
292       DEBUG1("Queue %p empty. Waiting", queue);
293       TRY {
294         xbt_cond_timedwait(queue->not_empty, queue->mutex,
295                            delay < 0 ? -1 : delay - (xbt_time() - begin));
296       }
297       CATCH(e) {
298         xbt_mutex_release(queue->mutex);
299         RETHROW;
300       }
301     }
302   }
303
304   if (xbt_dynar_length(queue->data) == 0) {
305     xbt_mutex_release(queue->mutex);
306     THROW1(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
307            delay);
308   }
309
310   xbt_dynar_shift(queue->data, dst);
311   xbt_cond_signal(queue->not_full);
312   xbt_mutex_release(queue->mutex);
313 }