1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #include "atomic.h"
34 #include "thrpool.h"
35
/*
 * Sentinel job. Its address is placed on the queue by threadpool_run and
 * threadpool_shutdown; a worker that dequeues a pointer equal to &kill_job
 * leaves its loop instead of invoking a callback. The object is never
 * freed and its fields are not read in this file.
 */
36 static threadpool_job kill_job;
37
38 threadpool_t* threadpool_new(
int min,
int max) {
39 log_ereport(
LOG_VERBOSE,
"new threadpool (min: %d, max: %d)", min, max);
40 threadpool_t *pool = malloc(
sizeof(
threadpool_t));
41 pool->queue =
NULL;
42 pool->queue_len =
0;
43 pool->num_idle =
0;
44 pool->min_threads = min;
45 pool->max_threads = max;
46 pool->num_threads =
0;
47
48 pthread_mutex_init(&pool->queue_lock,
NULL);
49 pthread_mutex_init(&pool->avlbl_lock,
NULL);
50 pthread_cond_init(&pool->available,
NULL);
51
52 return pool;
53 }
54
55 int threadpool_start(
threadpool_t *pool) {
56
57 for(
int i=
0;i<pool->min_threads;i++) {
58 pthread_t t;
59 if (pthread_create(&t,
NULL, threadpool_func, pool) !=
0) {
60 log_ereport(
LOG_FAILURE,
"threadpool_start: pthread_create failed: %s", strerror(errno));
61 return 1;
62 }
63 }
64 return 0;
65 }
66
67 void* threadpool_func(
void *data) {
68 threadpool_t *pool = (
threadpool_t*)data;
69
70 ws_atomic_inc32(&pool->num_threads);
71 for(;;) {
72 threadpool_job *job = threadpool_get_job(pool);
73 if(job == &kill_job) {
74 break;
75 }
76
77 job->callback(job->data);
78
79 free(job);
80 }
81 uint32_t nthreads = ws_atomic_dec32(&pool->num_threads);
82 if(nthreads ==
0) {
83 log_ereport(
LOG_INFORM,
"threadpool closed");
84 }
85 return NULL;
86 }
87
88 threadpool_job* threadpool_get_job(
threadpool_t *pool) {
89 pthread_mutex_lock(&pool->queue_lock);
90
91 threadpool_job *job =
NULL;
92 pool->num_idle++;
93 while(job ==
NULL) {
94 if(pool->queue_len ==
0) {
95 pthread_cond_wait(&pool->available, &pool->queue_lock);
96 continue;
97 }
else {
98 pool_queue_t *q = pool->queue;
99 job = q->job;
100 pool->queue = q->next;
101 pool->queue_len--;
102 free(q);
103 }
104 }
105 pool->num_idle--;
106
107 pthread_mutex_unlock(&pool->queue_lock);
108 return job;
109 }
110
111 void threadpool_run(
threadpool_t *pool, job_callback_f func,
void *data) {
112
113
114 if(pool->num_threads ==
0) {
115 threadpool_start(pool);
116 }
117
118 threadpool_job *job = malloc(
sizeof(threadpool_job));
119 job->callback = func;
120 job->data = data;
121
122 pthread_mutex_lock(&pool->queue_lock);
123 threadpool_enqueue_job(pool, job);
124
125 int create_thread =
0;
126 int destroy_thread =
0;
127 int diff = pool->queue_len - pool->num_idle;
128 if(diff >
0 && pool->num_threads < pool->max_threads) {
129 create_thread =
1;
130 }
else if(diff <
0 && pool->num_threads > pool->min_threads) {
131 destroy_thread =
1;
132 }
133
134
135 pthread_cond_signal(&pool->available);
136
137
138 if(create_thread) {
139 pthread_t t;
140 if (pthread_create(&t,
NULL, threadpool_func, pool) !=
0) {
141 log_ereport(
LOG_FAILURE,
"threadpool_run: pthread_create failed: %s", strerror(errno));
142 }
143 }
144 if(destroy_thread) {
145 threadpool_enqueue_job(pool, &kill_job);
146 pthread_cond_signal(&pool->available);
147 }
148
149 pthread_mutex_unlock(&pool->queue_lock);
150 }
151
152 void threadpool_enqueue_job(
threadpool_t *pool, threadpool_job *job) {
153 pool_queue_t *q = malloc(
sizeof(
pool_queue_t));
154 q->job = job;
155 q->next =
NULL;
156
157 if(pool->queue ==
NULL) {
158 pool->queue = q;
159 }
else {
160 pool_queue_t *last_elem = pool->queue;
161 while(last_elem->next !=
NULL) {
162 last_elem = last_elem->next;
163 }
164 last_elem->next = q;
165 }
166 pool->queue_len++;
167 }
168
169 void threadpool_shutdown(
threadpool_t *pool) {
170 int nthreads = pool->max_threads;
171 for(
int i=
0;i<nthreads;i++) {
172 pthread_mutex_lock(&pool->queue_lock);
173 threadpool_enqueue_job(pool, &kill_job);
174 pthread_cond_signal(&pool->available);
175 pthread_mutex_unlock(&pool->queue_lock);
176 }
177 }
178