Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Added xbt_os_time and xbt_os_sleep. Execute in the real and simulate systems.
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 //#include "xbt/ex.h"
15 #include "xbt/dynar.h"
16
17 #include "xbt/synchro.h"
18 #include "xbt/queue.h" /* this module */
19 #include "gras/virtu.h"
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
21
22 typedef struct s_xbt_queue_ {
23    int capacity;
24    xbt_dynar_t data;
25    xbt_mutex_t mutex;
26    xbt_cond_t  not_full, not_empty;
27 } s_xbt_queue_t;
28
29 /** @brief Create a new message exchange queue.
30  * 
31  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
32  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
33  */
34 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
35    xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
36    xbt_assert0(capacity>=0,"Capacity cannot be negative");
37
38    res->capacity = capacity;
39    res->data = xbt_dynar_new(elm_size,NULL);
40    res->mutex = xbt_mutex_init();
41    res->not_full = xbt_cond_init();
42    res->not_empty = xbt_cond_init();
43    return res;
44 }
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t *queue) {
50
51    xbt_dynar_free(&( (*queue)->data ));
52    xbt_mutex_destroy( (*queue)->mutex );
53    xbt_cond_destroy( (*queue)->not_full );
54    xbt_cond_destroy( (*queue)->not_empty );
55    free((*queue));
56    *queue = NULL;
57 }
58
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(xbt_queue_t queue) {
61    unsigned long res;
62    xbt_mutex_lock(queue->mutex);
63    res=xbt_dynar_length(queue->data);
64    xbt_mutex_unlock(queue->mutex);
65    return res;
66 }
67
68 /** @brief Push something to the message exchange queue.
69  * 
70  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
71  * 
72  * @seealso #xbt_dynar_push
73  */
74 void xbt_queue_push(xbt_queue_t queue, const void *src) {
75    xbt_mutex_lock(queue->mutex);
76    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
77       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
78       xbt_cond_wait(queue->not_full,queue->mutex);
79    }
80    xbt_dynar_push(queue->data,src);
81    xbt_cond_signal(queue->not_empty);
82    xbt_mutex_unlock(queue->mutex);
83 }
84
85    
86 /** @brief Pop something from the message exchange queue.
87  * 
88  * This is blocking if the queue is empty.
89  * 
90  * @seealso #xbt_dynar_pop
91  * 
92  */
93 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
94    xbt_mutex_lock(queue->mutex);
95    while (xbt_dynar_length(queue->data) == 0) {
96       DEBUG1("Queue %p empty. Waiting",queue);
97       xbt_cond_wait(queue->not_empty,queue->mutex);
98    }
99    xbt_dynar_pop(queue->data,dst);
100    xbt_cond_signal(queue->not_full);
101    xbt_mutex_unlock(queue->mutex);
102 }
103
104 /** @brief Unshift something to the message exchange queue.
105  * 
106  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
107  * 
108  * @seealso #xbt_dynar_unshift
109  */
110 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
111    xbt_mutex_lock(queue->mutex);
112    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
113       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
114       xbt_cond_wait(queue->not_full,queue->mutex);
115    }
116    xbt_dynar_unshift(queue->data,src);
117    xbt_cond_signal(queue->not_empty);
118    xbt_mutex_unlock(queue->mutex);
119 }
120    
121
122 /** @brief Shift something from the message exchange queue.
123  * 
124  * This is blocking if the queue is empty.
125  * 
126  * @seealso #xbt_dynar_shift
127  * 
128  */
129 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
130    xbt_mutex_lock(queue->mutex);
131    while (xbt_dynar_length(queue->data) == 0) {
132       DEBUG1("Queue %p empty. Waiting",queue);
133       xbt_cond_wait(queue->not_empty,queue->mutex);
134    }
135    xbt_dynar_shift(queue->data,dst);
136    xbt_cond_signal(queue->not_full);
137    xbt_mutex_unlock(queue->mutex);
138 }
139
140
141
142
143 /** @brief Push something to the message exchange queue, with a timeout.
144  * 
145  * @seealso #xbt_queue_push
146  */
147 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
148          double timeout = xbt_os_time() + delay;
149    xbt_mutex_lock(queue->mutex);
150    if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
151       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
152       xbt_cond_timedwait(queue->not_full,queue->mutex, delay);
153                         /* check if a timeout occurs */
154                         if (xbt_os_time() >= timeout) {
155                                 xbt_mutex_unlock(queue->mutex);
156                                 THROW0(timeout_error,0,"Timeout");
157                         }
158          }
159          xbt_dynar_push(queue->data,src);
160          xbt_cond_signal(queue->not_empty);
161          xbt_mutex_unlock(queue->mutex);
162 }
163
164    
165 /** @brief Pop something from the message exchange queue, with a timeout.
166  * 
167  * @seealso #xbt_queue_pop
168  * 
169  */
170 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
171          double timeout = xbt_os_time() + delay;
172    xbt_mutex_lock(queue->mutex);
173    if (xbt_dynar_length(queue->data) == 0) {
174       DEBUG1("Queue %p empty. Waiting",queue);
175       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
176                         /* check if a timeout occurs */
177                         if (xbt_os_time() >= timeout) {
178                                 xbt_mutex_unlock(queue->mutex);
179                                 THROW0(timeout_error,0,"Timeout");
180                         }
181    }
182          xbt_dynar_pop(queue->data,dst);
183          xbt_cond_signal(queue->not_full);
184          xbt_mutex_unlock(queue->mutex);
185 }
186
187 /** @brief Unshift something to the message exchange queue, with a timeout.
188  * 
189  * @seealso #xbt_queue_unshift
190  */
191 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
192          double timeout = xbt_os_time() + delay;
193    xbt_mutex_lock(queue->mutex);
194    if (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
195       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
196       xbt_cond_timedwait(queue->not_full,queue->mutex,delay);
197                         /* check if a timeout occurs */
198                         if (xbt_os_time() >= timeout) {
199                                 xbt_mutex_unlock(queue->mutex);
200                                 THROW0(timeout_error,0,"Timeout");
201                         }
202    }
203          xbt_dynar_unshift(queue->data,src);
204          xbt_cond_signal(queue->not_empty);
205          xbt_mutex_unlock(queue->mutex);
206 }
207    
208
209 /** @brief Shift something from the message exchange queue, with a timeout.
210  * 
211  * @seealso #xbt_queue_shift
212  * 
213  */
214 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
215          double timeout = xbt_os_time() + delay;
216    xbt_mutex_lock(queue->mutex);
217    while (xbt_dynar_length(queue->data) == 0) {
218       DEBUG1("Queue %p empty. Waiting",queue);
219       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
220                         /* check if a timeout occurs */
221                         if (xbt_os_time() >= timeout) {
222                                 xbt_mutex_unlock(queue->mutex);
223                                 THROW0(timeout_error,0,"Timeout");
224                         }
225    }
226          xbt_dynar_shift(queue->data,dst);
227          xbt_cond_signal(queue->not_full);
228          xbt_mutex_unlock(queue->mutex);
229 }