Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Introduce log appenders
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 #include "xbt/dynar.h"
15
16 #include "xbt/synchro.h"
17 #include "xbt/queue.h" /* this module */
18 #include "gras/virtu.h"
19 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
20
21 typedef struct s_xbt_queue_ {
22   int capacity;
23   xbt_dynar_t data;
24   xbt_mutex_t mutex;
25   xbt_cond_t  not_full, not_empty;
26 } s_xbt_queue_t;
27
28 /** @brief Create a new message exchange queue.
29  *
30  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
31  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
32  */
33 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
34   xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
35   xbt_assert0(capacity>=0,"Capacity cannot be negative");
36
37   res->capacity = capacity;
38   res->data = xbt_dynar_new(elm_size,NULL);
39   res->mutex = xbt_mutex_init();
40   res->not_full = xbt_cond_init();
41   res->not_empty = xbt_cond_init();
42   return res;
43 }
44 /** @brief Destroy a message exchange queue.
45  *
46  * Any remaining content is leaked.
47  */
48 void xbt_queue_free(xbt_queue_t *queue) {
49
50   xbt_dynar_free(&( (*queue)->data ));
51   xbt_mutex_destroy( (*queue)->mutex );
52   xbt_cond_destroy( (*queue)->not_full );
53   xbt_cond_destroy( (*queue)->not_empty );
54   free((*queue));
55   *queue = NULL;
56 }
57
58 /** @brief Get the queue size */
59 unsigned long xbt_queue_length(const xbt_queue_t queue) {
60   unsigned long res;
61   xbt_mutex_acquire(queue->mutex);
62   res=xbt_dynar_length(queue->data);
63   xbt_mutex_release(queue->mutex);
64   return res;
65 }
66
67 /** @brief Push something to the message exchange queue.
68  *
69  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
70  *
71  * @see #xbt_dynar_push
72  */
73 void xbt_queue_push(xbt_queue_t queue, const void *src) {
74   xbt_mutex_acquire(queue->mutex);
75   while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
76     DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
77     xbt_cond_wait(queue->not_full,queue->mutex);
78   }
79   xbt_dynar_push(queue->data,src);
80   xbt_cond_signal(queue->not_empty);
81   xbt_mutex_release(queue->mutex);
82 }
83
84
85 /** @brief Pop something from the message exchange queue.
86  *
87  * This is blocking if the queue is empty.
88  *
89  * @see #xbt_dynar_pop
90  *
91  */
92 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
93   xbt_mutex_acquire(queue->mutex);
94   while (xbt_dynar_length(queue->data) == 0) {
95     DEBUG1("Queue %p empty. Waiting",queue);
96     xbt_cond_wait(queue->not_empty,queue->mutex);
97   }
98   xbt_dynar_pop(queue->data,dst);
99   xbt_cond_signal(queue->not_full);
100   xbt_mutex_release(queue->mutex);
101 }
102
103 /** @brief Unshift something to the message exchange queue.
104  *
105  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
106  *
107  * @see #xbt_dynar_unshift
108  */
109 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
110   xbt_mutex_acquire(queue->mutex);
111   while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
112     DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
113     xbt_cond_wait(queue->not_full,queue->mutex);
114   }
115   xbt_dynar_unshift(queue->data,src);
116   xbt_cond_signal(queue->not_empty);
117   xbt_mutex_release(queue->mutex);
118 }
119
120
121 /** @brief Shift something from the message exchange queue.
122  *
123  * This is blocking if the queue is empty.
124  *
125  * @see #xbt_dynar_shift
126  *
127  */
128 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
129   xbt_mutex_acquire(queue->mutex);
130   while (xbt_dynar_length(queue->data) == 0) {
131     DEBUG1("Queue %p empty. Waiting",queue);
132     xbt_cond_wait(queue->not_empty,queue->mutex);
133   }
134   xbt_dynar_shift(queue->data,dst);
135   xbt_cond_signal(queue->not_full);
136   xbt_mutex_release(queue->mutex);
137 }
138
139
140
141
142 /** @brief Push something to the message exchange queue, with a timeout.
143  *
144  * @see #xbt_queue_push
145  */
146 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
147   double begin = xbt_time();
148   xbt_ex_t e;
149
150   xbt_mutex_acquire(queue->mutex);
151
152   if (delay == 0) {
153     if (queue->capacity != 0 &&
154         queue->capacity == xbt_dynar_length(queue->data)) {
155
156       xbt_mutex_release(queue->mutex);
157       THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
158              queue,queue->capacity);
159     }
160   } else {
161     while (queue->capacity != 0 &&
162         queue->capacity == xbt_dynar_length(queue->data) &&
163         (delay<0 || (xbt_time() - begin) <= delay) ) {
164
165       DEBUG2("Capacity of %p exceded (=%d). Waiting",
166              queue,queue->capacity);
167       TRY {
168         xbt_cond_timedwait(queue->not_full,queue->mutex,
169                            delay < 0 ? -1 : delay - (xbt_time()-begin));
170       } CATCH(e) {
171         xbt_mutex_release(queue->mutex);
172         RETHROW;
173       }
174     }
175   }
176
177   xbt_dynar_push(queue->data,src);
178   xbt_cond_signal(queue->not_empty);
179   xbt_mutex_release(queue->mutex);
180 }
181
182
183 /** @brief Pop something from the message exchange queue, with a timeout.
184  *
185  * @see #xbt_queue_pop
186  *
187  */
188 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
189   double begin = xbt_time();
190   xbt_ex_t e;
191
192   xbt_mutex_acquire(queue->mutex);
193
194   if (delay == 0) {
195     if (xbt_dynar_length(queue->data) == 0) {
196       xbt_mutex_release(queue->mutex);
197       THROW0(timeout_error,0,"Delay = 0, and queue is empty");
198     }
199   } else {
200     while ( (xbt_dynar_length(queue->data) == 0) &&
201         (delay<0 || (xbt_time() - begin) <= delay) ) {
202       DEBUG1("Queue %p empty. Waiting",queue);
203       TRY {
204         xbt_cond_timedwait(queue->not_empty,queue->mutex,
205                            delay<0 ? -1 : delay - (xbt_time()-begin));
206       } CATCH(e) {
207         xbt_mutex_release(queue->mutex);
208         RETHROW;
209       }
210     }
211   }
212
213   xbt_dynar_pop(queue->data,dst);
214   xbt_cond_signal(queue->not_full);
215   xbt_mutex_release(queue->mutex);
216 }
217
218 /** @brief Unshift something to the message exchange queue, with a timeout.
219  *
220  * @see #xbt_queue_unshift
221  */
222 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
223   double begin = xbt_time();
224   xbt_ex_t e;
225
226   xbt_mutex_acquire(queue->mutex);
227
228   if (delay==0) {
229     if (queue->capacity != 0 &&
230         queue->capacity == xbt_dynar_length(queue->data)) {
231
232       xbt_mutex_release(queue->mutex);
233       THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
234              queue,queue->capacity);
235     }
236   } else {
237     while (queue->capacity != 0 &&
238         queue->capacity == xbt_dynar_length(queue->data) &&
239         (delay<0 || (xbt_time() - begin) <= delay) ) {
240
241       DEBUG2("Capacity of %p exceded (=%d). Waiting",
242              queue,queue->capacity);
243       TRY {
244         xbt_cond_timedwait(queue->not_full,queue->mutex,
245                            delay < 0 ? -1 : delay - (xbt_time()-begin));
246       } CATCH(e) {
247         xbt_mutex_release(queue->mutex);
248         RETHROW;
249       }
250     }
251   }
252
253   xbt_dynar_unshift(queue->data,src);
254   xbt_cond_signal(queue->not_empty);
255   xbt_mutex_release(queue->mutex);
256 }
257
258
259 /** @brief Shift something from the message exchange queue, with a timeout.
260  *
261  * @see #xbt_queue_shift
262  *
263  */
264 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
265   double begin = xbt_time();
266   xbt_ex_t e;
267
268   xbt_mutex_acquire(queue->mutex);
269
270   if (delay == 0) {
271     if (xbt_dynar_length(queue->data) == 0) {
272       xbt_mutex_release(queue->mutex);
273       THROW0(timeout_error,0,"Delay = 0, and queue is empty");
274     }
275   } else {
276     while ( (xbt_dynar_length(queue->data) == 0) &&
277         (delay<0 || (xbt_time() - begin) <= delay) ) {
278       DEBUG1("Queue %p empty. Waiting",queue);
279       TRY {
280         xbt_cond_timedwait(queue->not_empty,queue->mutex,
281                            delay<0 ? -1 : delay - (xbt_time()-begin));
282       } CATCH(e) {
283         xbt_mutex_release(queue->mutex);
284         RETHROW;
285       }
286     }
287   }
288
289   if (xbt_dynar_length(queue->data) == 0) {
290     xbt_mutex_release(queue->mutex);
291     THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
292   }
293
294   xbt_dynar_shift(queue->data,dst);
295   xbt_cond_signal(queue->not_full);
296   xbt_mutex_release(queue->mutex);
297 }