[simgrid.git] / src / xbt / threadpool.c

/* Copyright (c) 2004, 2005, 2007, 2009, 2010. The SimGrid Team.
 * All rights reserved.                                                     */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "threadpool_private.h"

static void *_xbt_tpool_worker_main(void *tpool);

xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
{
  unsigned int i;
  xbt_os_thread_t worker = NULL;

  /* Initialize thread pool data structure */
  xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
  tpool->mutex = xbt_os_mutex_init();
  tpool->job_posted = xbt_os_cond_init();
  tpool->job_taken = xbt_os_cond_init();
  tpool->all_jobs_done = xbt_os_cond_init();
  tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
  tpool->num_workers = num_workers;
  tpool->max_jobs = max_jobs;

  /* Create the pool of worker threads */
  for (i = 0; i < num_workers; i++) {
    worker = xbt_os_thread_create(NULL, _xbt_tpool_worker_main, tpool, NULL);
    xbt_os_thread_detach(worker);
  }

  return tpool;
}

void xbt_tpool_destroy(xbt_tpool_t tpool)
{
  /* Lock the pool, then signal every worker and wait for each one to finish */
  xbt_os_mutex_acquire(tpool->mutex);
  tpool->flags = TPOOL_DESTROY;

  while (tpool->num_workers) {
    xbt_os_cond_signal(tpool->job_posted);
    xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
  }

  /* Destroy the pool's data structures */
  xbt_os_cond_destroy(tpool->job_posted);
  xbt_os_cond_destroy(tpool->job_taken);
  xbt_os_cond_destroy(tpool->all_jobs_done);
  xbt_os_mutex_release(tpool->mutex);
  xbt_os_mutex_destroy(tpool->mutex);
  xbt_free(tpool);
}

void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
{
  s_xbt_tpool_job_t job;
  job.fun = fun;
  job.fun_arg = fun_arg;

  /* Lock the pool and wait until there is room in the queue for the job */
  xbt_os_mutex_acquire(tpool->mutex);
  while (xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
    xbt_os_cond_wait(tpool->job_taken, tpool->mutex);

  /* Push the job into the queue, signal the workers and unlock the pool */
  xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
  xbt_os_cond_signal(tpool->job_posted);
  xbt_os_mutex_release(tpool->mutex);
  return;
}

void xbt_tpool_wait_all(xbt_tpool_t tpool)
{
  xbt_os_mutex_acquire(tpool->mutex);
  while (xbt_dynar_length(tpool->jobs_queue))
    xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
  xbt_os_mutex_release(tpool->mutex);

  return;
}

static void *_xbt_tpool_worker_main(void *arg)
{
  s_xbt_tpool_job_t job;
  xbt_tpool_t tpool = (xbt_tpool_t)arg;

  /* Worker's main loop */
  while (1) {
    xbt_os_mutex_acquire(tpool->mutex);

    /* Wait until a job is posted or the pool is being destroyed */
    while (!xbt_dynar_length(tpool->jobs_queue) && tpool->flags != TPOOL_DESTROY)
      xbt_os_cond_wait(tpool->job_posted, tpool->mutex);

    /* If we are shutting down, signal the destroyer (which waits on job_posted)
     * so it can kill the other workers, then unlock the pool and return */
    if (tpool->flags == TPOOL_DESTROY) {
      tpool->num_workers--;
      xbt_os_cond_signal(tpool->job_posted);
      xbt_os_mutex_release(tpool->mutex);
      return NULL;
    }

    /* Get a job, signal the pool to inform job submitters, and unlock it */
    job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
    xbt_os_cond_signal(tpool->job_taken);
    xbt_os_mutex_release(tpool->mutex);

    /* Run the job and loop again... */
    job.fun(job.fun_arg);
  }
}
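
For reference, here is a minimal caller-side sketch of how this pool API might be used. It is not part of threadpool.c; it assumes the declarations above are reachable through threadpool_private.h and that jobs follow the void_f_pvoid_t signature. The job function, worker count and queue size are purely illustrative.

/* Hypothetical usage sketch (not part of threadpool.c). Assumes the public
 * declarations above are visible through threadpool_private.h. */
#include "threadpool_private.h"
#include <stdio.h>

/* A job must match void_f_pvoid_t: take a void* argument, return nothing. */
static void print_job(void *arg)
{
  printf("running job %ld\n", (long)arg);
}

int main(void)
{
  long i;
  /* 4 workers; xbt_tpool_queue_job blocks once 8 jobs are already queued. */
  xbt_tpool_t pool = xbt_tpool_new(4, 8);

  for (i = 0; i < 16; i++)
    xbt_tpool_queue_job(pool, print_job, (void *)i);

  /* Block until the workers have drained the queue... */
  xbt_tpool_wait_all(pool);

  /* ...then shut the workers down and release the pool. */
  xbt_tpool_destroy(pool);
  return 0;
}

Note that xbt_tpool_wait_all only waits for the queue to become empty, so the last jobs may still be running when it returns; it is xbt_tpool_destroy that waits for every worker to exit.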