Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Make sure all the source files have an reference of the copyright and of the licence
[simgrid.git] / src / xbt / xbt_queue.c
index 4e645af..39643f4 100644 (file)
 #include "xbt/misc.h"
 #include "xbt/sysdep.h"
 #include "xbt/log.h"
-//#include "xbt/ex.h"
 #include "xbt/dynar.h"
 
 #include "xbt/synchro.h"
 #include "xbt/queue.h" /* this module */
-
+#include "gras/virtu.h"
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
 
 typedef struct s_xbt_queue_ {
@@ -57,11 +56,11 @@ void xbt_queue_free(xbt_queue_t *queue) {
 }
 
 /** @brief Get the queue size */
-unsigned long xbt_queue_length(xbt_queue_t queue) {
+unsigned long xbt_queue_length(const xbt_queue_t queue) {
    unsigned long res;
-   xbt_mutex_lock(queue->mutex);
+   xbt_mutex_acquire(queue->mutex);
    res=xbt_dynar_length(queue->data);
-   xbt_mutex_unlock(queue->mutex);
+   xbt_mutex_release(queue->mutex);
    return res;
 }
 
@@ -69,16 +68,17 @@ unsigned long xbt_queue_length(xbt_queue_t queue) {
  * 
  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
  * 
- * @seealso #xbt_dynar_push
+ * @see #xbt_dynar_push
  */
 void xbt_queue_push(xbt_queue_t queue, const void *src) {
-   xbt_mutex_lock(queue->mutex);
+   xbt_mutex_acquire(queue->mutex);
    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
       xbt_cond_wait(queue->not_full,queue->mutex);
    }
    xbt_dynar_push(queue->data,src);
    xbt_cond_signal(queue->not_empty);
+   xbt_mutex_release(queue->mutex);
 }
 
    
@@ -86,33 +86,35 @@ void xbt_queue_push(xbt_queue_t queue, const void *src) {
  * 
  * This is blocking if the queue is empty.
  * 
- * @seealso #xbt_dynar_pop
+ * @see #xbt_dynar_pop
  * 
  */
 void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
-   xbt_mutex_lock(queue->mutex);
+   xbt_mutex_acquire(queue->mutex);
    while (xbt_dynar_length(queue->data) == 0) {
       DEBUG1("Queue %p empty. Waiting",queue);
       xbt_cond_wait(queue->not_empty,queue->mutex);
    }
    xbt_dynar_pop(queue->data,dst);
    xbt_cond_signal(queue->not_full);
+   xbt_mutex_release(queue->mutex);
 }
 
 /** @brief Unshift something to the message exchange queue.
  * 
  * This is blocking if the declared capacity is non-nul, and if this amount is reached.
  * 
- * @seealso #xbt_dynar_unshift
+ * @see #xbt_dynar_unshift
  */
 void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
-   xbt_mutex_lock(queue->mutex);
+   xbt_mutex_acquire(queue->mutex);
    while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
       DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
       xbt_cond_wait(queue->not_full,queue->mutex);
    }
    xbt_dynar_unshift(queue->data,src);
    xbt_cond_signal(queue->not_empty);
+   xbt_mutex_release(queue->mutex);
 }
    
 
@@ -120,17 +122,18 @@ void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
  * 
  * This is blocking if the queue is empty.
  * 
- * @seealso #xbt_dynar_shift
+ * @see #xbt_dynar_shift
  * 
  */
 void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
-   xbt_mutex_lock(queue->mutex);
+   xbt_mutex_acquire(queue->mutex);
    while (xbt_dynar_length(queue->data) == 0) {
       DEBUG1("Queue %p empty. Waiting",queue);
       xbt_cond_wait(queue->not_empty,queue->mutex);
    }
    xbt_dynar_shift(queue->data,dst);
    xbt_cond_signal(queue->not_full);
+   xbt_mutex_release(queue->mutex);
 }
 
 
@@ -138,60 +141,157 @@ void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
 
 /** @brief Push something to the message exchange queue, with a timeout.
  * 
- * @seealso #xbt_queue_push
+ * @see #xbt_queue_push
  */
 void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
-   xbt_mutex_lock(queue->mutex);
-   while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
-      DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
-      xbt_cond_timedwait(queue->not_full,queue->mutex, delay);
-   }
-   xbt_dynar_push(queue->data,src);
-   xbt_cond_signal(queue->not_empty);
+  double begin = xbt_time();
+  xbt_ex_t e;
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay == 0) {
+    if (queue->capacity != 0 && 
+       queue->capacity == xbt_dynar_length(queue->data)) {
+
+      xbt_mutex_release(queue->mutex);
+      THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
+            queue,queue->capacity);
+    }
+  } else {
+    while (queue->capacity != 0 && 
+          queue->capacity == xbt_dynar_length(queue->data) &&
+          (delay<0 || (xbt_time() - begin) <= delay) ) {
+      
+      DEBUG2("Capacity of %p exceded (=%d). Waiting",
+            queue,queue->capacity);
+      TRY {
+       xbt_cond_timedwait(queue->not_full,queue->mutex,
+                          delay < 0 ? -1 : delay - (xbt_time()-begin));
+      } CATCH(e) {
+       xbt_mutex_release(queue->mutex);
+       RETHROW;
+      }
+    }
+  }
+
+  xbt_dynar_push(queue->data,src);
+  xbt_cond_signal(queue->not_empty);
+  xbt_mutex_release(queue->mutex);
 }
 
    
 /** @brief Pop something from the message exchange queue, with a timeout.
  * 
- * @seealso #xbt_queue_pop
+ * @see #xbt_queue_pop
  * 
  */
 void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
-   xbt_mutex_lock(queue->mutex);
-   while (xbt_dynar_length(queue->data) == 0) {
+  double begin = xbt_time();
+  xbt_ex_t e;
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay == 0) {
+    if (xbt_dynar_length(queue->data) == 0) {
+      xbt_mutex_release(queue->mutex);
+      THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+    }
+  } else {
+    while ( (xbt_dynar_length(queue->data) == 0) && 
+           (delay<0 || (xbt_time() - begin) <= delay) ) {
       DEBUG1("Queue %p empty. Waiting",queue);
-      xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
-   }
-   xbt_dynar_pop(queue->data,dst);
-   xbt_cond_signal(queue->not_full);
+      TRY {
+       xbt_cond_timedwait(queue->not_empty,queue->mutex, 
+                          delay<0 ? -1 : delay - (xbt_time()-begin));
+      } CATCH(e) {
+       xbt_mutex_release(queue->mutex);
+       RETHROW;
+      }
+    }
+  }
+
+  xbt_dynar_pop(queue->data,dst);
+  xbt_cond_signal(queue->not_full);
+  xbt_mutex_release(queue->mutex);
 }
 
 /** @brief Unshift something to the message exchange queue, with a timeout.
  * 
- * @seealso #xbt_queue_unshift
+ * @see #xbt_queue_unshift
  */
 void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
-   xbt_mutex_lock(queue->mutex);
-   while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
-      DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
-      xbt_cond_timedwait(queue->not_full,queue->mutex,delay);
-   }
-   xbt_dynar_unshift(queue->data,src);
-   xbt_cond_signal(queue->not_empty);
+  double begin = xbt_time();
+  xbt_ex_t e;
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay==0) {
+    if (queue->capacity != 0 && 
+       queue->capacity == xbt_dynar_length(queue->data)) {
+
+      xbt_mutex_release(queue->mutex);
+      THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
+            queue,queue->capacity);
+    }
+  } else {
+    while (queue->capacity != 0 && 
+          queue->capacity == xbt_dynar_length(queue->data) &&
+          (delay<0 || (xbt_time() - begin) <= delay) ) {
+      
+      DEBUG2("Capacity of %p exceded (=%d). Waiting",
+            queue,queue->capacity);
+      TRY {
+       xbt_cond_timedwait(queue->not_full,queue->mutex,
+                          delay < 0 ? -1 : delay - (xbt_time()-begin));
+      } CATCH(e) {
+       xbt_mutex_release(queue->mutex);
+       RETHROW;
+      }
+    }
+  }
+
+  xbt_dynar_unshift(queue->data,src);
+  xbt_cond_signal(queue->not_empty);
+  xbt_mutex_release(queue->mutex);
 }
    
 
 /** @brief Shift something from the message exchange queue, with a timeout.
  * 
- * @seealso #xbt_queue_shift
+ * @see #xbt_queue_shift
  * 
  */
 void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
-   xbt_mutex_lock(queue->mutex);
-   while (xbt_dynar_length(queue->data) == 0) {
+  double begin = xbt_time();
+  xbt_ex_t e;
+
+  xbt_mutex_acquire(queue->mutex);
+
+  if (delay == 0) {
+    if (xbt_dynar_length(queue->data) == 0) {
+      xbt_mutex_release(queue->mutex);
+      THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+    }
+  } else {
+    while ( (xbt_dynar_length(queue->data) == 0) && 
+           (delay<0 || (xbt_time() - begin) <= delay) ) {
       DEBUG1("Queue %p empty. Waiting",queue);
-      xbt_cond_timedwait(queue->not_empty,queue->mutex,delay);
-   }
-   xbt_dynar_shift(queue->data,dst);
-   xbt_cond_signal(queue->not_full);
+      TRY {
+       xbt_cond_timedwait(queue->not_empty,queue->mutex, 
+                          delay<0 ? -1 : delay - (xbt_time()-begin));
+      } CATCH(e) {
+       xbt_mutex_release(queue->mutex);
+       RETHROW;
+      }
+    }
+  }
+
+  if (xbt_dynar_length(queue->data) == 0) {
+     xbt_mutex_release(queue->mutex);
+     THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
+  }
+   
+  xbt_dynar_shift(queue->data,dst);
+  xbt_cond_signal(queue->not_full);
+  xbt_mutex_release(queue->mutex);
 }