1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "threadpool_private.h"
/* Declare the "xbt_threadpool" log subcategory under "xbt"; presumably it
 * becomes the default category for the DEBUG* calls in this file (confirm
 * against the XBT logging macros). */
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt,
10 "threadpool: pool of worker threads");
/* Entry point executed by every worker thread; its argument is the pool the
 * worker belongs to. */
12 static void *_xbt_tpool_worker_main(void *tpool);
/* Source of the numeric ids that workers print in their debug messages.
 * NOTE(review): each worker does `tpoolcounter++` before it acquires the pool
 * mutex, so the increment looks unsynchronized and two workers could end up
 * with the same id - harmless for logging, but confirm nothing else relies
 * on it. */
14 unsigned long tpoolcounter = 0; /* Debug purposes */
/*
 * Allocate a thread pool and start its worker threads.
 *
 *   num_workers  number of threads created; each one runs
 *                _xbt_tpool_worker_main with the new pool as its argument and
 *                is detached immediately after creation.
 *   max_jobs     stored in the pool; xbt_tpool_queue_job waits while the job
 *                queue holds exactly this many entries.
 *
 * Presumably hands the new pool back to the caller (confirm: neither the
 * return statement nor the declaration of `i` is among the lines reviewed
 * here).
 */
16 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
19 xbt_os_thread_t worker = NULL;
21 DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs);
23 /* Initialize thread pool data structure */
/* tpool->flags is never assigned in this function, so its initial value
 * depends on the allocation being zero-filled (presumably what xbt_new0
 * provides - TODO confirm). */
24 xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
25 tpool->mutex = xbt_os_mutex_init();
/* Three condition variables, all used together with tpool->mutex:
 *   job_posted   signaled by producers (and by the destroyer) to wake workers,
 *   job_taken    signaled by workers when they pop a job or shut down,
 *   idle_worker  signaled by a worker each time it counts itself idle. */
26 tpool->job_posted = xbt_os_cond_init();
27 tpool->job_taken = xbt_os_cond_init();
28 tpool->idle_worker = xbt_os_cond_init();
/* Jobs are stored by value: the element size is that of s_xbt_tpool_job_t.
 * The NULL second argument is presumably the per-element free callback
 * (none needed) - confirm against the dynar API. */
29 tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
30 tpool->num_workers = num_workers;
/* Starts at 0: every worker increments this counter itself when it begins
 * waiting for work. */
31 tpool->num_idle_workers = 0;
32 tpool->max_jobs = max_jobs;
34 /* Create the pool of worker threads */
35 for(i=0; i < num_workers; i++){
36 worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
/* Workers are never joined; xbt_tpool_destroy instead waits for
 * tpool->num_workers to drop to 0. */
37 xbt_os_thread_detach(worker);
/*
 * Shut down a thread pool and destroy its synchronization objects.
 *
 *   tpool  the pool to destroy.
 *
 * With the pool mutex held, sets flags to TPOOL_DESTROY and then loops until
 * num_workers reaches 0; every pass signals job_posted and waits on job_taken,
 * which a worker signals while shutting down. A worker that observes
 * TPOOL_DESTROY leaves before popping from the queue, so jobs still queued at
 * this point are not executed.
 *
 * NOTE(review): nothing in the lines reviewed here releases tpool->jobs_queue
 * or the pool structure itself - confirm both are freed.
 */
43 void xbt_tpool_destroy(xbt_tpool_t tpool)
45 DEBUG1("Destroy thread pool %p", tpool);
47 /* Lock the pool, then signal every worker and wait for each to finish */
48 xbt_os_mutex_acquire(tpool->mutex);
49 tpool->flags = TPOOL_DESTROY;
/* num_workers is decremented by each worker on its way out. */
51 while(tpool->num_workers){
52 DEBUG1("Still %u workers, waiting...", tpool->num_workers);
/* A worker busy running a job is not waiting on job_posted; it notices the
 * flag the next time it re-enters its loop and signals job_taken then. */
53 xbt_os_cond_signal(tpool->job_posted);
/* job_taken is also the condition producers wait on in xbt_tpool_queue_job. */
54 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
57 /* Destroy pool's data structures */
/* The condition variables are destroyed while the mutex is still held; the
 * mutex is released only just before it is destroyed itself. */
58 xbt_os_cond_destroy(tpool->job_posted);
59 xbt_os_cond_destroy(tpool->job_taken);
60 xbt_os_cond_destroy(tpool->idle_worker);
61 xbt_os_mutex_release(tpool->mutex);
62 xbt_os_mutex_destroy(tpool->mutex);
/*
 * Submit a job to the pool, blocking while the queue is full.
 *
 *   tpool    pool that will run the job.
 *   fun      function a worker will call (workers invoke job.fun(job.fun_arg)).
 *   fun_arg  opaque argument passed to fun.
 *
 * NOTE(review): only job.fun_arg is assigned in the lines reviewed here;
 * job.fun has to be set to `fun` before the push - confirm.
 */
66 void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
68 s_xbt_tpool_job_t job;
70 job.fun_arg = fun_arg;
72 /* Wait until we can lock on the pool with some space on it for the job */
73 xbt_os_mutex_acquire(tpool->mutex);
/* "Full" is tested with ==, not >=. Workers signal job_taken after popping a
 * job, which is what lets this loop re-check the length.
 * NOTE(review): with max_jobs == 0 the test is true for an empty queue, so
 * the caller would wait here - looks like max_jobs is expected to be > 0;
 * confirm. */
74 while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
75 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
77 DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool);
79 /* Push the job in the queue, signal the workers and unlock the pool */
/* The job struct is handed over by value, so the local `job` can go out of
 * scope once this call has stored it (assuming xbt_dynar_push_as copies the
 * element - confirm against the dynar API). */
80 xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
81 xbt_os_cond_signal(tpool->job_posted);
82 xbt_os_mutex_release(tpool->mutex);
/*
 * Block until the pool is quiescent, i.e. every worker has counted itself
 * idle (num_idle_workers == num_workers) and the job queue is empty.
 *
 *   tpool  the pool to wait on.
 */
86 void xbt_tpool_wait_all(xbt_tpool_t tpool)
88 DEBUG1("Wait all workers in thread pool %p", tpool);
89 xbt_os_mutex_acquire(tpool->mutex);
/* Both tests are needed: a worker counts itself idle before it looks at the
 * queue and only un-counts itself after popping a job, so "all idle" can hold
 * for a moment while jobs are still pending. idle_worker is signaled by a
 * worker each time it marks itself idle, which is when this predicate is
 * re-evaluated. */
91 while(tpool->num_idle_workers < tpool->num_workers
92 || xbt_dynar_length(tpool->jobs_queue) > 0)
93 xbt_os_cond_wait(tpool->idle_worker, tpool->mutex);
95 xbt_os_mutex_release(tpool->mutex);
96 DEBUG1("Wait all workers done in thread pool %p", tpool);
/*
 * Body of every worker thread.
 *
 *   arg  the pool (xbt_tpool_t) this worker belongs to.
 *
 * On each pass, with the pool mutex held, the worker counts itself idle and
 * signals idle_worker, waits on job_posted while the queue is empty and the
 * pool is not being destroyed, and then either shuts down
 * (flags == TPOOL_DESTROY) or pops one job. The job itself is run after the
 * mutex has been released.
 */
100 static void *_xbt_tpool_worker_main(void *arg)
102 s_xbt_tpool_job_t job;
103 xbt_tpool_t tpool = (xbt_tpool_t)arg;
/* NOTE(review): this increment happens before the mutex is acquired, so
 * concurrent workers may race on tpoolcounter; the value is only used in log
 * messages below. */
105 unsigned long i = tpoolcounter++; /* Debug purposes */
106 DEBUG1("New worker thread created (%lu)", i);
109 /* Worker's main loop */
111 xbt_os_mutex_acquire(tpool->mutex);
/* Announce idleness first; xbt_tpool_wait_all waits on idle_worker and
 * compares num_idle_workers with num_workers. */
113 tpool->num_idle_workers++;
114 xbt_os_cond_signal(tpool->idle_worker);
116 /* If there are no jobs in the queue wait for one */
/* The predicate is re-tested after every wake-up, so a wake-up that finds
 * the queue still empty simply waits again. */
117 while(xbt_dynar_length(tpool->jobs_queue) == 0
118 && tpool->flags != TPOOL_DESTROY){
119 DEBUG1("Worker %lu waiting for a job", i);
120 xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
/* This message is also logged when the worker was woken for shutdown, since
 * the TPOOL_DESTROY check only comes afterwards. */
123 DEBUG1("Worker %lu got a job", i);
125 /* If we are shutting down, signal the destroyer so it can kill the other */
126 /* workers, unlock the pool and return */
/* Checked before popping: jobs still queued when the pool is destroyed are
 * not run by this worker. */
127 if(tpool->flags == TPOOL_DESTROY){
128 DEBUG1("Shutting down worker %lu", i);
/* Undo the idle count taken above and remove this worker from the pool;
 * xbt_tpool_destroy loops until num_workers is 0. */
129 tpool->num_idle_workers--;
130 tpool->num_workers--;
131 xbt_os_cond_signal(tpool->job_taken);
132 xbt_os_mutex_release(tpool->mutex);
136 /* Get a job, signal the pool to inform jobs submitters and unlock it */
/* NOTE(review): if xbt_dynar_pop_as removes the most recently pushed
 * element, queued jobs are served in LIFO order - confirm against the dynar
 * API whether that is intended. */
137 job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
138 xbt_os_cond_signal(tpool->job_taken);
139 tpool->num_idle_workers--;
140 xbt_os_mutex_release(tpool->mutex);
142 /* Run the job and loop again ... */
/* Runs without the pool mutex, so other workers and producers can proceed
 * while the job executes. */
143 job.fun(job.fun_arg);
/* Unit-test scaffolding: declares the "tpool" test suite and a separate log
 * subcategory, "xbt_tpool_unit", for the test code. */
151 XBT_TEST_SUITE("tpool", "Thread pool");
153 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_tpool_unit, xbt, "Unit test for tpool");
/* Presumably the body of the `job` callback queued by the test below (its
 * signature is not among the lines reviewed here - confirm): it logs the
 * opaque argument, cast to unsigned long, as the job number. */
161 DEBUG1("I'm job %lu", (unsigned long)arg);
/*
 * Basic end-to-end test: create a pool, run ten rounds of "queue 20 jobs, then
 * wait for the pool to go quiet", and destroy the pool.
 */
164 XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
166 xbt_test_add0("Create thread pool");
169 /* Create thread pool */
/* 5 worker threads, at most 10 queued jobs. */
170 tpool = xbt_tpool_new(5, 10);
172 for(j=0; j < 10; j++){
173 DEBUG1("Round %lu", j);
174 /* Queue some work */
/* 20 jobs per round is twice max_jobs, so xbt_tpool_queue_job may have to
 * block until workers drain the queue. */
175 for(i=0; i < 20; i++){
176 DEBUG1("Queuing job %lu", i);
/* The loop index itself travels through the void* argument. */
177 xbt_tpool_queue_job(tpool, job, (void*)i);
179 /* Wait for everyone */
180 xbt_tpool_wait_all(tpool);
183 /* Destroy thread pool */
184 xbt_tpool_destroy(tpool);
187 #endif /* SIMGRID_TEST */