Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
50a3d300bd711f42463f0f6dc5063ff8ad6f53fd
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 //#include "xbt/ex.h"
15 #include "xbt/dynar.h"
16
17 #include "xbt/synchro.h"
18 #include "xbt/queue.h" /* this module */
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
21
22 typedef struct s_xbt_queue_ {
23    int capacity;
24    xbt_dynar_t data;
25    xbt_mutex_t mutex;
26    xbt_cond_t  not_full, not_empty;
27 } s_xbt_queue_t;
28
29 /** @brief Create a new message exchange queue.
30  * 
31  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
32  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
33  */
34 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
35    xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
36    xbt_assert0(capacity>=0,"Capacity cannot be negative");
37
38    res->capacity = capacity;
39    res->data = xbt_dynar_new(elm_size,NULL);
40    res->mutex = xbt_mutex_init();
41    res->not_full = xbt_cond_init();
42    res->not_empty = xbt_cond_init();
43    return res;
44 }
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t *queue) {
50
51    xbt_dynar_free(&( (*queue)->data ));
52    xbt_mutex_destroy( (*queue)->mutex );
53    xbt_cond_destroy( (*queue)->not_full );
54    xbt_cond_destroy( (*queue)->not_empty );
55    free((*queue));
56    *queue = NULL;
57 }
58
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(xbt_queue_t queue) {
61    unsigned long res;
62    xbt_mutex_lock(queue->mutex);
63    res=xbt_dynar_length(queue->data);
64    xbt_mutex_unlock(queue->mutex);
65    return res;
66 }
67
68 /** @brief Push something to the message exchange queue.
69  * 
70  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
71  * 
72  * @seealso #xbt_dynar_push
73  */
74 void xbt_queue_push(xbt_queue_t queue, const void *src) {
75    xbt_mutex_lock(queue->mutex);
76    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
77       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
78       xbt_cond_wait(queue->not_full,queue->mutex);
79    }
80    xbt_dynar_push(queue->data,src);
81    xbt_cond_signal(queue->not_empty);
82    xbt_mutex_unlock(queue->mutex);
83 }
84
85    
86 /** @brief Pop something from the message exchange queue.
87  * 
88  * This is blocking if the queue is empty.
89  * 
90  * @seealso #xbt_dynar_pop
91  * 
92  */
93 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
94    xbt_mutex_lock(queue->mutex);
95    while (xbt_dynar_length(queue->data) == 0) {
96       DEBUG1("Queue %p empty. Waiting",queue);
97       xbt_cond_wait(queue->not_empty,queue->mutex);
98    }
99    xbt_dynar_pop(queue->data,dst);
100    xbt_cond_signal(queue->not_full);
101    xbt_mutex_unlock(queue->mutex);
102 }
103
104 /** @brief Unshift something to the message exchange queue.
105  * 
106  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
107  * 
108  * @seealso #xbt_dynar_unshift
109  */
110 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
111    xbt_mutex_lock(queue->mutex);
112    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
113       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
114       xbt_cond_wait(queue->not_full,queue->mutex);
115    }
116    xbt_dynar_unshift(queue->data,src);
117    xbt_cond_signal(queue->not_empty);
118    xbt_mutex_unlock(queue->mutex);
119 }
120    
121
122 /** @brief Shift something from the message exchange queue.
123  * 
124  * This is blocking if the queue is empty.
125  * 
126  * @seealso #xbt_dynar_shift
127  * 
128  */
129 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
130    xbt_mutex_lock(queue->mutex);
131    while (xbt_dynar_length(queue->data) == 0) {
132       DEBUG1("Queue %p empty. Waiting",queue);
133       xbt_cond_wait(queue->not_empty,queue->mutex);
134    }
135    xbt_dynar_shift(queue->data,dst);
136    xbt_cond_signal(queue->not_full);
137    xbt_mutex_unlock(queue->mutex);
138 }
139
140
141
142
143 /** @brief Push something to the message exchange queue, with a timeout.
144  * 
145  * @seealso #xbt_queue_push
146  */
147 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
148    xbt_mutex_lock(queue->mutex);
149    if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
150       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
151       xbt_cond_timedwait(queue->not_full,queue->mutex, delay);
152          }
153                 /* check if a timeout occurs */
154          if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
155                         xbt_mutex_unlock(queue->mutex);
156                         THROW0(timeout_error,0,"Timeout");
157          }
158          else {
159                 xbt_dynar_push(queue->data,src);
160                 xbt_cond_signal(queue->not_empty);
161                 xbt_mutex_unlock(queue->mutex);
162          }
163 }
164
165    
166 /** @brief Pop something from the message exchange queue, with a timeout.
167  * 
168  * @seealso #xbt_queue_pop
169  * 
170  */
171 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
172    xbt_mutex_lock(queue->mutex);
173    if (xbt_dynar_length(queue->data) == 0) {
174       DEBUG1("Queue %p empty. Waiting",queue);
175       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
176    }
177          /* check if a timeout occurs */
178    if (xbt_dynar_length(queue->data) == 0) {
179                         xbt_mutex_unlock(queue->mutex);
180                         THROW0(timeout_error,0,"Timeout");
181          }
182          else {
183                         xbt_dynar_pop(queue->data,dst);
184                         xbt_cond_signal(queue->not_full);
185                         xbt_mutex_unlock(queue->mutex);
186          }
187 }
188
189 /** @brief Unshift something to the message exchange queue, with a timeout.
190  * 
191  * @seealso #xbt_queue_unshift
192  */
193 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
194    xbt_mutex_lock(queue->mutex);
195    if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
196       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
197       xbt_cond_timedwait(queue->not_full,queue->mutex,delay);
198    }
199          /* check if a timeout occurs */
200
201    if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
202
203                         xbt_mutex_unlock(queue->mutex);
204                         THROW0(timeout_error,0,"Timeout");
205          }
206          else {
207                 xbt_dynar_unshift(queue->data,src);
208                 xbt_cond_signal(queue->not_empty);
209                 xbt_mutex_unlock(queue->mutex);
210          }
211 }
212    
213
214 /** @brief Shift something from the message exchange queue, with a timeout.
215  * 
216  * @seealso #xbt_queue_shift
217  * 
218  */
219 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
220    xbt_mutex_lock(queue->mutex);
221    if (xbt_dynar_length(queue->data) == 0) {
222       DEBUG1("Queue %p empty. Waiting",queue);
223       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
224    }
225          /* check if a timeout occurs */
226    if (xbt_dynar_length(queue->data) == 0) {
227                         xbt_mutex_unlock(queue->mutex);
228                         THROW0(timeout_error,0,"Timeout");
229          }
230          else {
231                 xbt_dynar_shift(queue->data,dst);
232                 xbt_cond_signal(queue->not_full);
233                 xbt_mutex_unlock(queue->mutex);
234          }
235 }