Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rename condition variable + improve debugging
[simgrid.git] / src / xbt / threadpool.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "threadpool_private.h"
8
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt,
10                                 "threadpool: pool of worker threads");
11
12 static void *_xbt_tpool_worker_main(void *tpool);
13
14 unsigned long tpoolcounter = 0;  /* Debug purposes */
15
16 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
17 {
18   unsigned int i;
19   xbt_os_thread_t worker = NULL;
20
21   DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs);
22
23   /* Initialize thread pool data structure */
24   xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
25   tpool->mutex = xbt_os_mutex_init();
26   tpool->job_posted = xbt_os_cond_init();
27   tpool->job_taken = xbt_os_cond_init();
28   tpool->idle_worker = xbt_os_cond_init();
29   tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
30   tpool->num_workers = num_workers;
31   tpool->num_idle_workers = 0;
32   tpool->max_jobs = max_jobs;
33   
34   /* Create the pool of worker threads */
35   for(i=0; i < num_workers; i++){
36     worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
37     xbt_os_thread_detach(worker);
38   }
39   
40   return tpool;
41 }
42
43 void xbt_tpool_destroy(xbt_tpool_t tpool)
44
45   DEBUG1("Destroy thread pool %p", tpool);
46
47   /* Lock the pool, then signal every worker an wait for each to finish */
48   xbt_os_mutex_acquire(tpool->mutex);
49   tpool->flags = TPOOL_DESTROY; 
50
51   while(tpool->num_workers){
52     DEBUG1("Still %u workers, waiting...", tpool->num_workers);
53     xbt_os_cond_signal(tpool->job_posted);
54     xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
55   }
56
57   /* Destroy pool's data structures */
58   xbt_os_cond_destroy(tpool->job_posted);
59   xbt_os_cond_destroy(tpool->job_taken);
60   xbt_os_cond_destroy(tpool->idle_worker);
61   xbt_os_mutex_release(tpool->mutex);
62   xbt_os_mutex_destroy(tpool->mutex);  
63   xbt_free(tpool);
64 }
65
66 void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
67 {
68   s_xbt_tpool_job_t job;
69   job.fun = fun;
70   job.fun_arg = fun_arg;
71
72   /* Wait until we can lock on the pool with some space on it for the job */
73   xbt_os_mutex_acquire(tpool->mutex);
74   while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
75     xbt_os_cond_wait(tpool->job_taken, tpool->mutex); 
76
77   DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool);
78
79   /* Push the job in the queue, signal the workers and unlock the pool */
80   xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
81   xbt_os_cond_signal(tpool->job_posted);
82   xbt_os_mutex_release(tpool->mutex);    
83   return;
84 }
85
86 void xbt_tpool_wait_all(xbt_tpool_t tpool)
87 {
88   DEBUG1("Wait all workers in thread pool %p", tpool);
89   xbt_os_mutex_acquire(tpool->mutex);
90
91   while(tpool->num_idle_workers < tpool->num_workers
92         || xbt_dynar_length(tpool->jobs_queue) > 0)
93     xbt_os_cond_wait(tpool->idle_worker, tpool->mutex);
94
95   xbt_os_mutex_release(tpool->mutex);
96   DEBUG1("Wait all workers done in thread pool %p", tpool);
97   return;
98 }
99
100 static void *_xbt_tpool_worker_main(void *arg)
101 {
102   s_xbt_tpool_job_t job;
103   xbt_tpool_t tpool = (xbt_tpool_t)arg;
104
105   unsigned long i = tpoolcounter++; /* Debug purposes */
106   DEBUG1("New worker thread created (%lu)", i);
107   
108
109   /* Worker's main loop */
110   while(1){
111     xbt_os_mutex_acquire(tpool->mutex);
112
113     tpool->num_idle_workers++;
114     xbt_os_cond_signal(tpool->idle_worker);
115
116     /* If there are no jobs in the queue wait for one */
117     while(xbt_dynar_length(tpool->jobs_queue) == 0
118           && tpool->flags != TPOOL_DESTROY){
119       DEBUG1("Worker %lu waiting for a job", i);
120       xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
121     }
122
123     DEBUG1("Worker %lu got a job", i);
124
125     /* If we are shutting down, signal the destroyer so it can kill the other */
126     /* workers, unlock the pool and return  */
127     if(tpool->flags == TPOOL_DESTROY){
128       DEBUG1("Shutting down worker %lu", i);
129       tpool->num_idle_workers--;
130       tpool->num_workers--;
131       xbt_os_cond_signal(tpool->job_taken);
132       xbt_os_mutex_release(tpool->mutex);
133       return NULL;
134     }
135
136     /* Get a job, signal the pool to inform jobs submitters and unlock it */
137     job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
138     xbt_os_cond_signal(tpool->job_taken);
139     tpool->num_idle_workers--;
140     xbt_os_mutex_release(tpool->mutex);
141   
142     /* Run the job and loop again ... */
143     job.fun(job.fun_arg);
144   }
145 }
146
147 #ifdef SIMGRID_TEST
148 #include "xbt.h"
149 #include "xbt/ex.h"
150
151 XBT_TEST_SUITE("tpool", "Thread pool");
152
153 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_tpool_unit, xbt, "Unit test for tpool");
154
155 xbt_tpool_t tpool;
156
157 void job(void *arg);
158
159 void job (void *arg)
160 {
161   DEBUG1("I'm job %lu", (unsigned long)arg);
162 }
163
164 XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
165 {
166   xbt_test_add0("Create thread pool");
167
168   unsigned long i,j;
169   /* Create thread pool */
170   tpool = xbt_tpool_new(5, 10);
171
172   for(j=0; j < 10; j++){
173     DEBUG1("Round %lu", j);
174     /* Queue some work */
175     for(i=0; i < 20; i++){
176       DEBUG1("Queuing job %lu", i);
177       xbt_tpool_queue_job(tpool, job, (void*)i);
178     }
179     /* Wait for everyone */
180     xbt_tpool_wait_all(tpool);
181   }
182
183   /* Destroy thread pool */
184   xbt_tpool_destroy(tpool);
185 }
186
187 #endif /* SIMGRID_TEST */