Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
e7358c87933d576d2446727a6e6200219c07abce
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 //#include "xbt/ex.h"
15 #include "xbt/dynar.h"
16
17 #include "xbt/synchro.h"
18 #include "xbt/queue.h" /* this module */
19 #include "gras/virtu.h"
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
21
22 typedef struct s_xbt_queue_ {
23    int capacity;
24    xbt_dynar_t data;
25    xbt_mutex_t mutex;
26    xbt_cond_t  not_full, not_empty;
27 } s_xbt_queue_t;
28
29 /** @brief Create a new message exchange queue.
30  * 
31  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
32  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
33  */
34 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
35    xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
36    xbt_assert0(capacity>=0,"Capacity cannot be negative");
37
38    res->capacity = capacity;
39    res->data = xbt_dynar_new(elm_size,NULL);
40    res->mutex = xbt_mutex_init();
41    res->not_full = xbt_cond_init();
42    res->not_empty = xbt_cond_init();
43    return res;
44 }
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t *queue) {
50
51    xbt_dynar_free(&( (*queue)->data ));
52    xbt_mutex_destroy( (*queue)->mutex );
53    xbt_cond_destroy( (*queue)->not_full );
54    xbt_cond_destroy( (*queue)->not_empty );
55    free((*queue));
56    *queue = NULL;
57 }
58
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(const xbt_queue_t queue) {
61    unsigned long res;
62    xbt_mutex_acquire(queue->mutex);
63    res=xbt_dynar_length(queue->data);
64    xbt_mutex_release(queue->mutex);
65    return res;
66 }
67
68 /** @brief Push something to the message exchange queue.
69  * 
70  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
71  * 
72  * @see #xbt_dynar_push
73  */
74 void xbt_queue_push(xbt_queue_t queue, const void *src) {
75    xbt_mutex_acquire(queue->mutex);
76    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
77       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
78       xbt_cond_wait(queue->not_full,queue->mutex);
79    }
80    xbt_dynar_push(queue->data,src);
81    xbt_cond_signal(queue->not_empty);
82    xbt_mutex_release(queue->mutex);
83 }
84
85    
86 /** @brief Pop something from the message exchange queue.
87  * 
88  * This is blocking if the queue is empty.
89  * 
90  * @see #xbt_dynar_pop
91  * 
92  */
93 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
94    xbt_mutex_acquire(queue->mutex);
95    while (xbt_dynar_length(queue->data) == 0) {
96       DEBUG1("Queue %p empty. Waiting",queue);
97       xbt_cond_wait(queue->not_empty,queue->mutex);
98    }
99    xbt_dynar_pop(queue->data,dst);
100    xbt_cond_signal(queue->not_full);
101    xbt_mutex_release(queue->mutex);
102 }
103
104 /** @brief Unshift something to the message exchange queue.
105  * 
106  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
107  * 
108  * @see #xbt_dynar_unshift
109  */
110 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
111    xbt_mutex_acquire(queue->mutex);
112    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
113       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
114       xbt_cond_wait(queue->not_full,queue->mutex);
115    }
116    xbt_dynar_unshift(queue->data,src);
117    xbt_cond_signal(queue->not_empty);
118    xbt_mutex_release(queue->mutex);
119 }
120    
121
122 /** @brief Shift something from the message exchange queue.
123  * 
124  * This is blocking if the queue is empty.
125  * 
126  * @see #xbt_dynar_shift
127  * 
128  */
129 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
130    xbt_mutex_acquire(queue->mutex);
131    while (xbt_dynar_length(queue->data) == 0) {
132       DEBUG1("Queue %p empty. Waiting",queue);
133       xbt_cond_wait(queue->not_empty,queue->mutex);
134    }
135    xbt_dynar_shift(queue->data,dst);
136    xbt_cond_signal(queue->not_full);
137    xbt_mutex_release(queue->mutex);
138 }
139
140
141
142
143 /** @brief Push something to the message exchange queue, with a timeout.
144  * 
145  * @see #xbt_queue_push
146  */
147 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
148   double begin = xbt_time();
149   xbt_ex_t e;
150
151   xbt_mutex_acquire(queue->mutex);
152
153   if (delay == 0) {
154     if (queue->capacity != 0 && 
155         queue->capacity == xbt_dynar_length(queue->data)) {
156
157       xbt_mutex_release(queue->mutex);
158       THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
159              queue,queue->capacity);
160     }
161   } else {
162     while (queue->capacity != 0 && 
163            queue->capacity == xbt_dynar_length(queue->data) &&
164            (delay<0 || (xbt_time() - begin) <= delay) ) {
165       
166       DEBUG2("Capacity of %p exceded (=%d). Waiting",
167              queue,queue->capacity);
168       TRY {
169         xbt_cond_timedwait(queue->not_full,queue->mutex,
170                            delay < 0 ? -1 : delay - (xbt_time()-begin));
171       } CATCH(e) {
172         xbt_mutex_release(queue->mutex);
173         RETHROW;
174       }
175     }
176   }
177
178   xbt_dynar_push(queue->data,src);
179   xbt_cond_signal(queue->not_empty);
180   xbt_mutex_release(queue->mutex);
181 }
182
183    
184 /** @brief Pop something from the message exchange queue, with a timeout.
185  * 
186  * @see #xbt_queue_pop
187  * 
188  */
189 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
190   double begin = xbt_time();
191   xbt_ex_t e;
192
193   xbt_mutex_acquire(queue->mutex);
194
195   if (delay == 0) {
196     if (xbt_dynar_length(queue->data) == 0) {
197       xbt_mutex_release(queue->mutex);
198       THROW0(timeout_error,0,"Delay = 0, and queue is empty");
199     }
200   } else {
201     while ( (xbt_dynar_length(queue->data) == 0) && 
202             (delay<0 || (xbt_time() - begin) <= delay) ) {
203       DEBUG1("Queue %p empty. Waiting",queue);
204       TRY {
205         xbt_cond_timedwait(queue->not_empty,queue->mutex, 
206                            delay<0 ? -1 : delay - (xbt_time()-begin));
207       } CATCH(e) {
208         xbt_mutex_release(queue->mutex);
209         RETHROW;
210       }
211     }
212   }
213
214   xbt_dynar_pop(queue->data,dst);
215   xbt_cond_signal(queue->not_full);
216   xbt_mutex_release(queue->mutex);
217 }
218
219 /** @brief Unshift something to the message exchange queue, with a timeout.
220  * 
221  * @see #xbt_queue_unshift
222  */
223 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
224   double begin = xbt_time();
225   xbt_ex_t e;
226
227   xbt_mutex_acquire(queue->mutex);
228
229   if (delay==0) {
230     if (queue->capacity != 0 && 
231         queue->capacity == xbt_dynar_length(queue->data)) {
232
233       xbt_mutex_release(queue->mutex);
234       THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
235              queue,queue->capacity);
236     }
237   } else {
238     while (queue->capacity != 0 && 
239            queue->capacity == xbt_dynar_length(queue->data) &&
240            (delay<0 || (xbt_time() - begin) <= delay) ) {
241       
242       DEBUG2("Capacity of %p exceded (=%d). Waiting",
243              queue,queue->capacity);
244       TRY {
245         xbt_cond_timedwait(queue->not_full,queue->mutex,
246                            delay < 0 ? -1 : delay - (xbt_time()-begin));
247       } CATCH(e) {
248         xbt_mutex_release(queue->mutex);
249         RETHROW;
250       }
251     }
252   }
253
254   xbt_dynar_unshift(queue->data,src);
255   xbt_cond_signal(queue->not_empty);
256   xbt_mutex_release(queue->mutex);
257 }
258    
259
260 /** @brief Shift something from the message exchange queue, with a timeout.
261  * 
262  * @see #xbt_queue_shift
263  * 
264  */
265 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
266   double begin = xbt_time();
267   xbt_ex_t e;
268
269   xbt_mutex_acquire(queue->mutex);
270
271   if (delay == 0) {
272     if (xbt_dynar_length(queue->data) == 0) {
273       xbt_mutex_release(queue->mutex);
274       THROW0(timeout_error,0,"Delay = 0, and queue is empty");
275     }
276   } else {
277     while ( (xbt_dynar_length(queue->data) == 0) && 
278             (delay<0 || (xbt_time() - begin) <= delay) ) {
279       DEBUG1("Queue %p empty. Waiting",queue);
280       TRY {
281         xbt_cond_timedwait(queue->not_empty,queue->mutex, 
282                            delay<0 ? -1 : delay - (xbt_time()-begin));
283       } CATCH(e) {
284         xbt_mutex_release(queue->mutex);
285         RETHROW;
286       }
287     }
288   }
289
290   if (xbt_dynar_length(queue->data) == 0) {
291      xbt_mutex_release(queue->mutex);
292      THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
293   }
294    
295   xbt_dynar_shift(queue->data,dst);
296   xbt_cond_signal(queue->not_full);
297   xbt_mutex_release(queue->mutex);
298 }