Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Parmap: implement the POSIX synchronization mode (not working yet)
authorChristophe Thiéry <christopho128@gmail.com>
Wed, 25 Jan 2012 13:17:39 +0000 (14:17 +0100)
committerChristophe Thiéry <christopho128@gmail.com>
Wed, 25 Jan 2012 13:18:59 +0000 (14:18 +0100)
src/xbt/parmap.c

index d545b19..13c7baf 100644 (file)
@@ -51,7 +51,6 @@ static void xbt_parmap_busy_end(xbt_parmap_t parmap);
 static void xbt_parmap_busy_signal(xbt_parmap_t parmap);
 static void xbt_parmap_busy_wait(xbt_parmap_t parmap);
 
 static void xbt_parmap_busy_signal(xbt_parmap_t parmap);
 static void xbt_parmap_busy_wait(xbt_parmap_t parmap);
 
-
 /**
  * \brief Parallel map structure
  */
 /**
  * \brief Parallel map structure
  */
@@ -65,6 +64,12 @@ typedef struct s_xbt_parmap {
   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
   unsigned int index;              /**< index of the next element of data to pick */
 
   xbt_dynar_t data;                /**< parameters to pass to fun in parallel */
   unsigned int index;              /**< index of the next element of data to pick */
 
+  /* posix only */
+  xbt_os_cond_t ready_cond;
+  xbt_os_mutex_t ready_mutex;
+  xbt_os_cond_t done_cond;
+  xbt_os_mutex_t done_mutex;
+
   /* fields that depend on the synchronization mode */
   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
   void (*start_f)(xbt_parmap_t);   /**< initializes the worker threads */
   /* fields that depend on the synchronization mode */
   e_xbt_parmap_mode_t mode;        /**< synchronization mode */
   void (*start_f)(xbt_parmap_t);   /**< initializes the worker threads */
@@ -114,6 +119,12 @@ void xbt_parmap_destroy(xbt_parmap_t parmap)
 
   parmap->status = XBT_PARMAP_DESTROY;
   parmap->signal_f(parmap);
 
   parmap->status = XBT_PARMAP_DESTROY;
   parmap->signal_f(parmap);
+
+  xbt_os_cond_destroy(parmap->ready_cond);
+  xbt_os_mutex_destroy(parmap->ready_mutex);
+  xbt_os_cond_destroy(parmap->done_cond);
+  xbt_os_mutex_destroy(parmap->done_mutex);
+
   xbt_free(parmap);
 }
 
   xbt_free(parmap);
 }
 
@@ -128,8 +139,7 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
 #ifdef HAVE_FUTEX_H
     mode = XBT_PARMAP_FUTEX;
 #else
 #ifdef HAVE_FUTEX_H
     mode = XBT_PARMAP_FUTEX;
 #else
-    //For now use busy wait because posix is unimplemented
-    mode = XBT_PARMAP_BUSY_WAIT;
+    mode = XBT_PARMAP_POSIX;
 #endif
   }
   parmap->mode = mode;
 #endif
   }
   parmap->mode = mode;
@@ -141,6 +151,11 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
       parmap->end_f = xbt_parmap_posix_end;
       parmap->signal_f = xbt_parmap_posix_signal;
       parmap->wait_f = xbt_parmap_posix_wait;
       parmap->end_f = xbt_parmap_posix_end;
       parmap->signal_f = xbt_parmap_posix_signal;
       parmap->wait_f = xbt_parmap_posix_wait;
+
+      parmap->ready_cond = xbt_os_cond_init();
+      parmap->ready_mutex = xbt_os_mutex_init();
+      parmap->done_cond = xbt_os_cond_init();
+      parmap->done_mutex = xbt_os_mutex_init();
       break;
 
 
       break;
 
 
@@ -150,9 +165,14 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
       parmap->end_f = xbt_parmap_futex_end;
       parmap->signal_f = xbt_parmap_futex_signal;
       parmap->wait_f = xbt_parmap_futex_wait;
       parmap->end_f = xbt_parmap_futex_end;
       parmap->signal_f = xbt_parmap_futex_signal;
       parmap->wait_f = xbt_parmap_futex_wait;
+
+      xbt_os_cond_destroy(parmap->ready_cond);
+      xbt_os_mutex_destroy(parmap->ready_mutex);
+      xbt_os_cond_destroy(parmap->done_cond);
+      xbt_os_mutex_destroy(parmap->done_mutex);
       break;
 #else
       break;
 #else
-      xbt_die("Futex is not available on this OS (maybe you are on a Mac).");
+      xbt_die("Futex is not available on this OS.");
 #endif
 
     case XBT_PARMAP_BUSY_WAIT:
 #endif
 
     case XBT_PARMAP_BUSY_WAIT:
@@ -160,6 +180,11 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode)
       parmap->end_f = xbt_parmap_busy_end;
       parmap->signal_f = xbt_parmap_busy_signal;
       parmap->wait_f = xbt_parmap_busy_wait;
       parmap->end_f = xbt_parmap_busy_end;
       parmap->signal_f = xbt_parmap_busy_signal;
       parmap->wait_f = xbt_parmap_busy_wait;
+
+      xbt_os_cond_destroy(parmap->ready_cond);
+      xbt_os_mutex_destroy(parmap->ready_mutex);
+      xbt_os_cond_destroy(parmap->done_cond);
+      xbt_os_mutex_destroy(parmap->done_mutex);
       break;
 
     case XBT_PARMAP_DEFAULT:
       break;
 
     case XBT_PARMAP_DEFAULT:
@@ -228,7 +253,6 @@ static void *xbt_parmap_worker_main(void *arg)
     /* We are destroying the parmap */
     } else {
       parmap->end_f(parmap);
     /* We are destroying the parmap */
     } else {
       parmap->end_f(parmap);
-      XBT_DEBUG("Shutting down worker");
       return NULL;
     }
   }
       return NULL;
     }
   }
@@ -248,24 +272,97 @@ static void futex_wake(int *uaddr, int val)
 }
 #endif
 
 }
 #endif
 
+/**
+ * \brief Starts the parmap: waits for all workers to be ready and returns.
+ *
+ * This function is called by the controller thread.
+ *
+ * \param parmap a parmap
+ */
 static void xbt_parmap_posix_start(xbt_parmap_t parmap)
 {
 static void xbt_parmap_posix_start(xbt_parmap_t parmap)
 {
-  THROW_UNIMPLEMENTED;
+  unsigned int counter = __sync_fetch_and_add(&parmap->thread_counter, 1);
+  if (counter < parmap->num_workers) {
+    /* wait for all workers to be initialized */
+    xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
+  }
 }
 
 }
 
+/**
+ * \brief Ends the parmap: wakes the controller thread when all workers terminate.
+ *
+ * This function is called by all worker threads when they end (not including
+ * the controller).
+ *
+ * \param parmap a parmap
+ */
 static void xbt_parmap_posix_end(xbt_parmap_t parmap)
 {
 static void xbt_parmap_posix_end(xbt_parmap_t parmap)
 {
-  THROW_UNIMPLEMENTED;
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  XBT_DEBUG("Shutting down worker %d", counter);
+  if (counter == parmap->num_workers) {
+    /* all workers have finished, wake the controller */
+    xbt_os_cond_signal(parmap->done_cond);
+  }
 }
 
 }
 
+/**
+ * \brief Wakes all workers and waits for them to finish the tasks.
+ *
+ * This function is called by the controller thread.
+ *
+ * \param parmap a parmap
+ */
 static void xbt_parmap_posix_signal(xbt_parmap_t parmap)
 {
 static void xbt_parmap_posix_signal(xbt_parmap_t parmap)
 {
-  THROW_UNIMPLEMENTED;
+  parmap->thread_counter = 0;
+  parmap->work++;
+  XBT_DEBUG("Starting work %d", parmap->work);
+
+  /* wake all workers */
+  xbt_os_cond_broadcast(parmap->ready_cond);
+
+  if (parmap->status == XBT_PARMAP_WORK) {
+    /* also work myself */
+    void* work = xbt_parmap_next(parmap);
+    while (work != NULL) {
+      parmap->fun(work);
+      work = xbt_parmap_next(parmap);
+    }
+  }
+
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  if (counter < parmap->num_workers) {
+    /* some workers have not finished yet */
+    XBT_DEBUG("Some workers have not finished yet, waiting for them");
+    xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
+  }
 }
 
 }
 
+/**
+ * \brief Waits for some work to process.
+ *
+ * This function is called by each worker thread (not including the controller)
+ * when it has no more work to do.
+ *
+ * \param parmap a parmap
+ */
 static void xbt_parmap_posix_wait(xbt_parmap_t parmap)
 {
 static void xbt_parmap_posix_wait(xbt_parmap_t parmap)
 {
-  THROW_UNIMPLEMENTED;
+  int work = parmap->work;
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  if (counter == parmap->num_workers) {
+    /* all workers have finished, wake the controller */
+    parmap->done++;
+    XBT_DEBUG("Last worker has finished, waking the controller");
+    xbt_os_cond_signal(parmap->done_cond);
+  }
+
+  /* wait for more work */
+  XBT_DEBUG("Worker %d waiting for more work", counter);
+  if (parmap->work == work) {
+    xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
+  }
 }
 
 #ifdef HAVE_FUTEX_H
 }
 
 #ifdef HAVE_FUTEX_H