int work; /* index of the current round (1 is the first) */
int done; /* number of rounds already done */
unsigned int thread_counter; /* number of threads currently working */
- unsigned int num_workers; /* total number of worker threads */
- unsigned int workers_max_id; /* id of the next worker thread to create */
+ unsigned int num_workers; /* total number of worker threads including the controller */
void_f_pvoid_t fun; /* function to run in parallel on each element of data */
xbt_dynar_t data; /* parameters to pass to fun in parallel */
unsigned int index; /* index of the next element of data to pick */
parmap->status = PARMAP_WORK;
/* Create the pool of worker threads */
- for (i = 0; i < num_workers; i++) {
+ for (i = 0; i < num_workers - 1; i++) {
worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL);
xbt_os_thread_detach(worker);
}
{
#ifdef HAVE_FUTEX_H
int myflag = parmap->done;
+ __sync_fetch_and_add(&parmap->thread_counter, 1);
if (parmap->thread_counter < parmap->num_workers) {
/* wait for all workers to be ready */
futex_wait(&parmap->done, myflag);
/* wake all workers */
futex_wake(&parmap->work, parmap->num_workers);
- /* wait for all of them to finish */
- futex_wait(&parmap->done, myflag);
+ if (parmap->status == PARMAP_WORK) {
+ /* also work myself */
+ void* work = xbt_parmap_next(parmap);
+ if (work != NULL) {
+ parmap->fun(work);
+ }
+ }
+
+ unsigned int mycount = __sync_add_and_fetch(&parmap->thread_counter, 1);
+ if (mycount < parmap->num_workers) {
+ /* some workers have not finished yet */
+ futex_wait(&parmap->done, myflag);
+ }
+
#endif
}