Wed, 27 Nov 2024 23:00:07 +0100
add TODO to use a future ucx feature
/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * * Copyright 2013 Olaf Wintermann. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include <stdio.h> #include <stdlib.h> #include <cx/hash_map.h> #include "threadpools.h" static CxMap *thread_pool_map; static int num_thrpools; static CxMap *io_pool_map; static int num_iopools; static threadpool_t *default_thread_pool; static threadpool_t *last_thrpool_c; static threadpool_t *default_io_pool; static threadpool_t *last_io_pool; int create_threadpool(cxstring name, ThreadPoolConfig *cfg) { if(thread_pool_map == NULL) { thread_pool_map = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 16); } CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length); threadpool_t *pool = cxMapGet(thread_pool_map, key); if(pool) { if(pool->min_threads > cfg->max_threads) { pool->min_threads = cfg->min_threads; pool->max_threads = cfg->max_threads; } else { pool->max_threads = cfg->max_threads; pool->min_threads = cfg->min_threads; } return 0; } else { threadpool_t *tp = threadpool_new(cfg->min_threads, cfg->max_threads); int ret = 0; if(!threadpool_start(tp)) { ret = cxMapPut(thread_pool_map, key, tp); } else { ret = 1; } if(ret == 0) { num_thrpools++; last_thrpool_c = tp; if(!default_thread_pool) { default_thread_pool = tp; } } return ret; } } int create_io_pool(cxstring name, int numthreads) { if(io_pool_map == NULL) { io_pool_map = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 4); } CxHashKey key = cx_hash_key_bytes((const unsigned char*)name.ptr, name.length); threadpool_t *pool = cxMapGet(io_pool_map, key); if(pool) { pool->min_threads = numthreads; pool->max_threads = numthreads; return 0; } else { threadpool_t *tp = threadpool_new(numthreads, numthreads); int ret = cxMapPut(io_pool_map, key, tp); if(ret == 0) { num_iopools++; last_io_pool = tp; if(!default_io_pool) { default_io_pool = tp; } } return ret; } } int check_thread_pool_cfg() { if(num_thrpools == 0) { ThreadPoolConfig cfg; cfg.min_threads = 4; cfg.max_threads = 8; cfg.queue_size = 64; cfg.stack_size = 262144; if(create_threadpool(cx_str("default"), &cfg)) { return 1; } } if(num_iopools == 0) { if(create_io_pool(cx_str("default"), 8)) { return 1; } } return 0; } threadpool_t* get_default_threadpool() { return default_thread_pool; } threadpool_t* get_threadpool(cxstring name) { return cxMapGet(thread_pool_map, cx_hash_key_bytes((const unsigned char*)name.ptr, name.length)); } threadpool_t* get_default_iopool() { return default_io_pool; } threadpool_t* get_iopool(cxstring name) { return cxMapGet(io_pool_map, cx_hash_key_bytes((const unsigned char*)name.ptr, name.length)); } void shutdown_threadpools(void) { log_ereport(LOG_INFORM, "shutdown threadpools"); CxIterator i = cxMapIteratorValues(thread_pool_map); cx_foreach(threadpool_t*, tp, i) { threadpool_shutdown(tp); } i = cxMapIteratorValues(io_pool_map); cx_foreach(threadpool_t*, tp, i) { threadpool_shutdown(tp); } }