--- a/src/server/util/thrpool.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/util/thrpool.c Sun May 26 22:05:41 2013 +0200 @@ -29,21 +29,28 @@ #include <stdio.h> #include <stdlib.h> #include <unistd.h> + +#include "atomic.h" #include "thrpool.h" - +static threadpool_job kill_job; -threadpool_t* threadpool_new(int n) { +threadpool_t* threadpool_new(int min, int max) { + printf("threadpool(%d, %d)\n", min, max); threadpool_t *pool = malloc(sizeof(threadpool_t)); pool->queue = NULL; pool->queue_len = 0; + pool->num_idle = 0; + pool->min_threads = min; + pool->max_threads = max; + pool->num_threads = 0; pthread_mutex_init(&pool->queue_lock, NULL); pthread_mutex_init(&pool->avlbl_lock, NULL); pthread_cond_init(&pool->available, NULL); /* create pool threads */ - for(int i=0;i<n;i++) { + for(int i=0;i<min;i++) { pthread_t t; if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { perror("Error: threadpool_new: pthread_create"); @@ -56,10 +63,11 @@ void* threadpool_func(void *data) { threadpool_t *pool = (threadpool_t*)data; - + + ws_atomic_inc32(&pool->num_threads); for(;;) { threadpool_job *job = threadpool_get_job(pool); - if(job == NULL) { + if(job == &kill_job) { break; } @@ -67,6 +75,7 @@ free(job); } + ws_atomic_dec32(&pool->num_threads); return NULL; } @@ -74,6 +83,7 @@ pthread_mutex_lock(&pool->queue_lock); threadpool_job *job = NULL; + pool->num_idle++; while(job == NULL) { if(pool->queue_len == 0) { pthread_cond_wait(&pool->available, &pool->queue_lock); @@ -86,9 +96,9 @@ free(q); } } + pool->num_idle--; pthread_mutex_unlock(&pool->queue_lock); - return job; } @@ -97,11 +107,41 @@ job->callback = func; job->data = data; + pthread_mutex_lock(&pool->queue_lock); + threadpool_enqueue_job(pool, job); + + int create_thread = 0; + int destroy_thread = 0; + int diff = pool->queue_len - pool->num_idle; + if(diff > 0 && pool->num_threads < pool->max_threads) { + create_thread = 1; + } else if(diff < 0 && pool->num_threads > pool->min_threads) { + destroy_thread = 1; + } + + //if(pool->queue_len == 1) { + pthread_cond_signal(&pool->available); + //} + + if(create_thread) { + pthread_t t; + if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { + perror("Error: threadpool_run: pthread_create"); + } + } + if(destroy_thread) { + threadpool_enqueue_job(pool, &kill_job); + pthread_cond_signal(&pool->available); + } + + pthread_mutex_unlock(&pool->queue_lock); +} + +void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { pool_queue_t *q = malloc(sizeof(pool_queue_t)); q->job = job; q->next = NULL; - - pthread_mutex_lock(&pool->queue_lock); + if(pool->queue == NULL) { pool->queue = q; } else { @@ -112,11 +152,4 @@ last_elem->next = q; } pool->queue_len++; - - if(pool->queue_len == 1) { - pthread_cond_signal(&pool->available); - } - - pthread_mutex_unlock(&pool->queue_lock); - }