X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/09a6c2a0a7bb442da5487755964e9794eaee68cb..e6d9cc62d3fa395f1111d3af226afe91cc91f3b1:/src/xbt/parmap.c diff --git a/src/xbt/parmap.c b/src/xbt/parmap.c index c40fa0d941..4e79b71bb1 100644 --- a/src/xbt/parmap.c +++ b/src/xbt/parmap.c @@ -49,8 +49,7 @@ typedef struct s_xbt_parmap { int work; /* index of the current round (1 is the first) */ int done; /* number of rounds already done */ unsigned int thread_counter; /* number of threads currently working */ - unsigned int num_workers; /* total number of worker threads */ - unsigned int workers_max_id; /* id of the next worker thread to create */ + unsigned int num_workers; /* total number of worker threads including the controller */ void_f_pvoid_t fun; /* function to run in parallel on each element of data */ xbt_dynar_t data; /* parameters to pass to fun in parallel */ unsigned int index; /* index of the next element of data to pick */ @@ -75,7 +74,7 @@ xbt_parmap_t xbt_parmap_new(unsigned int num_workers) parmap->status = PARMAP_WORK; /* Create the pool of worker threads */ - for (i = 0; i < num_workers; i++) { + for (i = 0; i < num_workers - 1; i++) { worker = xbt_os_thread_create(NULL, xbt_parmap_worker_main, parmap, NULL); xbt_os_thread_detach(worker); } @@ -126,49 +125,34 @@ void* xbt_parmap_next(xbt_parmap_t parmap) return NULL; } -/** - * \brief Returns the worker id of the current thread. - * \param parmap a parmap - * \return the worker id - */ -unsigned long xbt_parmap_get_worker_id(xbt_parmap_t parmap) -{ - return (unsigned long) xbt_os_thread_get_extra_data(); -} - /** * \brief Main function of a worker thread. * \param arg the parmap */ static void *xbt_parmap_worker_main(void *arg) { - unsigned int worker_id; xbt_parmap_t parmap = (xbt_parmap_t) arg; - /* Fetch a worker id */ - worker_id = __sync_fetch_and_add(&parmap->workers_max_id, 1); - xbt_os_thread_set_extra_data((void*) (unsigned long) worker_id); - - XBT_DEBUG("New worker thread created (%u)", worker_id); + XBT_DEBUG("New worker thread created"); /* Worker's main loop */ while (1) { xbt_parmap_wait(parmap); if (parmap->status == PARMAP_WORK) { - XBT_DEBUG("Worker %u got a job", worker_id); + XBT_DEBUG("Worker got a job"); void* work = xbt_parmap_next(parmap); if (work != NULL) { parmap->fun(work); } - XBT_DEBUG("Worker %u has finished", worker_id); + XBT_DEBUG("Worker has finished"); /* We are destroying the parmap */ } else { xbt_parmap_end(parmap); - XBT_DEBUG("Shutting down worker %u", worker_id); + XBT_DEBUG("Shutting down worker"); return NULL; } } @@ -199,6 +183,7 @@ static void xbt_parmap_start(xbt_parmap_t parmap) { #ifdef HAVE_FUTEX_H int myflag = parmap->done; + __sync_fetch_and_add(&parmap->thread_counter, 1); if (parmap->thread_counter < parmap->num_workers) { /* wait for all workers to be ready */ futex_wait(&parmap->done, myflag); @@ -223,8 +208,20 @@ static void xbt_parmap_signal(xbt_parmap_t parmap) /* wake all workers */ futex_wake(&parmap->work, parmap->num_workers); - /* wait for all of them to finish */ - futex_wait(&parmap->done, myflag); + if (parmap->status == PARMAP_WORK) { + /* also work myself */ + void* work = xbt_parmap_next(parmap); + if (work != NULL) { + parmap->fun(work); + } + } + + unsigned int mycount = __sync_add_and_fetch(&parmap->thread_counter, 1); + if (mycount < parmap->num_workers) { + /* some workers have not finished yet */ + futex_wait(&parmap->done, myflag); + } + #endif }