Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
add msg and xbt versions of barrier.
authorAugustin Degomme <degomme@idpann.imag.fr>
Tue, 6 May 2014 13:31:00 +0000 (15:31 +0200)
committerAugustin Degomme <degomme@idpann.imag.fr>
Tue, 6 May 2014 13:56:16 +0000 (15:56 +0200)
MSG/xbt_barrier_init(unsigned int count) creates the barrier object for all processes
MSG/xbt_barrier_wait(msg/xbt_bar_t bar) waits on it
MSG/xbt_barrier_destroy(msg/xbt_bar_t bar) deletes the object (only one process should call it)

include/msg/msg.h
include/xbt/synchro_core.h
src/msg/msg_synchro.c
src/xbt/xbt_sg_synchro.c

index 0269493..3ff9838 100644 (file)
@@ -417,6 +417,15 @@ XBT_PUBLIC(void) MSG_sem_get_capacity(msg_sem_t sem);
 XBT_PUBLIC(void) MSG_sem_destroy(msg_sem_t sem);
 XBT_PUBLIC(int) MSG_sem_would_block(msg_sem_t sem);
 
+/** @brief Opaque type representing a barrier identifier
+ *  @ingroup msg_synchro
+ *  @hideinitializer
+ */
+typedef struct s_xbt_bar *msg_bar_t;
+XBT_PUBLIC(msg_bar_t) MSG_barrier_init( unsigned int count);
+XBT_PUBLIC(void) MSG_barrier_destroy(msg_bar_t bar);
+XBT_PUBLIC(void) MSG_barrier_wait(msg_bar_t bar);
+
 /** @brief Opaque type describing a Virtual Machine.
  *  @ingroup msg_VMs
  *
index c41c26c..5313a9c 100644 (file)
@@ -106,6 +106,12 @@ XBT_PUBLIC(void) xbt_cond_broadcast(xbt_cond_t cond);
 /** @brief Destroys the given mutex variable */
 XBT_PUBLIC(void) xbt_cond_destroy(xbt_cond_t cond);
 
+
+typedef struct s_xbt_bar_ *xbt_bar_t;
+XBT_PUBLIC(xbt_bar_t) xbt_barrier_init( unsigned int count);
+XBT_PUBLIC(void) xbt_barrier_destroy(xbt_bar_t bar);
+XBT_PUBLIC(void) xbt_barrier_wait(xbt_bar_t bar);
+
 /** @} */
 
 SG_END_DECL()
index c825289..186fcef 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "msg_private.h"
 #include "xbt/sysdep.h"
+#include "xbt/synchro_core.h"
 #include "xbt/log.h"
 
 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(msg_synchro, msg,
@@ -64,4 +65,19 @@ int MSG_sem_would_block(msg_sem_t sem) {
   return simcall_sem_would_block(sem);
 }
 
+/** @brief Initializes a barrier, with count elements */
+msg_bar_t MSG_barrier_init(unsigned int count) {
+   return (msg_bar_t)xbt_barrier_init(count);
+}
+
+/** @brief Initializes a barrier, with count elements */
+void MSG_barrier_destroy(msg_bar_t bar) {
+  xbt_barrier_destroy((xbt_bar_t)bar);
+}
+
+/** @brief Performs a barrier already initialized */
+void MSG_barrier_wait(msg_bar_t bar) {
+  xbt_barrier_wait((xbt_bar_t)bar);
+}
+
 /**@}*/
index 4907e6f..e04f5a1 100644 (file)
@@ -184,3 +184,42 @@ void xbt_cond_destroy(xbt_cond_t cond)
 {
   simcall_cond_destroy((smx_cond_t) cond);
 }
+
+/***** barrier related functions *****/
+typedef struct s_xbt_bar_ {
+  xbt_mutex_t mutex;
+  xbt_cond_t cond;
+  unsigned int arrived_processes;
+  unsigned int expected_processes;
+} s_xbt_bar_;
+
+xbt_bar_t xbt_barrier_init(unsigned int count)
+{
+  xbt_bar_t bar = xbt_new0(s_xbt_bar_, 1);
+  bar->expected_processes = count;
+  bar->arrived_processes = 0;
+  bar->mutex = xbt_mutex_init();
+  bar->cond = xbt_cond_init();
+  return bar;
+}
+
+
+void xbt_barrier_wait(xbt_bar_t bar)
+{
+   xbt_mutex_acquire(bar->mutex);
+   if (++bar->arrived_processes == bar->expected_processes) {
+     xbt_cond_broadcast(bar->cond);
+     xbt_mutex_release(bar->mutex);
+   } else {
+     xbt_cond_wait(bar->cond, bar->mutex);
+     xbt_mutex_release(bar->mutex);
+   }
+}
+
+void xbt_barrier_destroy(xbt_bar_t bar)
+{
+   xbt_mutex_destroy(bar->mutex);
+   xbt_cond_destroy(bar->cond);
+   xbt_free(bar);
+}
+