Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill xbt_dynar_dopar().
[simgrid.git] / src / xbt / xbt_queue.c
1 /* A (synchronized) message queue.                                          */
2 /* Popping an empty queue is blocking, as well as pushing a full one        */
3
4 /* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
5  * All rights reserved.                                                     */
6
7 /* This program is free software; you can redistribute it and/or modify it
8  * under the terms of the license (GNU LGPL) which comes with this package. */
9
10 #include "xbt/misc.h"
11 #include "xbt/sysdep.h"
12 #include "xbt/log.h"
13 #include "xbt/dynar.h"
14
15 #include "xbt/queue.h"          /* this module */
16 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
17                                 "Message exchanging queue");
18
19 typedef struct s_xbt_queue_ {
20   int capacity;
21   xbt_dynar_t data;
22   xbt_mutex_t mutex;
23   xbt_cond_t not_full, not_empty;
24 } s_xbt_queue_t;
25
26 /** @brief Create a new message exchange queue.
27  *
28  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
29  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
30  */
31 xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
32 {
33   xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
34   if (capacity<0)
35     capacity=0;
36
37   res->capacity = capacity;
38   res->data = xbt_dynar_new(elm_size, NULL);
39   res->mutex = xbt_mutex_init();
40   res->not_full = xbt_cond_init();
41   res->not_empty = xbt_cond_init();
42   return res;
43 }
44
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t * queue)
50 {
51
52   xbt_dynar_free(&((*queue)->data));
53   xbt_mutex_destroy((*queue)->mutex);
54   xbt_cond_destroy((*queue)->not_full);
55   xbt_cond_destroy((*queue)->not_empty);
56   free(*queue);
57   *queue = NULL;
58 }
59
60 /** @brief Get the queue size */
61 unsigned long xbt_queue_length(const xbt_queue_t queue)
62 {
63   unsigned long res;
64   xbt_mutex_acquire(queue->mutex);
65   res = xbt_dynar_length(queue->data);
66   xbt_mutex_release(queue->mutex);
67   return res;
68 }
69
70 /** @brief Push something to the message exchange queue.
71  *
72  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
73  *
74  * @see #xbt_dynar_push
75  */
76 void xbt_queue_push(xbt_queue_t queue, const void *src)
77 {
78   xbt_mutex_acquire(queue->mutex);
79   while (queue->capacity != 0
80          && queue->capacity == xbt_dynar_length(queue->data)) {
81     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
82            queue->capacity);
83     xbt_cond_wait(queue->not_full, queue->mutex);
84   }
85   xbt_dynar_push(queue->data, src);
86   xbt_cond_signal(queue->not_empty);
87   xbt_mutex_release(queue->mutex);
88 }
89
90
91 /** @brief Pop something from the message exchange queue.
92  *
93  * This is blocking if the queue is empty.
94  *
95  * @see #xbt_dynar_pop
96  *
97  */
98 void xbt_queue_pop(xbt_queue_t queue, void *const dst)
99 {
100   xbt_mutex_acquire(queue->mutex);
101   while (xbt_dynar_is_empty(queue->data)) {
102     XBT_DEBUG("Queue %p empty. Waiting", queue);
103     xbt_cond_wait(queue->not_empty, queue->mutex);
104   }
105   xbt_dynar_pop(queue->data, dst);
106   xbt_cond_signal(queue->not_full);
107   xbt_mutex_release(queue->mutex);
108 }
109
110 /** @brief Unshift something to the message exchange queue.
111  *
112  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
113  *
114  * @see #xbt_dynar_unshift
115  */
116 void xbt_queue_unshift(xbt_queue_t queue, const void *src)
117 {
118   xbt_mutex_acquire(queue->mutex);
119   while (queue->capacity != 0
120          && queue->capacity == xbt_dynar_length(queue->data)) {
121     XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
122            queue->capacity);
123     xbt_cond_wait(queue->not_full, queue->mutex);
124   }
125   xbt_dynar_unshift(queue->data, src);
126   xbt_cond_signal(queue->not_empty);
127   xbt_mutex_release(queue->mutex);
128 }
129
130
131 /** @brief Shift something from the message exchange queue.
132  *
133  * This is blocking if the queue is empty.
134  *
135  * @see #xbt_dynar_shift
136  *
137  */
138 void xbt_queue_shift(xbt_queue_t queue, void *const dst)
139 {
140   xbt_mutex_acquire(queue->mutex);
141   while (xbt_dynar_is_empty(queue->data)) {
142     XBT_DEBUG("Queue %p empty. Waiting", queue);
143     xbt_cond_wait(queue->not_empty, queue->mutex);
144   }
145   xbt_dynar_shift(queue->data, dst);
146   xbt_cond_signal(queue->not_full);
147   xbt_mutex_release(queue->mutex);
148 }
149
150
151
152
153 /** @brief Push something to the message exchange queue, with a timeout.
154  *
155  * @see #xbt_queue_push
156  */
157 void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
158 {
159   double begin = xbt_time();
160
161   xbt_mutex_acquire(queue->mutex);
162
163   if (delay == 0) {
164     if (queue->capacity != 0 &&
165         queue->capacity == xbt_dynar_length(queue->data)) {
166
167       xbt_mutex_release(queue->mutex);
168       THROWF(timeout_error, 0,
169              "Capacity of %p exceeded (=%d), and delay = 0", queue,
170              queue->capacity);
171     }
172   } else {
173     while (queue->capacity != 0 &&
174            queue->capacity == xbt_dynar_length(queue->data) &&
175            (delay < 0 || (xbt_time() - begin) <= delay)) {
176
177       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
178              queue->capacity);
179       TRY {
180         xbt_cond_timedwait(queue->not_full, queue->mutex,
181                            delay < 0 ? -1 : delay - (xbt_time() - begin));
182       }
183       CATCH_ANONYMOUS {
184         xbt_mutex_release(queue->mutex);
185         RETHROW;
186       }
187     }
188   }
189
190   xbt_dynar_push(queue->data, src);
191   xbt_cond_signal(queue->not_empty);
192   xbt_mutex_release(queue->mutex);
193 }
194
195
196 /** @brief Pop something from the message exchange queue, with a timeout.
197  *
198  * @see #xbt_queue_pop
199  *
200  */
201 void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
202 {
203   double begin = xbt_time();
204
205   xbt_mutex_acquire(queue->mutex);
206
207   if (delay == 0) {
208     if (xbt_dynar_is_empty(queue->data)) {
209       xbt_mutex_release(queue->mutex);
210       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
211     }
212   } else {
213     while ((xbt_dynar_is_empty(queue->data)) &&
214            (delay < 0 || (xbt_time() - begin) <= delay)) {
215       XBT_DEBUG("Queue %p empty. Waiting", queue);
216       TRY {
217         xbt_cond_timedwait(queue->not_empty, queue->mutex,
218                            delay < 0 ? -1 : delay - (xbt_time() - begin));
219       }
220       CATCH_ANONYMOUS {
221         xbt_mutex_release(queue->mutex);
222         RETHROW;
223       }
224     }
225   }
226
227   xbt_dynar_pop(queue->data, dst);
228   xbt_cond_signal(queue->not_full);
229   xbt_mutex_release(queue->mutex);
230 }
231
232 /** @brief Unshift something to the message exchange queue, with a timeout.
233  *
234  * @see #xbt_queue_unshift
235  */
236 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
237                              double delay)
238 {
239   double begin = xbt_time();
240
241   xbt_mutex_acquire(queue->mutex);
242
243   if (delay == 0) {
244     if (queue->capacity != 0 &&
245         queue->capacity == xbt_dynar_length(queue->data)) {
246
247       xbt_mutex_release(queue->mutex);
248       THROWF(timeout_error, 0,
249              "Capacity of %p exceeded (=%d), and delay = 0", queue,
250              queue->capacity);
251     }
252   } else {
253     while (queue->capacity != 0 &&
254            queue->capacity == xbt_dynar_length(queue->data) &&
255            (delay < 0 || (xbt_time() - begin) <= delay)) {
256
257       XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
258              queue->capacity);
259       TRY {
260         xbt_cond_timedwait(queue->not_full, queue->mutex,
261                            delay < 0 ? -1 : delay - (xbt_time() - begin));
262       }
263       CATCH_ANONYMOUS {
264         xbt_mutex_release(queue->mutex);
265         RETHROW;
266       }
267     }
268   }
269
270   xbt_dynar_unshift(queue->data, src);
271   xbt_cond_signal(queue->not_empty);
272   xbt_mutex_release(queue->mutex);
273 }
274
275
276 /** @brief Shift something from the message exchange queue, with a timeout.
277  *
278  * @see #xbt_queue_shift
279  *
280  */
281 void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
282                            double delay)
283 {
284   double begin = xbt_time();
285
286   xbt_mutex_acquire(queue->mutex);
287
288   if (delay == 0) {
289     if (xbt_dynar_is_empty(queue->data)) {
290       xbt_mutex_release(queue->mutex);
291       THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
292     }
293   } else {
294     while ((xbt_dynar_is_empty(queue->data)) &&
295            (delay < 0 || (xbt_time() - begin) <= delay)) {
296       XBT_DEBUG("Queue %p empty. Waiting", queue);
297       TRY {
298         xbt_cond_timedwait(queue->not_empty, queue->mutex,
299                            delay < 0 ? -1 : delay - (xbt_time() - begin));
300       }
301       CATCH_ANONYMOUS {
302         xbt_mutex_release(queue->mutex);
303         RETHROW;
304       }
305     }
306   }
307
308   if (xbt_dynar_is_empty(queue->data)) {
309     xbt_mutex_release(queue->mutex);
310     THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
311            delay);
312   }
313
314   xbt_dynar_shift(queue->data, dst);
315   xbt_cond_signal(queue->not_full);
316   xbt_mutex_release(queue->mutex);
317 }