tpool->mutex = xbt_os_mutex_init();
tpool->job_posted = xbt_os_cond_init();
tpool->job_taken = xbt_os_cond_init();
- tpool->job_done = xbt_os_cond_init();
+ tpool->idle_worker = xbt_os_cond_init();
tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
tpool->num_workers = num_workers;
tpool->num_idle_workers = 0;
tpool->flags = TPOOL_DESTROY;
while(tpool->num_workers){
+ DEBUG1("Still %u workers, waiting...", tpool->num_workers);
xbt_os_cond_signal(tpool->job_posted);
xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
}
/* Destroy pool's data structures */
xbt_os_cond_destroy(tpool->job_posted);
xbt_os_cond_destroy(tpool->job_taken);
- xbt_os_cond_destroy(tpool->job_done);
+ xbt_os_cond_destroy(tpool->idle_worker);
xbt_os_mutex_release(tpool->mutex);
xbt_os_mutex_destroy(tpool->mutex);
xbt_free(tpool);
/**
 * Block the calling thread until the pool has no pending work.
 *
 * The caller takes tpool->mutex and then waits until both of the
 * following hold: every worker is counted as idle
 * (num_idle_workers == num_workers) and jobs_queue is empty.
 *
 * @param tpool  thread pool to wait on; dereferenced without a NULL check.
 */
void xbt_tpool_wait_all(xbt_tpool_t tpool)
{
- DEBUG1("Wait all in thread pool %p", tpool);
+ DEBUG1("Wait all workers in thread pool %p", tpool);
xbt_os_mutex_acquire(tpool->mutex);
+
/* The predicate is re-evaluated after every wakeup, so a signal that
 * arrives while work is still pending simply makes the loop wait again.
 * The condition waited on is the one a worker signals right after it
 * increments num_idle_workers.
 * NOTE(review): xbt_os_cond_wait presumably releases tpool->mutex while
 * blocked and re-acquires it before returning -- confirm. */
while(tpool->num_idle_workers < tpool->num_workers
|| xbt_dynar_length(tpool->jobs_queue) > 0)
- xbt_os_cond_wait(tpool->job_done, tpool->mutex);
+ xbt_os_cond_wait(tpool->idle_worker, tpool->mutex);
+
xbt_os_mutex_release(tpool->mutex);
- DEBUG1("Wait all done in thread pool %p", tpool);
+ DEBUG1("Wait all workers done in thread pool %p", tpool);
return;
}
unsigned long i = tpoolcounter++; /* Debug purposes */
DEBUG1("New worker thread created (%lu)", i);
+
/* Worker's main loop */
while(1){
xbt_os_mutex_acquire(tpool->mutex);
- xbt_os_cond_signal(tpool->job_done);
- tpool->num_idle_workers++;
- DEBUG1("Worker %lu waiting for a job", i);
+ tpool->num_idle_workers++;
+ xbt_os_cond_signal(tpool->idle_worker);
/* If there are no jobs in the queue wait for one */
- while(xbt_dynar_length(tpool->jobs_queue) == 0)
+ while(xbt_dynar_length(tpool->jobs_queue) == 0
+ && tpool->flags != TPOOL_DESTROY){
+ DEBUG1("Worker %lu waiting for a job", i);
xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
+ }
DEBUG1("Worker %lu got a job", i);
/* Run the job and loop again ... */
job.fun(job.fun_arg);
-
- DEBUG1("Worker %lu done with job", i);
}
}
/* Test-suite declaration for the thread pool ("tpool"). */
XBT_TEST_SUITE("tpool", "Thread pool");
/* Log subcategory used by the DEBUG1 calls in this test code.
 * NOTE(review): presumably a child of the "xbt" category -- confirm
 * against the logging macro's documentation. */
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_tpool_unit, xbt, "Unit test for tpool");
+
/* Pool handle shared by the unit test below: assigned from xbt_tpool_new
 * and released with xbt_tpool_destroy inside the "basic" test. */
xbt_tpool_t tpool;
/* Prototype for the test job defined immediately below. */
void job(void *arg);
/**
 * Trivial job used by the unit test: it only logs its argument.
 *
 * @param arg  not dereferenced; the pointer value itself is cast to
 *             unsigned long and printed (the test passes a loop index
 *             cast to void*).
 */
void job (void *arg)
{
- xbt_test_log1("I'm job %lu", (unsigned long)arg);
+ DEBUG1("I'm job %lu", (unsigned long)arg);
}
/**
 * Unit test "basic": create a pool, submit work to it, wait for it to
 * finish, and destroy the pool.
 *
 * The pool is created with xbt_tpool_new(5, 10); the meaning of the two
 * arguments is defined by xbt_tpool_new, which is not shown here.
 */
XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
{
xbt_test_add0("Create thread pool");
- unsigned long i;
+ unsigned long i,j;
/* Create thread pool */
tpool = xbt_tpool_new(5, 10);
/* Ten rounds (j); each round queues 20 jobs, passing the loop index i
 * as the job argument cast to void*, and then blocks in
 * xbt_tpool_wait_all before the next round starts. Repeating the
 * queue/wait cycle exercises re-use of the same pool. */
- /* Queue some work */
- for(i=0; i < 20; i++)
- xbt_tpool_queue_job(tpool, job, (void*)i);
+ for(j=0; j < 10; j++){
+ DEBUG1("Round %lu", j);
+ /* Queue some work */
+ for(i=0; i < 20; i++){
+ DEBUG1("Queuing job %lu", i);
+ xbt_tpool_queue_job(tpool, job, (void*)i);
+ }
+ /* Wait for everyone */
+ xbt_tpool_wait_all(tpool);
+ }
/* Destroy thread pool */
xbt_tpool_destroy(tpool);
-
- xbt_test_assert0(0, "lala");
}
#endif /* SIMGRID_TEST */