1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31
32 #include <cx/hash_map.h>
33
34 #include "threadpools.h"
35
36
37 static CxMap *thread_pool_map;
38 static int num_thrpools;
39 static CxMap *io_pool_map;
40 static int num_iopools;
41
42 static threadpool_t *default_thread_pool;
43 static threadpool_t *last_thrpool_c;
44
45 static threadpool_t *default_io_pool;
46 static threadpool_t *last_io_pool;
47
48 int create_threadpool(cxstring name, ThreadPoolConfig *cfg) {
49 if(thread_pool_map ==
NULL) {
50 thread_pool_map = cxHashMapCreate(cxDefaultAllocator,
CX_STORE_POINTERS,
16);
51 }
52
53 CxHashKey key = cx_hash_key_bytes((
const unsigned char*)name.ptr, name.length);
54 threadpool_t *pool = cxMapGet(thread_pool_map, key);
55 if(pool) {
56 if(pool->min_threads > cfg->max_threads) {
57 pool->min_threads = cfg->min_threads;
58 pool->max_threads = cfg->max_threads;
59 }
else {
60 pool->max_threads = cfg->max_threads;
61 pool->min_threads = cfg->min_threads;
62 }
63 return 0;
64 }
else {
65 threadpool_t *tp = threadpool_new(cfg->min_threads, cfg->max_threads);
66
67 int ret =
0;
68 if(!threadpool_start(tp)) {
69 ret = cxMapPut(thread_pool_map, key, tp);
70 }
else {
71 ret =
1;
72 }
73
74 if(ret ==
0) {
75 num_thrpools++;
76 last_thrpool_c = tp;
77 if(!default_thread_pool) {
78 default_thread_pool = tp;
79 }
80 }
81
82 return ret;
83 }
84 }
85
86 int create_io_pool(cxstring name,
int numthreads) {
87 if(io_pool_map ==
NULL) {
88 io_pool_map = cxHashMapCreate(cxDefaultAllocator,
CX_STORE_POINTERS,
4);
89 }
90 CxHashKey key = cx_hash_key_bytes((
const unsigned char*)name.ptr, name.length);
91 threadpool_t *pool = cxMapGet(io_pool_map, key);
92 if(pool) {
93 pool->min_threads = numthreads;
94 pool->max_threads = numthreads;
95 return 0;
96 }
else {
97 threadpool_t *tp = threadpool_new(numthreads, numthreads);
98
99 int ret = cxMapPut(io_pool_map, key, tp);
100
101 if(ret ==
0) {
102 num_iopools++;
103 last_io_pool = tp;
104 if(!default_io_pool) {
105 default_io_pool = tp;
106 }
107 }
108
109 return ret;
110 }
111 }
112
113 int check_thread_pool_cfg() {
114 if(num_thrpools ==
0) {
115 ThreadPoolConfig cfg;
116 cfg.min_threads =
4;
117 cfg.max_threads =
8;
118 cfg.queue_size =
64;
119 cfg.stack_size =
262144;
120 if(create_threadpool(cx_str(
"default"), &cfg)) {
121 return 1;
122 }
123 }
124
125 if(num_iopools ==
0) {
126 if(create_io_pool(cx_str(
"default"),
8)) {
127 return 1;
128 }
129 }
130
131 return 0;
132 }
133
134 threadpool_t* get_default_threadpool() {
135 return default_thread_pool;
136 }
137
138 threadpool_t* get_threadpool(cxstring name) {
139 return cxMapGet(thread_pool_map, cx_hash_key_bytes((
const unsigned char*)name.ptr, name.length));
140 }
141
142 threadpool_t* get_default_iopool() {
143 return default_io_pool;
144 }
145
146 threadpool_t* get_iopool(cxstring name) {
147 return cxMapGet(io_pool_map, cx_hash_key_bytes((
const unsigned char*)name.ptr, name.length));
148 }
149
150
151 void shutdown_threadpools(
void) {
152 log_ereport(
LOG_INFORM,
"shutdown threadpools");
153 CxIterator i = cxMapIteratorValues(thread_pool_map);
154 cx_foreach(
threadpool_t*, tp, i) {
155 threadpool_shutdown(tp);
156 }
157 i = cxMapIteratorValues(io_pool_map);
158 cx_foreach(
threadpool_t*, tp, i) {
159 threadpool_shutdown(tp);
160 }
161 }
162