1 /* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
2 * All rights reserved. */
4 /* This program is free software; you can redistribute it and/or modify it
5 * under the terms of the license (GNU LGPL) which comes with this package. */
7 #include "threadpool_private.h"
9 static void *_xbt_tpool_worker_main(void *tpool);
11 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
14 xbt_os_thread_t worker = NULL;
16 /* Initialize thread pool data structure */
17 xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
18 tpool->mutex = xbt_os_mutex_init();
19 tpool->job_posted = xbt_os_cond_init();
20 tpool->job_taken = xbt_os_cond_init();
21 tpool->all_jobs_done = xbt_os_cond_init();
22 tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
23 tpool->num_workers = num_workers;
24 tpool->max_jobs = max_jobs;
26 /* Create the pool of worker threads */
27 for(i=0; i < num_workers; i++){
28 worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
29 xbt_os_thread_detach(worker);
35 void xbt_tpool_destroy(xbt_tpool_t tpool)
37 /* Lock the pool, then signal every worker an wait for each to finish */
38 xbt_os_mutex_acquire(tpool->mutex);
39 tpool->flags = TPOOL_DESTROY;
41 while(tpool->num_workers){
42 xbt_os_cond_signal(tpool->job_posted);
43 xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
46 /* Destroy pool's data structures */
47 xbt_os_cond_destroy(tpool->job_posted);
48 xbt_os_cond_destroy(tpool->job_taken);
49 xbt_os_cond_destroy(tpool->all_jobs_done);
50 xbt_os_mutex_release(tpool->mutex);
51 xbt_os_mutex_destroy(tpool->mutex);
55 void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
57 s_xbt_tpool_job_t job;
59 job.fun_arg = fun_arg;
61 /* Wait until we can lock on the pool with some space on it for the job */
62 xbt_os_mutex_acquire(tpool->mutex);
63 while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
64 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
66 /* Push the job in the queue, signal the workers and unlock the pool */
67 xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
68 xbt_os_cond_signal(tpool->job_posted);
69 xbt_os_mutex_release(tpool->mutex);
73 void xbt_tpool_wait_all(xbt_tpool_t tpool)
75 xbt_os_mutex_acquire(tpool->mutex);
76 while(xbt_dynar_length(tpool->jobs_queue))
77 xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
78 xbt_os_mutex_release(tpool->mutex);
83 static void *_xbt_tpool_worker_main(void *arg)
85 s_xbt_tpool_job_t job;
86 xbt_tpool_t tpool = (xbt_tpool_t)arg;
88 /* Worker's main loop */
90 xbt_os_mutex_acquire(tpool->mutex);
92 /* If there are no jobs in the queue wait for one */
93 if(!xbt_dynar_length(tpool->jobs_queue))
94 xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
96 /* If we are shutting down, signal the destroyer so it can kill the other */
97 /* workers, unlock the pool and return */
98 if(tpool->flags == TPOOL_DESTROY){
100 xbt_os_cond_signal(tpool->job_taken);
101 xbt_os_mutex_release(tpool->mutex);
105 /* Get a job, signal the pool to inform jobs submitters and unlock it */
106 job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
107 xbt_os_cond_signal(tpool->job_taken);
108 xbt_os_mutex_release(tpool->mutex);
110 /* Run the job and loop again ... */
111 job.fun(job.fun_arg);