Logo AND Algorithmique Numérique Distribuée

Public GIT Repository
Rename condition variable + improve debugging
[simgrid.git] / src / xbt / threadpool.c
index aab6fc4..8459517 100644 (file)
@@ -25,7 +25,7 @@ xbt_tpool_t xbt_tpool_new(unsigned int num_workers, unsigned int max_jobs)
   tpool->mutex = xbt_os_mutex_init();
   tpool->job_posted = xbt_os_cond_init();
   tpool->job_taken = xbt_os_cond_init();
-  tpool->job_done = xbt_os_cond_init();
+  tpool->idle_worker = xbt_os_cond_init();
   tpool->jobs_queue = xbt_dynar_new(sizeof(s_xbt_tpool_job_t), NULL);
   tpool->num_workers = num_workers;
   tpool->num_idle_workers = 0;
@@ -49,6 +49,7 @@ void xbt_tpool_destroy(xbt_tpool_t tpool)
   tpool->flags = TPOOL_DESTROY; 
 
   while(tpool->num_workers){
+    DEBUG1("Still %u workers, waiting...", tpool->num_workers);
     xbt_os_cond_signal(tpool->job_posted);
     xbt_os_cond_wait(tpool->job_taken, tpool->mutex);
   }
@@ -56,7 +57,7 @@ void xbt_tpool_destroy(xbt_tpool_t tpool)
   /* Destroy pool's data structures */
   xbt_os_cond_destroy(tpool->job_posted);
   xbt_os_cond_destroy(tpool->job_taken);
-  xbt_os_cond_destroy(tpool->job_done);
+  xbt_os_cond_destroy(tpool->idle_worker);
   xbt_os_mutex_release(tpool->mutex);
   xbt_os_mutex_destroy(tpool->mutex);  
   xbt_free(tpool);
@@ -84,13 +85,15 @@ void xbt_tpool_queue_job(xbt_tpool_t tpool, void_f_pvoid_t fun, void* fun_arg)
 
 void xbt_tpool_wait_all(xbt_tpool_t tpool)
 {
-  DEBUG1("Wait all in thread pool %p", tpool);
+  DEBUG1("Wait all workers in thread pool %p", tpool);
   xbt_os_mutex_acquire(tpool->mutex);
+
   while(tpool->num_idle_workers < tpool->num_workers
         || xbt_dynar_length(tpool->jobs_queue) > 0)
-    xbt_os_cond_wait(tpool->job_done, tpool->mutex);
+    xbt_os_cond_wait(tpool->idle_worker, tpool->mutex);
+
   xbt_os_mutex_release(tpool->mutex);
-  DEBUG1("Wait all done in thread pool %p", tpool);
+  DEBUG1("Wait all workers done in thread pool %p", tpool);
   return;
 }
 
@@ -102,17 +105,20 @@ static void *_xbt_tpool_worker_main(void *arg)
   unsigned long i = tpoolcounter++; /* Debug purposes */
   DEBUG1("New worker thread created (%lu)", i);
   
+
   /* Worker's main loop */
   while(1){
     xbt_os_mutex_acquire(tpool->mutex);
-    xbt_os_cond_signal(tpool->job_done);
-    tpool->num_idle_workers++;
 
-    DEBUG1("Worker %lu waiting for a job", i);
+    tpool->num_idle_workers++;
+    xbt_os_cond_signal(tpool->idle_worker);
 
     /* If there are no jobs in the queue wait for one */
-    while(xbt_dynar_length(tpool->jobs_queue) == 0)
+    while(xbt_dynar_length(tpool->jobs_queue) == 0
+          && tpool->flags != TPOOL_DESTROY){
+      DEBUG1("Worker %lu waiting for a job", i);
       xbt_os_cond_wait(tpool->job_posted, tpool->mutex);
+    }
 
     DEBUG1("Worker %lu got a job", i);
 
@@ -135,8 +141,6 @@ static void *_xbt_tpool_worker_main(void *arg)
   
     /* Run the job and loop again ... */
     job.fun(job.fun_arg);
-
-    DEBUG1("Worker %lu done with job", i);
   }
 }
 
@@ -146,31 +150,38 @@ static void *_xbt_tpool_worker_main(void *arg)
 
 XBT_TEST_SUITE("tpool", "Thread pool");
 
+XBT_LOG_NEW_DEFAULT_SUBCATEGORY(xbt_tpool_unit, xbt, "Unit test for tpool");
+
 xbt_tpool_t tpool;
 
 void job(void *arg);
 
 void job (void *arg)
 {
-  xbt_test_log1("I'm job %lu", (unsigned long)arg);
+  DEBUG1("I'm job %lu", (unsigned long)arg);
 }
 
 XBT_TEST_UNIT("basic", test_tpool_basic, "Basic usage")
 {
   xbt_test_add0("Create thread pool");
 
-  unsigned long i;
+  unsigned long i,j;
   /* Create thread pool */
   tpool = xbt_tpool_new(5, 10);
 
-  /* Queue some work */
-  for(i=0; i < 20; i++)
-    xbt_tpool_queue_job(tpool, job, (void*)i);
+  for(j=0; j < 10; j++){
+    DEBUG1("Round %lu", j);
+    /* Queue some work */
+    for(i=0; i < 20; i++){
+      DEBUG1("Queuing job %lu", i);
+      xbt_tpool_queue_job(tpool, job, (void*)i);
+    }
+    /* Wait for everyone */
+    xbt_tpool_wait_all(tpool);
+  }
 
   /* Destroy thread pool */
   xbt_tpool_destroy(tpool);
-
-  xbt_test_assert0(0, "lala");
 }
 
 #endif /* SIMGRID_TEST */