static void xbt_parmap_busy_signal(xbt_parmap_t parmap);
static void xbt_parmap_busy_wait(xbt_parmap_t parmap);
-
/**
* \brief Parallel map structure
*/
xbt_dynar_t data; /**< parameters to pass to fun in parallel */
unsigned int index; /**< index of the next element of data to pick */
+ /* posix only */
+ xbt_os_cond_t ready_cond;
+ xbt_os_mutex_t ready_mutex;
+ xbt_os_cond_t done_cond;
+ xbt_os_mutex_t done_mutex;
+
/* fields that depend on the synchronization mode */
e_xbt_parmap_mode_t mode; /**< synchronization mode */
void (*start_f)(xbt_parmap_t); /**< initializes the worker threads */
parmap->status = XBT_PARMAP_DESTROY;
parmap->signal_f(parmap);
+
+ xbt_os_cond_destroy(parmap->ready_cond);
+ xbt_os_mutex_destroy(parmap->ready_mutex);
+ xbt_os_cond_destroy(parmap->done_cond);
+ xbt_os_mutex_destroy(parmap->done_mutex);
+
xbt_free(parmap);
}
#ifdef HAVE_FUTEX_H
mode = XBT_PARMAP_FUTEX;
#else
- //For now use busy wait because posix is unimplemented
- mode = XBT_PARMAP_BUSY_WAIT;
+ mode = XBT_PARMAP_POSIX;
#endif
}
parmap->mode = mode;
parmap->end_f = xbt_parmap_posix_end;
parmap->signal_f = xbt_parmap_posix_signal;
parmap->wait_f = xbt_parmap_posix_wait;
+
+ parmap->ready_cond = xbt_os_cond_init();
+ parmap->ready_mutex = xbt_os_mutex_init();
+ parmap->done_cond = xbt_os_cond_init();
+ parmap->done_mutex = xbt_os_mutex_init();
break;
parmap->end_f = xbt_parmap_futex_end;
parmap->signal_f = xbt_parmap_futex_signal;
parmap->wait_f = xbt_parmap_futex_wait;
+
+ xbt_os_cond_destroy(parmap->ready_cond);
+ xbt_os_mutex_destroy(parmap->ready_mutex);
+ xbt_os_cond_destroy(parmap->done_cond);
+ xbt_os_mutex_destroy(parmap->done_mutex);
break;
#else
- xbt_die("Futex is not available on this OS (maybe you are on a Mac).");
+ xbt_die("Futex is not available on this OS.");
#endif
case XBT_PARMAP_BUSY_WAIT:
parmap->end_f = xbt_parmap_busy_end;
parmap->signal_f = xbt_parmap_busy_signal;
parmap->wait_f = xbt_parmap_busy_wait;
+
+ xbt_os_cond_destroy(parmap->ready_cond);
+ xbt_os_mutex_destroy(parmap->ready_mutex);
+ xbt_os_cond_destroy(parmap->done_cond);
+ xbt_os_mutex_destroy(parmap->done_mutex);
break;
case XBT_PARMAP_DEFAULT:
/* We are destroying the parmap */
} else {
parmap->end_f(parmap);
- XBT_DEBUG("Shutting down worker");
return NULL;
}
}
}
#endif
+/**
+ * \brief Starts the parmap: waits for all workers to be ready and returns.
+ *
+ * This function is called by the controller thread. It counts the controller
+ * in thread_counter and, unless the controller is the last participant to
+ * arrive, blocks on done_cond until thread_counter reaches num_workers.
+ *
+ * NOTE(review): this assumes every worker increments thread_counter when it
+ * first goes to sleep, and that the participant that brings the counter to
+ * num_workers signals done_cond -- confirm against the worker main loop.
+ *
+ * \param parmap a parmap
+ */
static void xbt_parmap_posix_start(xbt_parmap_t parmap)
{
- THROW_UNIMPLEMENTED;
+  /* Use the post-increment value, like the other posix functions: with the
+   * pre-increment value a controller arriving last would still see
+   * counter < num_workers and wait for a signal that nobody sends. */
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  if (counter < parmap->num_workers) {
+    /* wait for all workers to be initialized; the mutex must be held around
+     * the wait, and the predicate is re-checked to absorb spurious wakeups */
+    xbt_os_mutex_acquire(parmap->done_mutex);
+    while (__sync_add_and_fetch(&parmap->thread_counter, 0) < parmap->num_workers) {
+      xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
+    }
+    xbt_os_mutex_release(parmap->done_mutex);
+  }
}
+/**
+ * \brief Ends the parmap: wakes the controller thread when all workers terminate.
+ *
+ * This function is called by all worker threads when they end (not including
+ * the controller). Each caller increments thread_counter; the one that brings
+ * it to num_workers signals done_cond.
+ *
+ * \param parmap a parmap
+ */
static void xbt_parmap_posix_end(xbt_parmap_t parmap)
{
- THROW_UNIMPLEMENTED;
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  /* counter is unsigned, so print it with %u */
+  XBT_DEBUG("Shutting down worker %u", counter);
+  if (counter == parmap->num_workers) {
+    /* all workers have finished, wake the controller; signalling while
+     * holding done_mutex prevents the wake-up from slipping in between the
+     * controller's counter check and its call to xbt_os_cond_wait */
+    xbt_os_mutex_acquire(parmap->done_mutex);
+    xbt_os_cond_signal(parmap->done_cond);
+    xbt_os_mutex_release(parmap->done_mutex);
+  }
}
+/**
+ * \brief Wakes all workers and waits for them to finish the tasks.
+ *
+ * This function is called by the controller thread. It resets
+ * thread_counter, bumps the round number (work), broadcasts ready_cond,
+ * processes tasks itself when the status is XBT_PARMAP_WORK, and finally
+ * blocks on done_cond until thread_counter reaches num_workers.
+ *
+ * \param parmap a parmap
+ */
static void xbt_parmap_posix_signal(xbt_parmap_t parmap)
{
- THROW_UNIMPLEMENTED;
+  /* publish the new round under ready_mutex so that a worker which has
+   * checked the round number but not yet gone to sleep cannot miss the
+   * broadcast */
+  xbt_os_mutex_acquire(parmap->ready_mutex);
+  parmap->thread_counter = 0;
+  parmap->work++;
+
+  /* wake all workers */
+  xbt_os_cond_broadcast(parmap->ready_cond);
+  xbt_os_mutex_release(parmap->ready_mutex);
+  XBT_DEBUG("Starting work %d", parmap->work);
+
+  if (parmap->status == XBT_PARMAP_WORK) {
+    /* also work myself */
+    void* work = xbt_parmap_next(parmap);
+    while (work != NULL) {
+      parmap->fun(work);
+      work = xbt_parmap_next(parmap);
+    }
+  }
+
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  if (counter < parmap->num_workers) {
+    /* some workers have not finished yet */
+    XBT_DEBUG("Some workers have not finished yet, waiting for them");
+    /* the mutex must be held around the wait; re-check the counter so a
+     * spurious wakeup does not release the controller too early */
+    xbt_os_mutex_acquire(parmap->done_mutex);
+    while (__sync_add_and_fetch(&parmap->thread_counter, 0) < parmap->num_workers) {
+      xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex);
+    }
+    xbt_os_mutex_release(parmap->done_mutex);
+  }
}
+/**
+ * \brief Waits for some work to process.
+ *
+ * This function is called by each worker thread (not including the controller)
+ * when it has no more work to do. The caller increments thread_counter (the
+ * one that brings it to num_workers wakes the controller through done_cond)
+ * and then sleeps on ready_cond until the round number (work) changes.
+ *
+ * \param parmap a parmap
+ */
static void xbt_parmap_posix_wait(xbt_parmap_t parmap)
{
- THROW_UNIMPLEMENTED;
+  /* snapshot the round number BEFORE announcing completion: once the counter
+   * is incremented the controller may start the next round at any time */
+  int work = parmap->work;
+  unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1);
+  if (counter == parmap->num_workers) {
+    /* all workers have finished, wake the controller */
+    parmap->done++;
+    XBT_DEBUG("Last worker has finished, waking the controller");
+    xbt_os_mutex_acquire(parmap->done_mutex);
+    xbt_os_cond_signal(parmap->done_cond);
+    xbt_os_mutex_release(parmap->done_mutex);
+  }
+
+  /* wait for more work */
+  XBT_DEBUG("Worker %u waiting for more work", counter);
+  /* Hold ready_mutex only around the wait and release it afterwards, so
+   * woken workers do not run one at a time; loop on the round number so a
+   * spurious wakeup does not send the worker back with nothing to do. */
+  xbt_os_mutex_acquire(parmap->ready_mutex);
+  while (parmap->work == work) {
+    xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex);
+  }
+  xbt_os_mutex_release(parmap->ready_mutex);
}
#ifdef HAVE_FUTEX_H