Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfixes + test suitx
[simgrid.git] / src / xbt / threadpool.c
1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2  * All rights reserved.                                                     */
3
4 /* This program is free software; you can redistribute it and/or modify it
5  * under the terms of the license (GNU LGPL) which comes with this package. */
6
7 #include "threadpool_private.h"
8
9 XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt,
10                                 "threadpool: pool of worker threads");
11
12 static void *_xbt_tpool_worker_main(void *tpool);
13
14 unsigned long tpoolcounter = 0;  /* Debug purposes */
15
16 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
17 {
18   unsigned int i;
19   xbt_os_thread_t worker = NULL;
20
21   DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs);
22
23   /* Initialize thread pool data structure */
24   xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
25   tpool->mutex = xbt_os_mutex_init();
26   tpool->job_posted = xbt_os_cond_init();
27   tpool->job_taken = xbt_os_cond_init();
28   tpool->job_done = xbt_os_cond_init();
29   tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
30   tpool->num_workers = num_workers;
31   tpool->num_idle_workers = 0;
32   tpool->max_jobs = max_jobs;
33   
34   /* Create the pool of worker threads */
35   for(i=0; i < num_workers; i++){
36     worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
37     xbt_os_thread_detach(worker);
38   }
39   
40   return tpool;
41 }
42
43 void xbt_tpool_destroy(xbt_tpool_t tpool)
44
45   DEBUG1("Destroy thread pool %p", tpool);
46
47   /* Lock the pool, then signal every worker an wait for each to finish */
48   xbt_os_mutex_acquire(tpool->mutex);
49   tpool->flags = TPOOL_DESTROY; 
50
51   while(tpool->num_workers){
52     xbt_os_cond_signal(tpool->job_posted);
53     xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
54   }
55
56   /* Destroy pool's data structures */
57   xbt_os_cond_destroy(tpool->job_posted);
58   xbt_os_cond_destroy(tpool->job_taken);
59   xbt_os_cond_destroy(tpool->job_done);
60   xbt_os_mutex_release(tpool->mutex);
61   xbt_os_mutex_destroy(tpool->mutex);  
62   xbt_free(tpool);
63 }
64
65 void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
66 {
67   s_xbt_tpool_job_t job;
68   job.fun = fun;
69   job.fun_arg = fun_arg;
70
71   /* Wait until we can lock on the pool with some space on it for the job */
72   xbt_os_mutex_acquire(tpool->mutex);
73   while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
74     xbt_os_cond_wait(tpool->job_taken, tpool->mutex); 
75
76   DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool);
77
78   /* Push the job in the queue, signal the workers and unlock the pool */
79   xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
80   xbt_os_cond_signal(tpool->job_posted);
81   xbt_os_mutex_release(tpool->mutex);    
82   return;
83 }
84
85 void xbt_tpool_wait_all(xbt_tpool_t tpool)
86 {
87   DEBUG1("Wait all in thread pool %p", tpool);
88   xbt_os_mutex_acquire(tpool->mutex);
89   while(tpool->num_idle_workers < tpool->num_workers
90         || xbt_dynar_length(tpool->jobs_queue) > 0)
91     xbt_os_cond_wait(tpool->job_done, tpool->mutex);
92   xbt_os_mutex_release(tpool->mutex);
93   DEBUG1("Wait all done in thread pool %p", tpool);
94   return;
95 }
96
97 static void *_xbt_tpool_worker_main(void *arg)
98 {
99   s_xbt_tpool_job_t job;
100   xbt_tpool_t tpool = (xbt_tpool_t)arg;
101
102   unsigned long i = tpoolcounter++; /* Debug purposes */
103   DEBUG1("New worker thread created (%lu)", i);
104   
105   /* Worker's main loop */
106   while(1){
107     xbt_os_mutex_acquire(tpool->mutex);
108     xbt_os_cond_signal(tpool->job_done);
109     tpool->num_idle_workers++;
110
111     DEBUG1("Worker %lu waiting for a job", i);
112
113     /* If there are no jobs in the queue wait for one */
114     while(xbt_dynar_length(tpool->jobs_queue) == 0)
115       xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
116
117     DEBUG1("Worker %lu got a job", i);
118
119     /* If we are shutting down, signal the destroyer so it can kill the other */
120     /* workers, unlock the pool and return  */
121     if(tpool->flags == TPOOL_DESTROY){
122       DEBUG1("Shutting down worker %lu", i);
123       tpool->num_idle_workers--;
124       tpool->num_workers--;
125       xbt_os_cond_signal(tpool->job_taken);
126       xbt_os_mutex_release(tpool->mutex);
127       return NULL;
128     }
129
130     /* Get a job, signal the pool to inform jobs submitters and unlock it */
131     job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
132     xbt_os_cond_signal(tpool->job_taken);
133     tpool->num_idle_workers--;
134     xbt_os_mutex_release(tpool->mutex);
135   
136     /* Run the job and loop again ... */
137     job.fun(job.fun_arg);
138
139     DEBUG1("Worker %lu done with job", i);
140   }
141 }
142
143 #ifdef SIMGRID_TEST
144 #include "xbt.h"
145 #include "xbt/ex.h"
146
147 XBT_TEST_SUITE("tpool", "Thread pool");
148
149 xbt_tpool_t tpool;
150
151 void job(void *arg);
152
153 void job (void *arg)
154 {
155   xbt_test_log1("I'm job %lu", (unsigned long)arg);
156 }
157
158 XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
159 {
160   xbt_test_add0("Create thread pool");
161
162   unsigned long i;
163   /* Create thread pool */
164   tpool = xbt_tpool_new(5, 10);
165
166   /* Queue some work */
167   for(i=0; i < 20; i++)
168     xbt_tpool_queue_job(tpool, job, (void*)i);
169
170   /* Destroy thread pool */
171   xbt_tpool_destroy(tpool);
172
173   xbt_test_assert0(0, "lala");
174 }
175
176 #endif /* SIMGRID_TEST */