#include <stdio.h>
#include <stdlib.h>
#include <cx/hash_map.h>
#include "threadpools.h"
/* Registry of pools created by create_threadpool(), keyed by the bytes of
 * the pool name. Created lazily on the first create_threadpool() call. */
static CxMap *thread_pool_map;
/* Number of pools that create_threadpool() created and stored in the map. */
static int num_thrpools;
/* Registry of pools created by create_io_pool(), keyed by the bytes of
 * the pool name. Created lazily on the first create_io_pool() call. */
static CxMap *io_pool_map;
/* Number of pools that create_io_pool() created and stored in the map. */
static int num_iopools;
/* First pool successfully registered by create_threadpool();
 * returned by get_default_threadpool(). */
static threadpool_t *default_thread_pool;
/* Most recent pool successfully registered by create_threadpool().
 * Only written in the code visible here. */
static threadpool_t *last_thrpool_c;
/* First pool successfully registered by create_io_pool();
 * returned by get_default_iopool(). */
static threadpool_t *default_io_pool;
/* Most recent pool successfully registered by create_io_pool().
 * Only written in the code visible here. */
static threadpool_t *last_io_pool;
/*
 * Creates and starts a named thread pool, or updates the thread limits of
 * an already registered pool with the same name.
 *
 * name: pool name; its bytes are used as the key in thread_pool_map
 * cfg:  pool configuration; only min_threads and max_threads are read here
 *
 * The first pool that is registered successfully becomes the default pool.
 *
 * Returns 0 on success. Returns non-zero if cfg is NULL, the registry map
 * or the pool could not be created, the pool could not be started, or the
 * pool could not be stored in the map.
 */
int create_threadpool(cxstring name, ThreadPoolConfig *cfg) {
    if(cfg == NULL) {
        return 1;
    }

    /* the registry is created lazily on first use */
    if(thread_pool_map == NULL) {
        thread_pool_map = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 16);
        if(thread_pool_map == NULL) {
            return 1;
        }
    }

    CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length);
    threadpool_t *pool = cxMapGet(thread_pool_map, key);
    if(pool) {
        /*
         * A pool with this name already exists: only adjust its limits.
         * The order of the two stores is chosen so that
         * min_threads <= max_threads holds after each single store
         * (provided cfg->min_threads <= cfg->max_threads).
         */
        if(pool->min_threads > cfg->max_threads) {
            pool->min_threads = cfg->min_threads;
            pool->max_threads = cfg->max_threads;
        } else {
            pool->max_threads = cfg->max_threads;
            pool->min_threads = cfg->min_threads;
        }
        return 0;
    }

    threadpool_t *tp = threadpool_new(cfg->min_threads, cfg->max_threads);
    if(tp == NULL) {
        /* presumably threadpool_new() reports allocation failure as NULL */
        return 1;
    }

    /* a non-zero result of threadpool_start() is treated as failure */
    if(threadpool_start(tp)) {
        /* NOTE(review): tp is not released on this path; no destroy
         * function is visible in this file - confirm and add cleanup */
        return 1;
    }

    int ret = cxMapPut(thread_pool_map, key, tp);
    if(ret != 0) {
        /* NOTE(review): the started pool is neither registered nor
         * stopped on this path - confirm the intended cleanup */
        return ret;
    }

    num_thrpools++;
    last_thrpool_c = tp;
    if(!default_thread_pool) {
        default_thread_pool = tp;
    }
    return 0;
}
/*
 * Creates a named IO pool with a fixed number of threads, or updates the
 * thread count of an already registered IO pool with the same name.
 *
 * name:       pool name; its bytes are used as the key in io_pool_map
 * numthreads: used as both the minimum and the maximum thread count
 *
 * The pool is only created and registered here; this function does not
 * start it. The first IO pool that is registered successfully becomes the
 * default IO pool.
 *
 * Returns 0 on success. Returns non-zero if the registry map or the pool
 * could not be created, or the pool could not be stored in the map.
 */
int create_io_pool(cxstring name, int numthreads) {
    /* the registry is created lazily on first use */
    if(io_pool_map == NULL) {
        io_pool_map = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 4);
        if(io_pool_map == NULL) {
            return 1;
        }
    }

    CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length);
    threadpool_t *pool = cxMapGet(io_pool_map, key);
    if(pool) {
        /* a pool with this name already exists: only adjust its size */
        pool->min_threads = numthreads;
        pool->max_threads = numthreads;
        return 0;
    }

    threadpool_t *tp = threadpool_new(numthreads, numthreads);
    if(tp == NULL) {
        /* never register a NULL pool or let it become the default;
         * presumably threadpool_new() reports allocation failure as NULL */
        return 1;
    }

    int ret = cxMapPut(io_pool_map, key, tp);
    if(ret != 0) {
        /* NOTE(review): tp is not released on this path; no destroy
         * function is visible in this file - confirm and add cleanup */
        return ret;
    }

    num_iopools++;
    last_io_pool = tp;
    if(!default_io_pool) {
        default_io_pool = tp;
    }
    return 0;
}
/*
 * Makes sure at least one thread pool and one IO pool exist.
 *
 * If no thread pool has been registered yet, a pool named "default" is
 * created with 4 to 8 threads. If no IO pool has been registered yet, an
 * IO pool named "default" with 8 threads is created.
 *
 * Returns 0 on success and 1 if one of the default pools could not be
 * created.
 */
int check_thread_pool_cfg() {
    if(!num_thrpools) {
        /* fallback settings for the implicit default thread pool */
        ThreadPoolConfig defaults;
        defaults.min_threads = 4;
        defaults.max_threads = 8;
        defaults.queue_size = 64;
        defaults.stack_size = 262144;

        int err = create_threadpool(cx_str("default"), &defaults);
        if(err != 0) {
            return 1;
        }
    }

    if(!num_iopools) {
        int err = create_io_pool(cx_str("default"), 8);
        if(err != 0) {
            return 1;
        }
    }

    return 0;
}
/*
 * Returns the default thread pool, i.e. the first pool registered by
 * create_threadpool(), or NULL if no pool has been registered yet.
 */
threadpool_t* get_default_threadpool() {
    threadpool_t *pool = default_thread_pool;
    return pool;
}
/*
 * Looks up a thread pool by name.
 *
 * name: pool name; its bytes are used as the lookup key
 *
 * Returns the result of the map lookup for that name, or NULL if no thread
 * pool has been created yet (the registry map is created lazily by
 * create_threadpool()).
 */
threadpool_t* get_threadpool(cxstring name) {
    if(thread_pool_map == NULL) {
        return NULL;
    }
    CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length);
    return cxMapGet(thread_pool_map, key);
}
/*
 * Returns the default IO pool, i.e. the first pool registered by
 * create_io_pool(), or NULL if no IO pool has been registered yet.
 */
threadpool_t* get_default_iopool() {
    threadpool_t *pool = default_io_pool;
    return pool;
}
/*
 * Looks up an IO pool by name.
 *
 * name: pool name; its bytes are used as the lookup key
 *
 * Returns the result of the map lookup for that name, or NULL if no IO
 * pool has been created yet (the registry map is created lazily by
 * create_io_pool()).
 */
threadpool_t* get_iopool(cxstring name) {
    if(io_pool_map == NULL) {
        return NULL;
    }
    CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length);
    return cxMapGet(io_pool_map, key);
}
void shutdown_threadpools(
void) {
log_ereport(
LOG_INFORM,
"shutdown threadpools");
CxIterator i = cxMapIteratorValues(thread_pool_map);
cx_foreach(
threadpool_t*, tp, i) {
threadpool_shutdown(tp);
}
i = cxMapIteratorValues(io_pool_map);
cx_foreach(
threadpool_t*, tp, i) {
threadpool_shutdown(tp);
}
}