#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "atomic.h"
#include "thrpool.h"
/*
 * Sentinel job used to stop a worker thread. It is recognized purely by its
 * address: threadpool_func leaves its loop when the job it dequeues is
 * &kill_job, before the callback is invoked and before free() is called.
 * threadpool_run and threadpool_shutdown enqueue it.
 */
static threadpool_job kill_job;
/*
 * Allocates and initializes a thread pool object.
 *
 * The job queue starts empty and all counters start at zero. No worker
 * threads are created here; see threadpool_start.
 *
 * min: stored as pool->min_threads
 * max: stored as pool->max_threads
 *
 * Returns the new pool, or NULL if the allocation fails.
 */
threadpool_t* threadpool_new(int min, int max) {
    log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max);

    threadpool_t *pool = malloc(sizeof *pool);
    if (pool == NULL) {
        /* Nothing to initialize; let the caller handle the failure. */
        return NULL;
    }

    pool->queue = NULL;
    pool->queue_len = 0;
    pool->num_idle = 0;
    pool->min_threads = min;
    pool->max_threads = max;
    pool->num_threads = 0;

    /* NOTE(review): the return values of the pthread init calls are not checked. */
    pthread_mutex_init(&pool->queue_lock, NULL);
    pthread_mutex_init(&pool->avlbl_lock, NULL);
    pthread_cond_init(&pool->available, NULL);

    return pool;
}
/*
 * Creates pool->min_threads worker threads, each running threadpool_func
 * with the pool as its argument.
 *
 * Returns 0 on success, or 1 as soon as one pthread_create call fails
 * (threads created before the failure keep running).
 */
int threadpool_start(threadpool_t *pool) {
    for (int i = 0; i < pool->min_threads; i++) {
        pthread_t t;
        /*
         * pthread_create reports failure through its return value and does
         * not set errno, so the returned code is what gets passed to strerror.
         */
        int rc = pthread_create(&t, NULL, threadpool_func, pool);
        if (rc != 0) {
            log_ereport(
                    LOG_FAILURE,
                    "threadpool_start: pthread_create failed: %s",
                    strerror(rc));
            return 1;
        }
        /* NOTE(review): the thread handle is discarded; the thread is neither joined nor detached here. */
    }
    return 0;
}
/*
 * Worker thread entry point.
 *
 * data: the threadpool_t this worker belongs to.
 *
 * Increments pool->num_threads on entry, then repeatedly fetches jobs with
 * threadpool_get_job, runs each job's callback with its data and frees the
 * job. The loop ends when the fetched job is the kill_job sentinel, which is
 * neither executed nor freed. On exit num_threads is decremented and a
 * message is logged if ws_atomic_dec32 returned 0.
 *
 * Always returns NULL.
 */
void* threadpool_func(void *data) {
    threadpool_t *pool = data;

    ws_atomic_inc32(&pool->num_threads);

    threadpool_job *current;
    while ((current = threadpool_get_job(pool)) != &kill_job) {
        current->callback(current->data);
        free(current);
    }

    uint32_t remaining = ws_atomic_dec32(&pool->num_threads);
    if (remaining == 0) {
        log_ereport(LOG_INFORM, "threadpool closed");
    }
    return NULL;
}
/*
 * Removes and returns the job at the head of the pool's queue, blocking on
 * pool->available while the queue is empty.
 *
 * The whole operation runs with pool->queue_lock held (pthread_cond_wait
 * releases it while waiting). pool->num_idle is incremented for the duration
 * of the call. The queue node that carried the job is freed; the job itself
 * is handed to the caller.
 *
 * Never returns NULL: a queue entry whose job pointer is NULL is discarded
 * and the next entry is taken instead.
 */
threadpool_job* threadpool_get_job(threadpool_t *pool) {
    threadpool_job *next_job = NULL;

    pthread_mutex_lock(&pool->queue_lock);
    pool->num_idle++;

    do {
        /* Re-check the length after every wakeup before touching the queue. */
        while (pool->queue_len == 0) {
            pthread_cond_wait(&pool->available, &pool->queue_lock);
        }

        pool_queue_t *head = pool->queue;
        next_job = head->job;
        pool->queue = head->next;
        pool->queue_len--;
        free(head);
    } while (next_job == NULL);

    pool->num_idle--;
    pthread_mutex_unlock(&pool->queue_lock);

    return next_job;
}
/*
 * Queues a job for execution by the pool and adjusts the number of workers.
 *
 * pool: the pool that should run the job
 * func: callback invoked by a worker thread
 * data: argument passed to func
 *
 * If the pool currently has no threads, threadpool_start is called first.
 * The job object is allocated here and freed by the worker after the
 * callback has run (see threadpool_func). If the allocation fails, the
 * failure is logged and the job is not queued.
 *
 * After queuing, with diff = queue_len - num_idle:
 *  - diff > 0 and num_threads < max_threads: one more worker is created
 *  - diff < 0 and num_threads > min_threads: kill_job is queued so that
 *    one worker exits
 */
void threadpool_run(threadpool_t *pool, job_callback_f func, void *data) {
    /* NOTE(review): num_threads is read here without holding queue_lock. */
    if (pool->num_threads == 0) {
        threadpool_start(pool);
    }

    threadpool_job *job = malloc(sizeof *job);
    if (job == NULL) {
        log_ereport(LOG_FAILURE, "threadpool_run: malloc failed");
        return;
    }
    job->callback = func;
    job->data = data;

    pthread_mutex_lock(&pool->queue_lock);
    threadpool_enqueue_job(pool, job);

    int create_thread = 0;
    int destroy_thread = 0;
    int diff = pool->queue_len - pool->num_idle;
    if (diff > 0 && pool->num_threads < pool->max_threads) {
        create_thread = 1;
    } else if (diff < 0 && pool->num_threads > pool->min_threads) {
        destroy_thread = 1;
    }

    /* Wake one waiting worker for the job that was just queued. */
    pthread_cond_signal(&pool->available);

    if (create_thread) {
        pthread_t t;
        /*
         * pthread_create reports failure through its return value and does
         * not set errno, so the returned code is what gets passed to strerror.
         */
        int rc = pthread_create(&t, NULL, threadpool_func, pool);
        if (rc != 0) {
            log_ereport(
                    LOG_FAILURE,
                    "threadpool_run: pthread_create failed: %s",
                    strerror(rc));
        }
    }
    if (destroy_thread) {
        /* The sentinel goes behind the job queued above; signal again for it. */
        threadpool_enqueue_job(pool, &kill_job);
        pthread_cond_signal(&pool->available);
    }

    pthread_mutex_unlock(&pool->queue_lock);
}
/*
 * Appends a job to the end of the pool's singly linked job queue and
 * increments pool->queue_len.
 *
 * This function does not take pool->queue_lock itself; the callers in this
 * file hold it around the call.
 *
 * NOTE(review): the result of malloc is not checked before it is used.
 */
void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) {
    pool_queue_t *node = malloc(sizeof(pool_queue_t));
    node->job = job;
    node->next = NULL;

    /* Walk to the link that terminates the list (the head link if the list is empty). */
    pool_queue_t **link = &pool->queue;
    while (*link != NULL) {
        link = &(*link)->next;
    }
    *link = node;

    pool->queue_len++;
}
/*
 * Requests termination of the pool's workers by queuing the kill_job
 * sentinel pool->max_threads times.
 *
 * Each sentinel is queued and signalled under its own acquisition of
 * pool->queue_lock. The function does not wait for the workers to exit.
 */
void threadpool_shutdown(threadpool_t *pool) {
    int remaining = pool->max_threads;

    while (remaining > 0) {
        pthread_mutex_lock(&pool->queue_lock);
        threadpool_enqueue_job(pool, &kill_job);
        pthread_cond_signal(&pool->available);
        pthread_mutex_unlock(&pool->queue_lock);
        remaining--;
    }
}