Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Kill xbt_dynar_dopar().
[simgrid.git] / src / xbt / xbt_queue.c
index 976054f..6dc3c95 100644 (file)
@@ -1,9 +1,8 @@
-/* $Id$ */
-
 /* A (synchronized) message queue.                                          */
 /* Popping an empty queue is blocking, as well as pushing a full one        */
 
-/* Copyright (c) 2007 Martin Quinson. All rights reserved.                  */
+/* Copyright (c) 2007, 2008, 2009, 2010. The SimGrid Team.
+ * All rights reserved.                                                     */
 
 /* This program is free software; you can redistribute it and/or modify it
  * under the terms of the license (GNU LGPL) which comes with this package. */
 #include "xbt/log.h"
 #include "xbt/dynar.h"
 
-#include "xbt/synchro.h"
-#include "xbt/queue.h" /* this module */
-#include "gras/virtu.h"
-XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue,xbt,"Message exchanging queue");
+#include "xbt/queue.h"          /* this module */
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_queue, xbt,
+                                "Message exchanging queue");
 
 typedef struct s_xbt_queue_ {
   int capacity;
   xbt_dynar_t data;
   xbt_mutex_t mutex;
-  xbt_cond_t  not_full, not_empty;
+  xbt_cond_t not_full, not_empty;
 } s_xbt_queue_t;
 
 /** @brief Create a new message exchange queue.
@@ -30,36 +28,41 @@ typedef struct s_xbt_queue_ {
  * @param capacity the capacity of the queue. If non-nul, any attempt to push an item which would let the size of the queue over this number will be blocking until someone else pop some data
  * @param elm_size size of each element stored in it (see #xbt_dynar_new)
  */
-xbt_queue_t xbt_queue_new(int capacity,unsigned long elm_size) {
-  xbt_queue_t res = xbt_new0(s_xbt_queue_t,1);
-  xbt_assert0(capacity>=0,"Capacity cannot be negative");
+xbt_queue_t xbt_queue_new(int capacity, unsigned long elm_size)
+{
+  xbt_queue_t res = xbt_new0(s_xbt_queue_t, 1);
+  if (capacity<0)
+    capacity=0;
 
   res->capacity = capacity;
-  res->data = xbt_dynar_new(elm_size,NULL);
+  res->data = xbt_dynar_new(elm_size, NULL);
   res->mutex = xbt_mutex_init();
   res->not_full = xbt_cond_init();
   res->not_empty = xbt_cond_init();
   return res;
 }
+
 /** @brief Destroy a message exchange queue.
  *
  * Any remaining content is leaked.
  */
-void xbt_queue_free(xbt_queue_t *queue) {
-
-  xbt_dynar_free(&( (*queue)->data ));
-  xbt_mutex_destroy( (*queue)->mutex );
-  xbt_cond_destroy( (*queue)->not_full );
-  xbt_cond_destroy( (*queue)->not_empty );
-  free((*queue));
+void xbt_queue_free(xbt_queue_t * queue)
+{
+
+  xbt_dynar_free(&((*queue)->data));
+  xbt_mutex_destroy((*queue)->mutex);
+  xbt_cond_destroy((*queue)->not_full);
+  xbt_cond_destroy((*queue)->not_empty);
+  free(*queue);
   *queue = NULL;
 }
 
 /** @brief Get the queue size */
-unsigned long xbt_queue_length(const xbt_queue_t queue) {
+unsigned long xbt_queue_length(const xbt_queue_t queue)
+{
   unsigned long res;
   xbt_mutex_acquire(queue->mutex);
-  res=xbt_dynar_length(queue->data);
+  res = xbt_dynar_length(queue->data);
   xbt_mutex_release(queue->mutex);
   return res;
 }
@@ -70,13 +73,16 @@ unsigned long xbt_queue_length(const xbt_queue_t queue) {
  *
  * @see #xbt_dynar_push
  */
-void xbt_queue_push(xbt_queue_t queue, const void *src) {
+void xbt_queue_push(xbt_queue_t queue, const void *src)
+{
   xbt_mutex_acquire(queue->mutex);
-  while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
-    DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
-    xbt_cond_wait(queue->not_full,queue->mutex);
+  while (queue->capacity != 0
+         && queue->capacity == xbt_dynar_length(queue->data)) {
+    XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
+           queue->capacity);
+    xbt_cond_wait(queue->not_full, queue->mutex);
   }
-  xbt_dynar_push(queue->data,src);
+  xbt_dynar_push(queue->data, src);
   xbt_cond_signal(queue->not_empty);
   xbt_mutex_release(queue->mutex);
 }
@@ -89,13 +95,14 @@ void xbt_queue_push(xbt_queue_t queue, const void *src) {
  * @see #xbt_dynar_pop
  *
  */
-void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
+void xbt_queue_pop(xbt_queue_t queue, void *const dst)
+{
   xbt_mutex_acquire(queue->mutex);
-  while (xbt_dynar_length(queue->data) == 0) {
-    DEBUG1("Queue %p empty. Waiting",queue);
-    xbt_cond_wait(queue->not_empty,queue->mutex);
+  while (xbt_dynar_is_empty(queue->data)) {
+    XBT_DEBUG("Queue %p empty. Waiting", queue);
+    xbt_cond_wait(queue->not_empty, queue->mutex);
   }
-  xbt_dynar_pop(queue->data,dst);
+  xbt_dynar_pop(queue->data, dst);
   xbt_cond_signal(queue->not_full);
   xbt_mutex_release(queue->mutex);
 }
@@ -106,13 +113,16 @@ void xbt_queue_pop(xbt_queue_t queue, void* const dst) {
  *
  * @see #xbt_dynar_unshift
  */
-void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
+void xbt_queue_unshift(xbt_queue_t queue, const void *src)
+{
   xbt_mutex_acquire(queue->mutex);
-  while (queue->capacity != 0 && queue->capacity == xbt_dynar_length(queue->data)) {
-    DEBUG2("Capacity of %p exceded (=%d). Waiting",queue,queue->capacity);
-    xbt_cond_wait(queue->not_full,queue->mutex);
+  while (queue->capacity != 0
+         && queue->capacity == xbt_dynar_length(queue->data)) {
+    XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
+           queue->capacity);
+    xbt_cond_wait(queue->not_full, queue->mutex);
   }
-  xbt_dynar_unshift(queue->data,src);
+  xbt_dynar_unshift(queue->data, src);
   xbt_cond_signal(queue->not_empty);
   xbt_mutex_release(queue->mutex);
 }
@@ -125,13 +135,14 @@ void xbt_queue_unshift(xbt_queue_t queue, const void *src) {
  * @see #xbt_dynar_shift
  *
  */
-void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
+void xbt_queue_shift(xbt_queue_t queue, void *const dst)
+{
   xbt_mutex_acquire(queue->mutex);
-  while (xbt_dynar_length(queue->data) == 0) {
-    DEBUG1("Queue %p empty. Waiting",queue);
-    xbt_cond_wait(queue->not_empty,queue->mutex);
+  while (xbt_dynar_is_empty(queue->data)) {
+    XBT_DEBUG("Queue %p empty. Waiting", queue);
+    xbt_cond_wait(queue->not_empty, queue->mutex);
   }
-  xbt_dynar_shift(queue->data,dst);
+  xbt_dynar_shift(queue->data, dst);
   xbt_cond_signal(queue->not_full);
   xbt_mutex_release(queue->mutex);
 }
@@ -143,9 +154,9 @@ void xbt_queue_shift(xbt_queue_t queue, void* const dst) {
  *
  * @see #xbt_queue_push
  */
-void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
+void xbt_queue_push_timed(xbt_queue_t queue, const void *src, double delay)
+{
   double begin = xbt_time();
-  xbt_ex_t e;
 
   xbt_mutex_acquire(queue->mutex);
 
@@ -154,27 +165,29 @@ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
         queue->capacity == xbt_dynar_length(queue->data)) {
 
       xbt_mutex_release(queue->mutex);
-      THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
-             queue,queue->capacity);
+      THROWF(timeout_error, 0,
+             "Capacity of %p exceeded (=%d), and delay = 0", queue,
+             queue->capacity);
     }
   } else {
     while (queue->capacity != 0 &&
-        queue->capacity == xbt_dynar_length(queue->data) &&
-        (delay<0 || (xbt_time() - begin) <= delay) ) {
+           queue->capacity == xbt_dynar_length(queue->data) &&
+           (delay < 0 || (xbt_time() - begin) <= delay)) {
 
-      DEBUG2("Capacity of %p exceded (=%d). Waiting",
-             queue,queue->capacity);
+      XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
+             queue->capacity);
       TRY {
-        xbt_cond_timedwait(queue->not_full,queue->mutex,
-                           delay < 0 ? -1 : delay - (xbt_time()-begin));
-      } CATCH(e) {
+        xbt_cond_timedwait(queue->not_full, queue->mutex,
+                           delay < 0 ? -1 : delay - (xbt_time() - begin));
+      }
+      CATCH_ANONYMOUS {
         xbt_mutex_release(queue->mutex);
         RETHROW;
       }
     }
   }
 
-  xbt_dynar_push(queue->data,src);
+  xbt_dynar_push(queue->data, src);
   xbt_cond_signal(queue->not_empty);
   xbt_mutex_release(queue->mutex);
 }
@@ -185,32 +198,33 @@ void xbt_queue_push_timed(xbt_queue_t queue, const void *src,double delay) {
  * @see #xbt_queue_pop
  *
  */
-void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
+void xbt_queue_pop_timed(xbt_queue_t queue, void *const dst, double delay)
+{
   double begin = xbt_time();
-  xbt_ex_t e;
 
   xbt_mutex_acquire(queue->mutex);
 
   if (delay == 0) {
-    if (xbt_dynar_length(queue->data) == 0) {
+    if (xbt_dynar_is_empty(queue->data)) {
       xbt_mutex_release(queue->mutex);
-      THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+      THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
     }
   } else {
-    while ( (xbt_dynar_length(queue->data) == 0) &&
-        (delay<0 || (xbt_time() - begin) <= delay) ) {
-      DEBUG1("Queue %p empty. Waiting",queue);
+    while ((xbt_dynar_is_empty(queue->data)) &&
+           (delay < 0 || (xbt_time() - begin) <= delay)) {
+      XBT_DEBUG("Queue %p empty. Waiting", queue);
       TRY {
-        xbt_cond_timedwait(queue->not_empty,queue->mutex,
-                           delay<0 ? -1 : delay - (xbt_time()-begin));
-      } CATCH(e) {
+        xbt_cond_timedwait(queue->not_empty, queue->mutex,
+                           delay < 0 ? -1 : delay - (xbt_time() - begin));
+      }
+      CATCH_ANONYMOUS {
         xbt_mutex_release(queue->mutex);
         RETHROW;
       }
     }
   }
 
-  xbt_dynar_pop(queue->data,dst);
+  xbt_dynar_pop(queue->data, dst);
   xbt_cond_signal(queue->not_full);
   xbt_mutex_release(queue->mutex);
 }
@@ -219,38 +233,41 @@ void xbt_queue_pop_timed(xbt_queue_t queue, void* const dst,double delay) {
  *
  * @see #xbt_queue_unshift
  */
-void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
+void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,
+                             double delay)
+{
   double begin = xbt_time();
-  xbt_ex_t e;
 
   xbt_mutex_acquire(queue->mutex);
 
-  if (delay==0) {
+  if (delay == 0) {
     if (queue->capacity != 0 &&
         queue->capacity == xbt_dynar_length(queue->data)) {
 
       xbt_mutex_release(queue->mutex);
-      THROW2(timeout_error,0,"Capacity of %p exceded (=%d), and delay = 0",
-             queue,queue->capacity);
+      THROWF(timeout_error, 0,
+             "Capacity of %p exceeded (=%d), and delay = 0", queue,
+             queue->capacity);
     }
   } else {
     while (queue->capacity != 0 &&
-        queue->capacity == xbt_dynar_length(queue->data) &&
-        (delay<0 || (xbt_time() - begin) <= delay) ) {
+           queue->capacity == xbt_dynar_length(queue->data) &&
+           (delay < 0 || (xbt_time() - begin) <= delay)) {
 
-      DEBUG2("Capacity of %p exceded (=%d). Waiting",
-             queue,queue->capacity);
+      XBT_DEBUG("Capacity of %p exceeded (=%d). Waiting", queue,
+             queue->capacity);
       TRY {
-        xbt_cond_timedwait(queue->not_full,queue->mutex,
-                           delay < 0 ? -1 : delay - (xbt_time()-begin));
-      } CATCH(e) {
+        xbt_cond_timedwait(queue->not_full, queue->mutex,
+                           delay < 0 ? -1 : delay - (xbt_time() - begin));
+      }
+      CATCH_ANONYMOUS {
         xbt_mutex_release(queue->mutex);
         RETHROW;
       }
     }
   }
 
-  xbt_dynar_unshift(queue->data,src);
+  xbt_dynar_unshift(queue->data, src);
   xbt_cond_signal(queue->not_empty);
   xbt_mutex_release(queue->mutex);
 }
@@ -261,37 +278,40 @@ void xbt_queue_unshift_timed(xbt_queue_t queue, const void *src,double delay) {
  * @see #xbt_queue_shift
  *
  */
-void xbt_queue_shift_timed(xbt_queue_t queue, void* const dst,double delay) {
+void xbt_queue_shift_timed(xbt_queue_t queue, void *const dst,
+                           double delay)
+{
   double begin = xbt_time();
-  xbt_ex_t e;
 
   xbt_mutex_acquire(queue->mutex);
 
   if (delay == 0) {
-    if (xbt_dynar_length(queue->data) == 0) {
+    if (xbt_dynar_is_empty(queue->data)) {
       xbt_mutex_release(queue->mutex);
-      THROW0(timeout_error,0,"Delay = 0, and queue is empty");
+      THROWF(timeout_error, 0, "Delay = 0, and queue is empty");
     }
   } else {
-    while ( (xbt_dynar_length(queue->data) == 0) &&
-        (delay<0 || (xbt_time() - begin) <= delay) ) {
-      DEBUG1("Queue %p empty. Waiting",queue);
+    while ((xbt_dynar_is_empty(queue->data)) &&
+           (delay < 0 || (xbt_time() - begin) <= delay)) {
+      XBT_DEBUG("Queue %p empty. Waiting", queue);
       TRY {
-        xbt_cond_timedwait(queue->not_empty,queue->mutex,
-                           delay<0 ? -1 : delay - (xbt_time()-begin));
-      } CATCH(e) {
+        xbt_cond_timedwait(queue->not_empty, queue->mutex,
+                           delay < 0 ? -1 : delay - (xbt_time() - begin));
+      }
+      CATCH_ANONYMOUS {
         xbt_mutex_release(queue->mutex);
         RETHROW;
       }
     }
   }
 
-  if (xbt_dynar_length(queue->data) == 0) {
+  if (xbt_dynar_is_empty(queue->data)) {
     xbt_mutex_release(queue->mutex);
-    THROW1(timeout_error,0,"Timeout (%f) elapsed, but queue still empty",delay);
+    THROWF(timeout_error, 0, "Timeout (%f) elapsed, but queue still empty",
+           delay);
   }
 
-  xbt_dynar_shift(queue->data,dst);
+  xbt_dynar_shift(queue->data, dst);
   xbt_cond_signal(queue->not_full);
   xbt_mutex_release(queue->mutex);
 }