From: Christophe Thiéry Date: Wed, 25 Jan 2012 13:17:39 +0000 (+0100) Subject: Parmap: implement the POSIX synchronization mode (not working yet) X-Git-Tag: exp_20120216~119^2~13 X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/commitdiff_plain/e9b3477bfa7c241e736e7a38c972930d6b3f710a Parmap: implement the POSIX synchronization mode (not working yet) --- diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index d545b193b2..13c7baf7d0 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -51,7 +51,6 @@ static void xbt_parmap_busy_end(xbt_parmap_t parmap); static void xbt_parmap_busy_signal(xbt_parmap_t parmap); static void xbt_parmap_busy_wait(xbt_parmap_t parmap); - /** * \brief Parallel map structure */ @@ -65,6 +64,12 @@ typedef struct s_xbt_parmap { xbt_dynar_t data; /**< parameters to pass to fun in parallel */ unsigned int index; /**< index of the next element of data to pick */ + /* posix only */ + xbt_os_cond_t ready_cond; + xbt_os_mutex_t ready_mutex; + xbt_os_cond_t done_cond; + xbt_os_mutex_t done_mutex; + /* fields that depend on the synchronization mode */ e_xbt_parmap_mode_t mode; /**< synchronization mode */ void (*start_f)(xbt_parmap_t); /**< initializes the worker threads */ @@ -114,6 +119,12 @@ void xbt_parmap_destroy(xbt_parmap_t parmap) parmap->status = XBT_PARMAP_DESTROY; parmap->signal_f(parmap); + + xbt_os_cond_destroy(parmap->ready_cond); + xbt_os_mutex_destroy(parmap->ready_mutex); + xbt_os_cond_destroy(parmap->done_cond); + xbt_os_mutex_destroy(parmap->done_mutex); + xbt_free(parmap); } @@ -128,8 +139,7 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode) #ifdef HAVE_FUTEX_H mode = XBT_PARMAP_FUTEX; #else - //For now use busy wait because posix is unimplemented - mode = XBT_PARMAP_BUSY_WAIT; + mode = XBT_PARMAP_POSIX; #endif } parmap->mode = mode; @@ -141,6 +151,11 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode) parmap->end_f = xbt_parmap_posix_end; 
parmap->signal_f = xbt_parmap_posix_signal; parmap->wait_f = xbt_parmap_posix_wait; + + parmap->ready_cond = xbt_os_cond_init(); + parmap->ready_mutex = xbt_os_mutex_init(); + parmap->done_cond = xbt_os_cond_init(); + parmap->done_mutex = xbt_os_mutex_init(); break; @@ -150,9 +165,14 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode) parmap->end_f = xbt_parmap_futex_end; parmap->signal_f = xbt_parmap_futex_signal; parmap->wait_f = xbt_parmap_futex_wait; + + xbt_os_cond_destroy(parmap->ready_cond); + xbt_os_mutex_destroy(parmap->ready_mutex); + xbt_os_cond_destroy(parmap->done_cond); + xbt_os_mutex_destroy(parmap->done_mutex); break; #else - xbt_die("Futex is not available on this OS (maybe you are on a Mac)."); + xbt_die("Futex is not available on this OS."); #endif case XBT_PARMAP_BUSY_WAIT: @@ -160,6 +180,11 @@ static void xbt_parmap_set_mode(xbt_parmap_t parmap, e_xbt_parmap_mode_t mode) parmap->end_f = xbt_parmap_busy_end; parmap->signal_f = xbt_parmap_busy_signal; parmap->wait_f = xbt_parmap_busy_wait; + + xbt_os_cond_destroy(parmap->ready_cond); + xbt_os_mutex_destroy(parmap->ready_mutex); + xbt_os_cond_destroy(parmap->done_cond); + xbt_os_mutex_destroy(parmap->done_mutex); break; case XBT_PARMAP_DEFAULT: @@ -228,7 +253,6 @@ static void *xbt_parmap_worker_main(void *arg) /* We are destroying the parmap */ } else { parmap->end_f(parmap); - XBT_DEBUG("Shutting down worker"); return NULL; } } @@ -248,24 +272,97 @@ static void futex_wake(int *uaddr, int val) } #endif +/** + * \brief Starts the parmap: waits for all workers to be ready and returns. + * + * This function is called by the controller thread. 
+ * + * \param parmap a parmap + */ static void xbt_parmap_posix_start(xbt_parmap_t parmap) { - THROW_UNIMPLEMENTED; + unsigned int counter = __sync_fetch_and_add(&parmap->thread_counter, 1); + if (counter < parmap->num_workers) { + /* wait for all workers to be initialized */ + xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex); + } } +/** + * \brief Ends the parmap: wakes the controller thread when all workers terminate. + * + * This function is called by all worker threads when they end (not including + * the controller). + * + * \param parmap a parmap + */ static void xbt_parmap_posix_end(xbt_parmap_t parmap) { - THROW_UNIMPLEMENTED; + unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1); + XBT_DEBUG("Shutting down worker %d", counter); + if (counter == parmap->num_workers) { + /* all workers have finished, wake the controller */ + xbt_os_cond_signal(parmap->done_cond); + } } +/** + * \brief Wakes all workers and waits for them to finish the tasks. + * + * This function is called by the controller thread. + * + * \param parmap a parmap + */ static void xbt_parmap_posix_signal(xbt_parmap_t parmap) { - THROW_UNIMPLEMENTED; + parmap->thread_counter = 0; + parmap->work++; + XBT_DEBUG("Starting work %d", parmap->work); + + /* wake all workers */ + xbt_os_cond_broadcast(parmap->ready_cond); + + if (parmap->status == XBT_PARMAP_WORK) { + /* also work myself */ + void* work = xbt_parmap_next(parmap); + while (work != NULL) { + parmap->fun(work); + work = xbt_parmap_next(parmap); + } + } + + unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1); + if (counter < parmap->num_workers) { + /* some workers have not finished yet */ + XBT_DEBUG("Some workers have not finished yet, waiting for them"); + xbt_os_cond_wait(parmap->done_cond, parmap->done_mutex); + } } +/** + * \brief Waits for some work to process. + * + * This function is called by each worker thread (not including the controller) + * when it has no more work to do. 
+ * + * \param parmap a parmap + */ static void xbt_parmap_posix_wait(xbt_parmap_t parmap) { - THROW_UNIMPLEMENTED; + int work = parmap->work; + unsigned int counter = __sync_add_and_fetch(&parmap->thread_counter, 1); + if (counter == parmap->num_workers) { + /* all workers have finished, wake the controller */ + parmap->done++; + XBT_DEBUG("Last worker has finished, waking the controller"); + xbt_os_cond_signal(parmap->done_cond); + } + + /* wait for more work */ + XBT_DEBUG("Worker %d waiting for more work", counter); + if (parmap->work == work) { + xbt_os_cond_wait(parmap->ready_cond, parmap->ready_mutex); + } } #ifdef HAVE_FUTEX_H