Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
implement versatile threads (working both on simulator and in situ); the simulated...
[simgrid.git] / src / xbt / xbt_queue.c
1 /* $Id$ */
2
3 /* A (synchronized) message queue.                                          */
4 /* Popping an empty queue is blocking, as well as pushing a full one        */
5
6 /* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
7
8 /* This program is free software; you can redistribute it and/or modify it
9  * under the terms of the license (GNU LGPL) which comes with this package. */
10
11 #include "xbt/misc.h"
12 #include "xbt/sysdep.h"
13 #include "xbt/log.h"
14 //#include "xbt/ex.h"
15 #include "xbt/dynar.h"
16
17 #include "xbt/synchro.h"
18 #include "xbt/queue.h" /* this module */
19
20 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
21
22 typedef struct s_xbt_queue_ {
23    int capacity;
24    xbt_dynar_t data;
25    xbt_mutex_t mutex;
26    xbt_cond_t  not_full, not_empty;
27 } s_xbt_queue_t;
28
29 /** @brief Create a new message exchange queue.
30  * 
31  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
32  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
33  */
34 xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
35    xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
36    xbt_assert0(capacity>=0,"Capacity cannot be negative");
37
38    res->capacity = capacity;
39    res->data = xbt_dynar_new(elm_size,NULL);
40    res->mutex = xbt_mutex_init();
41    res->not_full = xbt_cond_init();
42    res->not_empty = xbt_cond_init();
43    return res;
44 }
45 /** @brief Destroy a message exchange queue.
46  *
47  * Any remaining content is leaked.
48  */
49 void xbt_queue_free(xbt_queue_t *queue) {
50
51    xbt_dynar_free(&( (*queue)->data ));
52    xbt_mutex_destroy( (*queue)->mutex );
53    xbt_cond_destroy( (*queue)->not_full );
54    xbt_cond_destroy( (*queue)->not_empty );
55    free((*queue));
56    *queue = NULL;
57 }
58
59 /** @brief Get the queue size */
60 unsigned long xbt_queue_length(xbt_queue_t queue) {
61    unsigned long res;
62    xbt_mutex_lock(queue->mutex);
63    res=xbt_dynar_length(queue->data);
64    xbt_mutex_unlock(queue->mutex);
65    return res;
66 }
67
68 /** @brief Push something to the message exchange queue.
69  * 
70  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
71  * 
72  * @seealso #xbt_dynar_push
73  */
74 void xbt_queue_push(xbt_queue_t queue, const void *src) {
75    xbt_mutex_lock(queue->mutex);
76    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
77       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
78       xbt_cond_wait(queue->not_full,queue->mutex);
79    }
80    xbt_dynar_push(queue->data,src);
81    xbt_cond_signal(queue->not_empty);
82 }
83
84    
85 /** @brief Pop something from the message exchange queue.
86  * 
87  * This is blocking if the queue is empty.
88  * 
89  * @seealso #xbt_dynar_pop
90  * 
91  */
92 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
93    xbt_mutex_lock(queue->mutex);
94    while (xbt_dynar_length(queue->data) == 0) {
95       DEBUG1("Queue %p empty. Waiting",queue);
96       xbt_cond_wait(queue->not_empty,queue->mutex);
97    }
98    xbt_dynar_pop(queue->data,dst);
99    xbt_cond_signal(queue->not_full);
100 }
101
102 /** @brief Unshift something to the message exchange queue.
103  * 
104  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
105  * 
106  * @seealso #xbt_dynar_unshift
107  */
108 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
109    xbt_mutex_lock(queue->mutex);
110    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
111       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
112       xbt_cond_wait(queue->not_full,queue->mutex);
113    }
114    xbt_dynar_unshift(queue->data,src);
115    xbt_cond_signal(queue->not_empty);
116 }
117    
118
119 /** @brief Shift something from the message exchange queue.
120  * 
121  * This is blocking if the queue is empty.
122  * 
123  * @seealso #xbt_dynar_shift
124  * 
125  */
126 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
127    xbt_mutex_lock(queue->mutex);
128    while (xbt_dynar_length(queue->data) == 0) {
129       DEBUG1("Queue %p empty. Waiting",queue);
130       xbt_cond_wait(queue->not_empty,queue->mutex);
131    }
132    xbt_dynar_shift(queue->data,dst);
133    xbt_cond_signal(queue->not_full);
134 }
135
136
137
138
139 /** @brief Push something to the message exchange queue, with a timeout.
140  * 
141  * @seealso #xbt_queue_push
142  */
143 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
144    xbt_mutex_lock(queue->mutex);
145    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
146       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
147       xbt_cond_timedwait(queue->not_full,queue->mutex, delay);
148    }
149    xbt_dynar_push(queue->data,src);
150    xbt_cond_signal(queue->not_empty);
151 }
152
153    
154 /** @brief Pop something from the message exchange queue, with a timeout.
155  * 
156  * @seealso #xbt_queue_pop
157  * 
158  */
159 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
160    xbt_mutex_lock(queue->mutex);
161    while (xbt_dynar_length(queue->data) == 0) {
162       DEBUG1("Queue %p empty. Waiting",queue);
163       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
164    }
165    xbt_dynar_pop(queue->data,dst);
166    xbt_cond_signal(queue->not_full);
167 }
168
169 /** @brief Unshift something to the message exchange queue, with a timeout.
170  * 
171  * @seealso #xbt_queue_unshift
172  */
173 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
174    xbt_mutex_lock(queue->mutex);
175    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
176       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
177       xbt_cond_timedwait(queue->not_full,queue->mutex,delay);
178    }
179    xbt_dynar_unshift(queue->data,src);
180    xbt_cond_signal(queue->not_empty);
181 }
182    
183
184 /** @brief Shift something from the message exchange queue, with a timeout.
185  * 
186  * @seealso #xbt_queue_shift
187  * 
188  */
189 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
190    xbt_mutex_lock(queue->mutex);
191    while (xbt_dynar_length(queue->data) == 0) {
192       DEBUG1("Queue %p empty. Waiting",queue);
193       xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
194    }
195    xbt_dynar_shift(queue->data,dst);
196    xbt_cond_signal(queue->not_full);
197 }