int work; /* index of the current round (1 is the first) */
int done; /* number of rounds already done */
unsigned int thread_counter; /* number of threads currently working */
- unsigned int num_workers; /* total number of worker threads */
- unsigned int workers_max_id; /* id of the next worker thread to create */
+ unsigned int num_workers; /* total number of worker threads including the controller */
void_f_pvoid_t fun; /* function to run in parallel on each element of data */
xbt_dynar_t data; /* parameters to pass to fun in parallel */
unsigned int index; /* index of the next element of data to pick */
parmap->status = PARMAP_WORK;
/* Create the pool of worker threads */
- for (i = 0; i < num_workers; i++) {
+ for (i = 0; i < num_workers - 1; i++) {
worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
xbt_os_thread_detach(worker);
}
return NULL;
}
-/**
- * \brief Returns the worker id of the current thread.
- * \param parmap a parmap
- * \return the worker id
- */
-unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap)
-{
- return (unsigned long) xbt_os_thread_get_extra_data();
-}
-
/**
* \brief Main function of a worker thread.
* \param arg the parmap
*/
static void *xbt_parmap_worker_main(void *arg)
{
- unsigned int worker_id;
xbt_parmap_t parmap = (xbt_parmap_t) arg;
- /* Fetch a worker id */
- worker_id = __sync_fetch_and_add(&parmap->workers_max_id, 1);
- xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id);
-
- XBT_DEBUG("New worker thread created (%u)", worker_id);
+ XBT_DEBUG("New worker thread created");
/* Worker's main loop */
while (1) {
xbt_parmap_wait(parmap);
if (parmap->status == PARMAP_WORK) {
- XBT_DEBUG("Worker %u got a job", worker_id);
+ XBT_DEBUG("Worker got a job");
void* work = xbt_parmap_next(parmap);
- if (work != NULL) {
+ while (work != NULL) {
parmap->fun(work);
+ work = xbt_parmap_next(parmap);
}
- XBT_DEBUG("Worker %u has finished", worker_id);
+ XBT_DEBUG("Worker has finished");
/* We are destroying the parmap */
} else {
xbt_parmap_end(parmap);
- XBT_DEBUG("Shutting down worker %u", worker_id);
+ XBT_DEBUG("Shutting down worker");
return NULL;
}
}
{
#ifdef HAVE_FUTEX_H
int myflag = parmap->done;
+ __sync_fetch_and_add(&parmap->thread_counter, 1);
if (parmap->thread_counter < parmap->num_workers) {
/* wait for all workers to be ready */
futex_wait(&parmap->done, myflag);
/* wake all workers */
futex_wake(&parmap->work, parmap->num_workers);
- /* wait for all of them to finish */
- futex_wait(&parmap->done, myflag);
+ if (parmap->status == PARMAP_WORK) {
+ /* also work myself */
+ void* work = xbt_parmap_next(parmap);
+ if (work != NULL) {
+ parmap->fun(work);
+ }
+ }
+
+ unsigned int mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
+ if (mycount < parmap->num_workers) {
+ /* some workers have not finished yet */
+ futex_wait(&parmap->done, myflag);
+ }
+
#endif
}