PARMAP_DESTROY
} e_xbt_parmap_flag_t;
-#ifdef HAVE_FUTEX_H
-typedef struct s_xbt_event {
- int work;
- int done;
- unsigned int thread_counter;
- unsigned int threads_to_wait;
-} s_xbt_event_t, *xbt_event_t;
-
-static void xbt_event_init(xbt_event_t event);
-static void xbt_event_signal(xbt_event_t event);
-static void xbt_event_wait(xbt_event_t event);
-static void xbt_event_end(xbt_event_t event);
-#endif
+static void xbt_parmap_start(xbt_parmap_t parmap);
+static void xbt_parmap_signal(xbt_parmap_t parmap);
+static void xbt_parmap_wait(xbt_parmap_t parmap);
+static void xbt_parmap_end(xbt_parmap_t parmap);
+static void *xbt_parmap_worker_main(void *parmap);
-typedef struct s_xbt_parmap {
- e_xbt_parmap_flag_t status;
-#ifdef HAVE_FUTEX_H
- xbt_event_t sync_event;
-#endif
- unsigned int num_workers;
- unsigned int workers_max_id;
- void_f_pvoid_t fun;
- xbt_dynar_t data;
- unsigned int index;
-} s_xbt_parmap_t;
-
-static void *_xbt_parmap_worker_main(void *parmap);
#ifdef HAVE_FUTEX_H
static void futex_wait(int *uaddr, int val);
static void futex_wake(int *uaddr, int val);
#endif
+/**
+ * \brief Parallel map structure
+ *
+ * One instance is shared by the controller thread and all the worker threads.
+ * Workers sleep on \c work and the controller sleeps on \c done; the other
+ * fields describe the job of the current round.
+ */
+typedef struct s_xbt_parmap {
+ e_xbt_parmap_flag_t status; /* is the parmap active or being destroyed? */
+
+ int work; /* index of the current round (1 is the first); incremented by the controller to start a round, workers wait on it */
+ int done; /* number of rounds already done; incremented by the last worker to finish, the controller waits on it */
+ unsigned int thread_counter; /* number of workers that have finished the current round (reset to 0 by the controller when a round starts) */
+ unsigned int num_workers; /* total number of worker threads */
+ unsigned int workers_max_id; /* id of the next worker thread to create */
+ void_f_pvoid_t fun; /* function to run in parallel on each element of data */
+ xbt_dynar_t data; /* parameters to pass to fun in parallel */
+ unsigned int index; /* index of the next element of data to pick (incremented atomically by the workers) */
+} s_xbt_parmap_t;
+
+/**
+ * \brief Creates a parallel map object
+ *
+ * The structure is zero-initialized, the worker threads are created and
+ * detached, and xbt_parmap_start() is called before returning.
+ *
+ * \param num_workers number of worker threads to create
+ * \return the parmap created
+ */
xbt_parmap_t xbt_parmap_new(unsigned int num_workers)
{
unsigned int i;
/* Initialize the thread pool data structure */
xbt_parmap_t parmap = xbt_new0(s_xbt_parmap_t, 1);
-#ifdef HAVE_FUTEX_H
- parmap->sync_event = xbt_new0(s_xbt_event_t, 1);
-#endif
+
parmap->num_workers = num_workers;
parmap->status = PARMAP_WORK;
-#ifdef HAVE_FUTEX_H
- parmap->sync_event->threads_to_wait = num_workers;
-#endif
+
/* Create the pool of worker threads */
for (i = 0; i < num_workers; i++) {
- worker = xbt_os_thread_create(NULL, _xbt_parmap_worker_main, parmap, NULL);
+ worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
xbt_os_thread_detach(worker); /* workers are never joined */
}
-#ifdef HAVE_FUTEX_H
- xbt_event_init(parmap->sync_event);
-#endif
+ xbt_parmap_start(parmap); /* wait until every worker is ready */
return parmap;
}
+/**
+ * \brief Destroys a parmap
+ *
+ * Marks the parmap as being destroyed, signals the workers so that they see
+ * the new status, then frees the structure.
+ *
+ * NOTE(review): the body of xbt_parmap_signal() is entirely guarded by
+ * HAVE_FUTEX_H. Without it, nothing here waits for the detached workers
+ * before the structure they read is freed -- confirm that this
+ * configuration is unsupported or add a fallback.
+ *
+ * \param parmap the parmap to destroy
+ */
void xbt_parmap_destroy(xbt_parmap_t parmap)
{
parmap->status = PARMAP_DESTROY;
-#ifdef HAVE_FUTEX_H
- xbt_event_signal(parmap->sync_event);
- xbt_free(parmap->sync_event);
-#endif
+ xbt_parmap_signal(parmap);
xbt_free(parmap);
}
+/**
+ * \brief Applies a list of tasks in parallel.
+ *
+ * Stores the job description in the parmap, resets the shared element index
+ * and calls xbt_parmap_signal() to run a round. \a data is stored by
+ * reference, not copied.
+ *
+ * \param parmap a parallel map object
+ * \param fun the function to call in parallel
+ * \param data each element of this dynar will be passed as an argument to fun
+ */
void xbt_parmap_apply(xbt_parmap_t parmap, void_f_pvoid_t fun, xbt_dynar_t data)
{
- /* Assign resources to worker threads*/
+ /* Assign resources to worker threads */
parmap->fun = fun;
parmap->data = data;
parmap->index = 0;
-#ifdef HAVE_FUTEX_H
- xbt_event_signal(parmap->sync_event);
-#endif
+ xbt_parmap_signal(parmap);
XBT_DEBUG("Job done");
}
+/**
+ * \brief Returns the next task to process.
+ *
+ * Worker threads call this function to get more work.
+ *
+ * \return the next task to process, or NULL if there is no more work
+ */
void* xbt_parmap_next(xbt_parmap_t parmap)
{
unsigned int index = __sync_fetch_and_add(&parmap->index, 1);
return NULL;
}
-unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap)
-{
- return (unsigned long) xbt_os_thread_get_extra_data();
-}
-
-static void *_xbt_parmap_worker_main(void *arg)
+/**
+ * \brief Main function of a worker thread.
+ * \param arg the parmap
+ */
+static void *xbt_parmap_worker_main(void *arg)
{
unsigned int worker_id;
xbt_parmap_t parmap = (xbt_parmap_t) arg;
xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id);
XBT_DEBUG("New worker thread created (%u)", worker_id);
-
+
/* Worker's main loop */
while (1) {
-#ifdef HAVE_FUTEX_H
- xbt_event_wait(parmap->sync_event);
-#endif
+ xbt_parmap_wait(parmap);
if (parmap->status == PARMAP_WORK) {
XBT_DEBUG("Worker %u got a job", worker_id);
/* We are destroying the parmap */
} else {
-#ifdef HAVE_FUTEX_H
- xbt_event_end(parmap->sync_event);
-#endif
+ xbt_parmap_end(parmap);
XBT_DEBUG("Shutting down worker %u", worker_id);
return NULL;
}
XBT_VERB("Waking futex %p", uaddr);
syscall(SYS_futex, uaddr, FUTEX_WAKE_PRIVATE, val, NULL, NULL, 0);
}
+#endif
-static void xbt_event_init(xbt_event_t event)
+/**
+ * \brief Starts the parmap: waits for all workers to be ready and returns.
+ *
+ * This function is called by the controller thread.
+ * It does nothing when HAVE_FUTEX_H is not defined.
+ *
+ * \c done is sampled before \c thread_counter is tested; presumably
+ * futex_wait() returns at once when \c done no longer equals the sampled
+ * value, so a wake-up issued in between is not lost -- TODO confirm against
+ * futex_wait().
+ *
+ * NOTE(review): the wait is issued once and its condition is not re-checked
+ * afterwards; verify that an early return of futex_wait() cannot happen or
+ * is harmless here.
+ *
+ * \param parmap a parmap
+ */
+static void xbt_parmap_start(xbt_parmap_t parmap)
{
- int myflag = event->done;
- if (event->thread_counter < event->threads_to_wait) {
- futex_wait(&event->done, myflag);
+#ifdef HAVE_FUTEX_H
+ int myflag = parmap->done;
+ if (parmap->thread_counter < parmap->num_workers) {
+ /* wait for all workers to be ready */
+ futex_wait(&parmap->done, myflag);
}
+#endif
}
-static void xbt_event_signal(xbt_event_t event)
+/**
+ * \brief Wakes all workers and waits for them to finish the tasks.
+ *
+ * This function is called by the controller thread.
+ * It does nothing when HAVE_FUTEX_H is not defined.
+ *
+ * A round is started by resetting \c thread_counter and incrementing
+ * \c work, the value the workers are waiting on. \c done is sampled first
+ * and the controller then waits on it.
+ *
+ * NOTE(review): the final wait is issued once and not re-checked in a loop;
+ * verify that an early return of futex_wait() cannot happen or is harmless
+ * here.
+ *
+ * \param parmap a parmap
+ */
+static void xbt_parmap_signal(xbt_parmap_t parmap)
{
- int myflag = event->done;
- event->thread_counter = 0;
- event->work++;
- futex_wake(&event->work, event->threads_to_wait);
- futex_wait(&event->done, myflag);
+#ifdef HAVE_FUTEX_H
+ int myflag = parmap->done;
+ parmap->thread_counter = 0;
+ parmap->work++;
+
+ /* wake all workers */
+ futex_wake(&parmap->work, parmap->num_workers);
+
+ /* wait for all of them to finish */
+ futex_wait(&parmap->done, myflag);
+#endif
}
-static void xbt_event_wait(xbt_event_t event)
+/**
+ * \brief Waits for some work to process.
+ *
+ * This function is called by each worker when it has no more work to do.
+ *
+ * The worker atomically increments \c thread_counter; the one that brings
+ * it to \c num_workers increments \c done and wakes the controller. Every
+ * worker then waits on \c work, using the value sampled on entry.
+ *
+ * NOTE(review): the body is empty when HAVE_FUTEX_H is not defined, so the
+ * call returns immediately; presumably the worker loop would then spin --
+ * confirm whether that configuration is supported.
+ *
+ * \param parmap a parmap
+ */
+static void xbt_parmap_wait(xbt_parmap_t parmap)
{
+#ifdef HAVE_FUTEX_H
int myflag;
unsigned int mycount;
- myflag = event->work;
- mycount = __sync_add_and_fetch(&event->thread_counter, 1);
- if (mycount == event->threads_to_wait) {
- event->done++;
- futex_wake(&event->done, 1);
+ myflag = parmap->work;
+ mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
+ if (mycount == parmap->num_workers) {
+ /* all workers have finished, wake the controller */
+ parmap->done++;
+ futex_wake(&parmap->done, 1);
}
- futex_wait(&event->work, myflag);
+ /* wait for more work */
+ futex_wait(&parmap->work, myflag);
+#endif
}
-static void xbt_event_end(xbt_event_t event)
+/**
+ * \brief Ends the parmap: wakes the controller thread when all workers terminate.
+ *
+ * This function is called by all worker threads when they end.
+ * It does nothing when HAVE_FUTEX_H is not defined.
+ *
+ * It performs the same counting as xbt_parmap_wait() (atomic increment of
+ * \c thread_counter, the last worker increments \c done and wakes the
+ * controller) but does not wait afterwards.
+ *
+ * \param parmap a parmap
+ */
+static void xbt_parmap_end(xbt_parmap_t parmap)
{
+#ifdef HAVE_FUTEX_H
unsigned int mycount;
- mycount = __sync_add_and_fetch(&event->thread_counter, 1);
- if (mycount == event->threads_to_wait) {
- event->done++;
- futex_wake(&event->done, 1);
+ mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
+ if (mycount == parmap->num_workers) {
+ /* all workers have finished, wake the controller */
+ parmap->done++;
+ futex_wake(&parmap->done, 1);
}
-}
#endif
+}
#ifdef SIMGRID_TEST
#include "xbt.h"
/* Create the parallel map */
parmap = xbt_parmap_new(10);
- for(j = 0; j < 100; j++) {
+ for (j = 0; j < 100; j++) {
xbt_dynar_push_as(data, void *, (void *)j);
}