Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Bugfixes + test suitx
[simgrid.git] / src / xbt / threadpool.c
index 271dad4..aab6fc4 100644 (file)
@@ -6,21 +6,29 @@
 
 #include "threadpool_private.h"
 
 
 #include "threadpool_private.h"
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_threadpool, xbt,
+                                "threadpool: pool of worker threads");
+
 static void *_xbt_tpool_worker_main(void *tpool);
 
 static void *_xbt_tpool_worker_main(void *tpool);
 
+unsigned long tpoolcounter = 0;  /* Debug purposes */
+
 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
 {
   unsigned int i;
   xbt_os_thread_t worker = NULL;
 
 xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
 {
   unsigned int i;
   xbt_os_thread_t worker = NULL;
 
+  DEBUG2("Create new thread pool (%u, %u)", num_workers, max_jobs);
+
   /* Initialize thread pool data structure */
   xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
   tpool->mutex = xbt_os_mutex_init();
   tpool->job_posted = xbt_os_cond_init();
   tpool->job_taken = xbt_os_cond_init();
   /* Initialize thread pool data structure */
   xbt_tpool_t tpool = xbt_new0(s_xbt_tpool_t, 1);
   tpool->mutex = xbt_os_mutex_init();
   tpool->job_posted = xbt_os_cond_init();
   tpool->job_taken = xbt_os_cond_init();
-  tpool->all_jobs_done = xbt_os_cond_init();
+  tpool->job_done = xbt_os_cond_init();
   tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
   tpool->num_workers = num_workers;
   tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
   tpool->num_workers = num_workers;
+  tpool->num_idle_workers = 0;
   tpool->max_jobs = max_jobs;
   
   /* Create the pool of worker threads */
   tpool->max_jobs = max_jobs;
   
   /* Create the pool of worker threads */
@@ -34,19 +42,21 @@ xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
 
 void xbt_tpool_destroy(xbt_tpool_t tpool)
 { 
 
 void xbt_tpool_destroy(xbt_tpool_t tpool)
 { 
+  DEBUG1("Destroy thread pool %p", tpool);
+
   /* Lock the pool, then signal every worker an wait for each to finish */
   xbt_os_mutex_acquire(tpool->mutex);
   tpool->flags = TPOOL_DESTROY; 
 
   while(tpool->num_workers){
     xbt_os_cond_signal(tpool->job_posted);
   /* Lock the pool, then signal every worker an wait for each to finish */
   xbt_os_mutex_acquire(tpool->mutex);
   tpool->flags = TPOOL_DESTROY; 
 
   while(tpool->num_workers){
     xbt_os_cond_signal(tpool->job_posted);
-    xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
+    xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
   }
 
   /* Destroy pool's data structures */
   xbt_os_cond_destroy(tpool->job_posted);
   xbt_os_cond_destroy(tpool->job_taken);
   }
 
   /* Destroy pool's data structures */
   xbt_os_cond_destroy(tpool->job_posted);
   xbt_os_cond_destroy(tpool->job_taken);
-  xbt_os_cond_destroy(tpool->all_jobs_done);
+  xbt_os_cond_destroy(tpool->job_done);
   xbt_os_mutex_release(tpool->mutex);
   xbt_os_mutex_destroy(tpool->mutex);  
   xbt_free(tpool);
   xbt_os_mutex_release(tpool->mutex);
   xbt_os_mutex_destroy(tpool->mutex);  
   xbt_free(tpool);
@@ -63,6 +73,8 @@ void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
   while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
     xbt_os_cond_wait(tpool->job_taken, tpool->mutex); 
 
   while(xbt_dynar_length(tpool->jobs_queue) == tpool->max_jobs)
     xbt_os_cond_wait(tpool->job_taken, tpool->mutex); 
 
+  DEBUG3("Queue job %p (%p) to thread pool %p", fun, fun_arg, tpool);
+
   /* Push the job in the queue, signal the workers and unlock the pool */
   xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
   xbt_os_cond_signal(tpool->job_posted);
   /* Push the job in the queue, signal the workers and unlock the pool */
   xbt_dynar_push_as(tpool->jobs_queue, s_xbt_tpool_job_t, job);
   xbt_os_cond_signal(tpool->job_posted);
@@ -72,11 +84,13 @@ void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
 
 void xbt_tpool_wait_all(xbt_tpool_t tpool)
 {
 
 void xbt_tpool_wait_all(xbt_tpool_t tpool)
 {
+  DEBUG1("Wait all in thread pool %p", tpool);
   xbt_os_mutex_acquire(tpool->mutex);
   xbt_os_mutex_acquire(tpool->mutex);
-  while(xbt_dynar_length(tpool->jobs_queue))
-    xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
+  while(tpool->num_idle_workers < tpool->num_workers
+        || xbt_dynar_length(tpool->jobs_queue) > 0)
+    xbt_os_cond_wait(tpool->job_done, tpool->mutex);
   xbt_os_mutex_release(tpool->mutex);
   xbt_os_mutex_release(tpool->mutex);
-  
+  DEBUG1("Wait all done in thread pool %p", tpool);
   return;
 }
 
   return;
 }
 
@@ -84,18 +98,29 @@ static void *_xbt_tpool_worker_main(void *arg)
 {
   s_xbt_tpool_job_t job;
   xbt_tpool_t tpool = (xbt_tpool_t)arg;
 {
   s_xbt_tpool_job_t job;
   xbt_tpool_t tpool = (xbt_tpool_t)arg;
+
+  unsigned long i = tpoolcounter++; /* Debug purposes */
+  DEBUG1("New worker thread created (%lu)", i);
   
   /* Worker's main loop */
   while(1){
     xbt_os_mutex_acquire(tpool->mutex);
   
   /* Worker's main loop */
   while(1){
     xbt_os_mutex_acquire(tpool->mutex);
+    xbt_os_cond_signal(tpool->job_done);
+    tpool->num_idle_workers++;
+
+    DEBUG1("Worker %lu waiting for a job", i);
 
     /* If there are no jobs in the queue wait for one */
 
     /* If there are no jobs in the queue wait for one */
-    if(!xbt_dynar_length(tpool->jobs_queue))
+    while(xbt_dynar_length(tpool->jobs_queue) == 0)
       xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
 
       xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
 
+    DEBUG1("Worker %lu got a job", i);
+
     /* If we are shutting down, signal the destroyer so it can kill the other */
     /* workers, unlock the pool and return  */
     if(tpool->flags == TPOOL_DESTROY){
     /* If we are shutting down, signal the destroyer so it can kill the other */
     /* workers, unlock the pool and return  */
     if(tpool->flags == TPOOL_DESTROY){
+      DEBUG1("Shutting down worker %lu", i);
+      tpool->num_idle_workers--;
       tpool->num_workers--;
       xbt_os_cond_signal(tpool->job_taken);
       xbt_os_mutex_release(tpool->mutex);
       tpool->num_workers--;
       xbt_os_cond_signal(tpool->job_taken);
       xbt_os_mutex_release(tpool->mutex);
@@ -105,9 +130,47 @@ static void *_xbt_tpool_worker_main(void *arg)
     /* Get a job, signal the pool to inform jobs submitters and unlock it */
     job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
     xbt_os_cond_signal(tpool->job_taken);
     /* Get a job, signal the pool to inform jobs submitters and unlock it */
     job = xbt_dynar_pop_as(tpool->jobs_queue, s_xbt_tpool_job_t);
     xbt_os_cond_signal(tpool->job_taken);
+    tpool->num_idle_workers--;
     xbt_os_mutex_release(tpool->mutex);
   
     /* Run the job and loop again ... */
     job.fun(job.fun_arg);
     xbt_os_mutex_release(tpool->mutex);
   
     /* Run the job and loop again ... */
     job.fun(job.fun_arg);
+
+    DEBUG1("Worker %lu done with job", i);
   }
   }
-}
\ No newline at end of file
+}
+
+#ifdef SIMGRID_TEST
+#include "xbt.h"
+#include "xbt/ex.h"
+
+XBT_TEST_SUITE("tpool", "Thread pool");
+
+xbt_tpool_t tpool;
+
+void job(void *arg);
+
+void job (void *arg)
+{
+  xbt_test_log1("I'm job %lu", (unsigned long)arg);
+}
+
+XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
+{
+  xbt_test_add0("Create thread pool");
+
+  unsigned long i;
+  /* Create thread pool */
+  tpool = xbt_tpool_new(5, 10);
+
+  /* Queue some work */
+  for(i=0; i < 20; i++)
+    xbt_tpool_queue_job(tpool, job, (void*)i);
+
+  /* Destroy thread pool */
+  xbt_tpool_destroy(tpool);
+
+  xbt_test_assert0(0, "lala");
+}
+
+#endif /* SIMGRID_TEST */