X-Git-Url: http://info.iut-bm.univ-fcomte.fr/pub/gitweb/simgrid.git/blobdiff_plain/1710ae7ecbb5d366a11a7e5b7fff135a48c6b146..f0eb9ed1492509eeaa51629e9dc95b729a8481e3:/src/xbt/threadpool.c diff --git a/src/xbt/threadpool.c b/src/xbt/threadpool.c index 271dad4343..aab6fc494c 100644 --- a/src/xbt/threadpool.c +++ b/src/xbt/threadpool.c @@ -6,21 +6,29 @@ #include "threadpool_private.h" +XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt, + "threadpool: pool of worker threads"); + static void *_xbt_tpool_worker_main(void *tpool); +unsigned long tpoolcounter = 0; /* Debug purposes */ + xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs) { unsigned int i; xbt_os_thread_t worker = NULL; + DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs); + /* Initialize thread pool data structure */ xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1); tpool->mutex = xbt_os_mutex_init(); tpool->job_posted = xbt_os_cond_init(); tpool->job_taken = xbt_os_cond_init(); - tpool->all_jobs_done = xbt_os_cond_init(); + tpool->job_done = xbt_os_cond_init(); tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL); tpool->num_workers = num_workers; + tpool->num_idle_workers = 0; tpool->max_jobs = max_jobs; /* Create the pool of worker threads */ @@ -34,19 +42,21 @@ xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs) void xbt_tpool_destroy(xbt_tpool_t tpool) { + DEBUG1("Destroy thread pool %p", tpool); + /* Lock the pool, then signal every worker an wait for each to finish */ xbt_os_mutex_acquire(tpool->mutex); tpool->flags = TPOOL_DESTROY; while(tpool->num_workers){ xbt_os_cond_signal(tpool->job_posted); - xbt_os_cond_wait(tpool->job_posted, tpool->mutex); + xbt_os_cond_wait(tpool->job_taken, tpool->mutex); } /* Destroy pool's data structures */ xbt_os_cond_destroy(tpool->job_posted); xbt_os_cond_destroy(tpool->job_taken); - xbt_os_cond_destroy(tpool->all_jobs_done); + xbt_os_cond_destroy(tpool->job_done); xbt_os_mutex_release(tpool->mutex); xbt_os_mutex_destroy(tpool->mutex); xbt_free(tpool); @@ -63,6 +73,8 @@ void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg) while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs) xbt_os_cond_wait(tpool->job_taken, tpool->mutex); + DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool); + /* Push the job in the queue, signal the workers and unlock the pool */ xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job); xbt_os_cond_signal(tpool->job_posted); @@ -72,11 +84,13 @@ void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg) void xbt_tpool_wait_all(xbt_tpool_t tpool) { + DEBUG1("Wait all in thread pool %p", tpool); xbt_os_mutex_acquire(tpool->mutex); - while(xbt_dynar_length(tpool->jobs_queue)) - xbt_os_cond_wait(tpool->job_taken, tpool->mutex); + while(tpool->num_idle_workers < tpool->num_workers + || xbt_dynar_length(tpool->jobs_queue) > 0) + xbt_os_cond_wait(tpool->job_done, tpool->mutex); xbt_os_mutex_release(tpool->mutex); - + DEBUG1("Wait all done in thread pool %p", tpool); return; } @@ -84,18 +98,29 @@ static void *_xbt_tpool_worker_main(void *arg) { s_xbt_tpool_job_t job; xbt_tpool_t tpool = (xbt_tpool_t)arg; + + unsigned long i = tpoolcounter++; /* Debug purposes */ + DEBUG1("New worker thread created (%lu)", i); /* Worker's main loop */ while(1){ xbt_os_mutex_acquire(tpool->mutex); + xbt_os_cond_signal(tpool->job_done); + tpool->num_idle_workers++; + + DEBUG1("Worker %lu waiting for a job", i); /* If there are no jobs in the queue wait for one */ - if(!xbt_dynar_length(tpool->jobs_queue)) + while(xbt_dynar_length(tpool->jobs_queue) == 0) xbt_os_cond_wait(tpool->job_posted, tpool->mutex); + DEBUG1("Worker %lu got a job", i); + /* If we are shutting down, signal the destroyer so it can kill the other */ /* workers, unlock the pool and return */ if(tpool->flags == TPOOL_DESTROY){ + DEBUG1("Shutting down worker %lu", i); + tpool->num_idle_workers--; tpool->num_workers--; xbt_os_cond_signal(tpool->job_taken); xbt_os_mutex_release(tpool->mutex); @@ -105,9 +130,47 @@ static void *_xbt_tpool_worker_main(void *arg) /* Get a job, signal the pool to inform jobs submitters and unlock it */ job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t); xbt_os_cond_signal(tpool->job_taken); + tpool->num_idle_workers--; xbt_os_mutex_release(tpool->mutex); /* Run the job and loop again ... */ job.fun(job.fun_arg); + + DEBUG1("Worker %lu done with job", i); } -} \ No newline at end of file +} + +#ifdef SIMGRID_TEST +#include "xbt.h" +#include "xbt/ex.h" + +XBT_TEST_SUITE("tpool", "Thread pool"); + +xbt_tpool_t tpool; + +void job(void *arg); + +void job (void *arg) +{ + xbt_test_log1("I'm job %lu", (unsigned long)arg); +} + +XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage") +{ + xbt_test_add0("Create thread pool"); + + unsigned long i; + /* Create thread pool */ + tpool = xbt_tpool_new(5, 10); + + /* Queue some work */ + for(i=0; i < 20; i++) + xbt_tpool_queue_job(tpool, job, (void*)i); + + /* Destroy thread pool */ + xbt_tpool_destroy(tpool); + + xbt_test_assert0(0, "lala"); +} + +#endif /* SIMGRID_TEST */