src/server/util/thrpool.c

changeset 67
50505dc3f8a6
parent 44
3da1f7b6847f
child 115
51d9a15eac98
--- a/src/server/util/thrpool.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/util/thrpool.c	Sun May 26 22:05:41 2013 +0200
@@ -29,21 +29,28 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+
+#include "atomic.h"
 #include "thrpool.h"
 
-
+static threadpool_job kill_job;
 
-threadpool_t* threadpool_new(int n) {
+threadpool_t* threadpool_new(int min, int max) {
+    printf("threadpool(%d, %d)\n", min, max);
     threadpool_t *pool = malloc(sizeof(threadpool_t));
     pool->queue = NULL;
     pool->queue_len = 0;
+    pool->num_idle = 0;
+    pool->min_threads = min;
+    pool->max_threads = max;
+    pool->num_threads = 0;
 
     pthread_mutex_init(&pool->queue_lock, NULL);
     pthread_mutex_init(&pool->avlbl_lock, NULL);
     pthread_cond_init(&pool->available, NULL);
 
     /* create pool threads */
-    for(int i=0;i<n;i++) {
+    for(int i=0;i<min;i++) {
         pthread_t t;
         if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
             perror("Error: threadpool_new: pthread_create");
@@ -56,10 +63,11 @@
 
 void* threadpool_func(void *data) {
     threadpool_t *pool = (threadpool_t*)data;
-
+    
+    ws_atomic_inc32(&pool->num_threads);
     for(;;) {
         threadpool_job *job = threadpool_get_job(pool);
-        if(job == NULL) {
+        if(job == &kill_job) {
             break;
         }
 
@@ -67,6 +75,7 @@
 
         free(job);
     }
+    ws_atomic_dec32(&pool->num_threads);
     return NULL;
 }
 
@@ -74,6 +83,7 @@
     pthread_mutex_lock(&pool->queue_lock);
 
     threadpool_job *job = NULL;
+    pool->num_idle++;
     while(job == NULL) {
         if(pool->queue_len == 0) {
             pthread_cond_wait(&pool->available, &pool->queue_lock);
@@ -86,9 +96,9 @@
             free(q);
         }
     }
+    pool->num_idle--;
 
     pthread_mutex_unlock(&pool->queue_lock);
-
     return job;
 }
 
@@ -97,11 +107,41 @@
     job->callback = func;
     job->data = data;
 
+    pthread_mutex_lock(&pool->queue_lock);
+    threadpool_enqueue_job(pool, job);
+
+    int create_thread = 0;
+    int destroy_thread = 0;
+    int diff = pool->queue_len - pool->num_idle;
+    if(diff > 0 && pool->num_threads < pool->max_threads) {
+        create_thread = 1;
+    } else if(diff < 0 && pool->num_threads > pool->min_threads) {
+        destroy_thread = 1;
+    }
+    
+    //if(pool->queue_len == 1) {
+    pthread_cond_signal(&pool->available);
+    //}
+    
+    if(create_thread) {
+        pthread_t t;
+        if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
+            perror("Error: threadpool_run: pthread_create");
+        }
+    }
+    if(destroy_thread) {
+        threadpool_enqueue_job(pool, &kill_job);
+        pthread_cond_signal(&pool->available);
+    }
+    
+    pthread_mutex_unlock(&pool->queue_lock);
+}
+
+void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) {
     pool_queue_t *q = malloc(sizeof(pool_queue_t));
     q->job = job;
     q->next = NULL;
-
-    pthread_mutex_lock(&pool->queue_lock);
+    
     if(pool->queue == NULL) {
         pool->queue = q;
     } else {
@@ -112,11 +152,4 @@
         last_elem->next = q;
     }
     pool->queue_len++;
-
-    if(pool->queue_len == 1) {
-        pthread_cond_signal(&pool->available);
-    }
-
-    pthread_mutex_unlock(&pool->queue_lock);
-
 }

mercurial