1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "threadpool_private.h"
/* Log subcategory "xbt_threadpool", declared under the "xbt" category; */
/* presumably the default category for the DEBUGn() calls in this file - confirm. */
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt,
10 "threadpool: pool of worker threads");
/* Forward declaration of the worker entry point that xbt_tpool_new() hands to */
/* xbt_os_thread_create(); the definition is further down in this file. */
12 static void *_xbt_tpool_worker_main(void *tpool);
/* File-wide counter read and post-incremented by every worker at start-up to */
/* obtain an id for its log messages. */
/* NOTE(review): in the visible code the increment happens before the worker */
/* takes the pool mutex, so two workers may end up with the same id. */
14 unsigned long tpoolcounter = 0; /* Debug purposes */
/* Create a thread pool and start its worker threads. */
/* */
/* num_workers: number of worker threads to create; each one runs */
/*              _xbt_tpool_worker_main() with the pool as argument and is */
/*              detached right after creation. */
/* max_jobs:    stored in tpool->max_jobs; xbt_tpool_queue_job() blocks while */
/*              the queue holds exactly that many jobs. */
/* */
/* NOTE(review): the function braces, the declaration of the loop index `i` */
/* and the return statement are not visible in this excerpt; presumably the */
/* freshly built pool is returned - confirm against the full file. */
16 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
19 xbt_os_thread_t worker = NULL;
21 DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs);
23 /* Initialize thread pool data structure */
/* tpool->flags is not assigned here; presumably xbt_new0() zero-fills the */
/* structure so it starts at 0 - TODO confirm. */
24 xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
/* One mutex guards the whole pool; the three conditions are used as follows */
/* elsewhere in this file: job_posted wakes workers, job_taken wakes blocked */
/* submitters and the destroyer, job_done wakes xbt_tpool_wait_all(). */
25 tpool->mutex = xbt_os_mutex_init();
26 tpool->job_posted = xbt_os_cond_init();
27 tpool->job_taken = xbt_os_cond_init();
28 tpool->job_done = xbt_os_cond_init();
/* The queue holds s_xbt_tpool_job_t elements by value (element size given */
/* here, jobs pushed/popped with the *_as macros). */
29 tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
30 tpool->num_workers = num_workers;
/* Starts at 0: every worker counts itself as idle when it enters its loop. */
31 tpool->num_idle_workers = 0;
32 tpool->max_jobs = max_jobs;
34 /* Create the pool of worker threads */
35 for(i=0; i < num_workers; i++){
36 worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
/* Workers are never joined; xbt_tpool_destroy() tracks their exit through */
/* tpool->num_workers instead. */
37 xbt_os_thread_detach(worker);
/* Shut the pool down: raise the TPOOL_DESTROY flag, wait until every worker */
/* has decremented tpool->num_workers on its way out, then destroy the */
/* condition variables and the mutex. */
/* */
/* tpool: pool to destroy. */
/* */
/* NOTE(review): workers only test the flag after leaving their */
/* "queue is empty" wait loop, whose visible condition checks the queue length */
/* alone. If the queue is empty when this function runs, a signalled worker */
/* appears to go straight back to waiting and this loop would never finish - */
/* confirm against the full file. */
/* NOTE(review): no release of tpool->jobs_queue or of the pool structure */
/* itself is visible in this excerpt - confirm they are freed. */
43 void xbt_tpool_destroy(xbt_tpool_t tpool)
45 DEBUG1("Destroy thread pool %p", tpool);
47 /* Lock the pool, then signal every worker and wait for each to finish */
48 xbt_os_mutex_acquire(tpool->mutex);
49 tpool->flags = TPOOL_DESTROY;
/* Each pass wakes one worker via job_posted and sleeps on job_taken, which an */
/* exiting worker signals after decrementing num_workers. */
51 while(tpool->num_workers){
52 xbt_os_cond_signal(tpool->job_posted);
53 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
56 /* Destroy pool's data structures */
/* The conditions are destroyed while the mutex is still held; the mutex is */
/* released just before being destroyed itself. */
57 xbt_os_cond_destroy(tpool->job_posted);
58 xbt_os_cond_destroy(tpool->job_taken);
59 xbt_os_cond_destroy(tpool->job_done);
60 xbt_os_mutex_release(tpool->mutex);
61 xbt_os_mutex_destroy(tpool->mutex);
/* Submit a job to the pool, blocking while the queue is full. */
/* */
/* tpool:   pool receiving the job. */
/* fun:     function the worker will call. */
/* fun_arg: argument passed to fun by the worker. */
/* */
/* NOTE(review): the worker calls job.fun, but the assignment of job.fun is */
/* not visible in this excerpt (only job.fun_arg is set below); presumably it */
/* sits on a line that is missing here - confirm. */
65 void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
/* Built on the stack; the dynar push below stores a copy of it. */
67 s_xbt_tpool_job_t job;
69 job.fun_arg = fun_arg;
71 /* Wait until we can lock on the pool with some space on it for the job */
72 xbt_os_mutex_acquire(tpool->mutex);
/* Full means length == max_jobs; job_taken is signalled by a worker each time */
/* it pops a job, and the condition is re-checked after every wake-up. */
73 while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
74 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
76 DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool);
78 /* Push the job in the queue, signal the workers and unlock the pool */
79 xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
80 xbt_os_cond_signal(tpool->job_posted);
81 xbt_os_mutex_release(tpool->mutex);
/* Block the caller until the pool is quiescent: every worker is counted as */
/* idle and the job queue is empty. */
/* */
/* tpool: pool to wait on. */
85 void xbt_tpool_wait_all(xbt_tpool_t tpool)
87 DEBUG1("Wait all in thread pool %p", tpool);
88 xbt_os_mutex_acquire(tpool->mutex);
/* Keep sleeping while some worker is still busy or jobs remain queued. */
/* job_done is signalled by a worker each time it comes back to take the pool */
/* mutex and mark itself idle, so the predicate is re-evaluated then. */
89 while(tpool->num_idle_workers < tpool->num_workers
90 || xbt_dynar_length(tpool->jobs_queue) > 0)
91 xbt_os_cond_wait(tpool->job_done, tpool->mutex);
92 xbt_os_mutex_release(tpool->mutex);
93 DEBUG1("Wait all done in thread pool %p", tpool);
/* Entry point of every worker thread: repeatedly take a job from the pool's */
/* queue and run it, until the pool is flagged for destruction. */
/* */
/* arg: the pool, passed as void* by xbt_os_thread_create() and cast back to */
/*      xbt_tpool_t below. */
/* */
/* NOTE(review): the function braces, the loop header and the return */
/* statements are not visible in this excerpt; the comments below describe */
/* only the visible statements. */
97 static void *_xbt_tpool_worker_main(void *arg)
99 s_xbt_tpool_job_t job;
100 xbt_tpool_t tpool = (xbt_tpool_t)arg;
/* Id used only in log messages; taken from the shared counter without */
/* holding the pool mutex. */
102 unsigned long i = tpoolcounter++; /* Debug purposes */
103 DEBUG1("New worker thread created (%lu)", i);
105 /* Worker's main loop */
/* Take the pool mutex, wake anyone blocked in xbt_tpool_wait_all() and count */
/* this worker as idle. The waiter cannot re-check its predicate before the */
/* mutex is released, so the signal-then-increment order is harmless. */
107 xbt_os_mutex_acquire(tpool->mutex);
108 xbt_os_cond_signal(tpool->job_done);
109 tpool->num_idle_workers++;
111 DEBUG1("Worker %lu waiting for a job", i);
113 /* If there are no jobs in the queue wait for one */
/* NOTE(review): the visible condition tests the queue length only, not */
/* tpool->flags; a wake-up sent by xbt_tpool_destroy() with an empty queue */
/* would therefore put the worker back to sleep - confirm. */
114 while(xbt_dynar_length(tpool->jobs_queue) == 0)
115 xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
117 DEBUG1("Worker %lu got a job", i);
119 /* If we are shutting down, signal the destroyer so it can kill the other */
120 /* workers, unlock the pool and return */
/* The flag is tested before popping, so jobs still queued at this point are */
/* not run by this worker. */
121 if(tpool->flags == TPOOL_DESTROY){
122 DEBUG1("Shutting down worker %lu", i);
123 tpool->num_idle_workers--;
124 tpool->num_workers--;
125 xbt_os_cond_signal(tpool->job_taken);
126 xbt_os_mutex_release(tpool->mutex);
130 /* Get a job, signal the pool to inform jobs submitters and unlock it */
/* The job is copied out of the queue so it can run without the mutex held. */
131 job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
132 xbt_os_cond_signal(tpool->job_taken);
133 tpool->num_idle_workers--;
134 xbt_os_mutex_release(tpool->mutex);
136 /* Run the job and loop again ... */
137 job.fun(job.fun_arg);
139 DEBUG1("Worker %lu done with job", i);
/* Unit-test section (closed by the #endif for SIMGRID_TEST further down). */
/* Declares the "tpool" test suite. */
147 XBT_TEST_SUITE("tpool", "Thread pool");
/* Body of the test job callback: logs its argument, reinterpreted as an */
/* unsigned long job number. */
/* NOTE(review): the callback's signature line is not visible in this excerpt. */
155 xbt_test_log1("I'm job %lu", (unsigned long)arg);
/* Basic usage test: create a pool of 5 workers with a queue capacity of 10, */
/* submit 20 jobs, then destroy the pool. */
/* NOTE(review): the declarations of `tpool` and `i` are not visible in this */
/* excerpt. */
158 XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
160 xbt_test_add0("Create thread pool");
163 /* Create thread pool */
164 tpool = xbt_tpool_new(5, 10);
166 /* Queue some work */
/* 20 jobs against a capacity of 10, so xbt_tpool_queue_job() may block here */
/* until workers drain the queue. The loop index is smuggled through the */
/* void* argument as the job number. */
167 for(i=0; i < 20; i++)
168 xbt_tpool_queue_job(tpool, job, (void*)i);
170 /* Destroy thread pool */
/* NOTE(review): no xbt_tpool_wait_all() call is visible before the destroy, */
/* so jobs still queued at this point may never run - confirm this is */
/* intended. */
171 xbt_tpool_destroy(tpool);
/* NOTE(review): asserts on the constant 0 with the message "lala"; */
/* presumably a placeholder that always fails - confirm. */
173 xbt_test_assert0(0, "lala");
176 #endif /* SIMGRID_TEST */