1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32
33 #include "atomic.h"
34 #include "thrpool.h"
35
36 static threadpool_job kill_job;
37
38 threadpool_t* threadpool_new(
int min,
int max) {
39 log_ereport(
LOG_VERBOSE,
"new threadpool (min: %d, max: %d)", min, max);
40 threadpool_t *pool = malloc(
sizeof(
threadpool_t));
41 pool->queue =
NULL;
42 pool->queue_len =
0;
43 pool->num_idle =
0;
44 pool->min_threads = min;
45 pool->max_threads = max;
46 pool->num_threads =
0;
47
48 pthread_mutex_init(&pool->queue_lock,
NULL);
49 pthread_mutex_init(&pool->avlbl_lock,
NULL);
50 pthread_cond_init(&pool->available,
NULL);
51
52
53 for(
int i=
0;i<min;i++) {
54 pthread_t t;
55 if (pthread_create(&t,
NULL, threadpool_func, pool) !=
0) {
56 perror(
"Error: threadpool_new: pthread_create");
57 return NULL;
58 }
59 }
60
61 return pool;
62 }
63
64 void* threadpool_func(
void *data) {
65 threadpool_t *pool = (
threadpool_t*)data;
66
67 ws_atomic_inc32(&pool->num_threads);
68 for(;;) {
69 threadpool_job *job = threadpool_get_job(pool);
70 if(job == &kill_job) {
71 break;
72 }
73
74 job->callback(job->data);
75
76 free(job);
77 }
78 ws_atomic_dec32(&pool->num_threads);
79 return NULL;
80 }
81
82 threadpool_job* threadpool_get_job(
threadpool_t *pool) {
83 pthread_mutex_lock(&pool->queue_lock);
84
85 threadpool_job *job =
NULL;
86 pool->num_idle++;
87 while(job ==
NULL) {
88 if(pool->queue_len ==
0) {
89 pthread_cond_wait(&pool->available, &pool->queue_lock);
90 continue;
91 }
else {
92 pool_queue_t *q = pool->queue;
93 job = q->job;
94 pool->queue = q->next;
95 pool->queue_len--;
96 free(q);
97 }
98 }
99 pool->num_idle--;
100
101 pthread_mutex_unlock(&pool->queue_lock);
102 return job;
103 }
104
105 void threadpool_run(
threadpool_t *pool, job_callback_f func,
void *data) {
106 threadpool_job *job = malloc(
sizeof(threadpool_job));
107 job->callback = func;
108 job->data = data;
109
110 pthread_mutex_lock(&pool->queue_lock);
111 threadpool_enqueue_job(pool, job);
112
113 int create_thread =
0;
114 int destroy_thread =
0;
115 int diff = pool->queue_len - pool->num_idle;
116 if(diff >
0 && pool->num_threads < pool->max_threads) {
117 create_thread =
1;
118 }
else if(diff <
0 && pool->num_threads > pool->min_threads) {
119 destroy_thread =
1;
120 }
121
122
123 pthread_cond_signal(&pool->available);
124
125
126 if(create_thread) {
127 pthread_t t;
128 if (pthread_create(&t,
NULL, threadpool_func, pool) !=
0) {
129 perror(
"Error: threadpool_run: pthread_create");
130 }
131 }
132 if(destroy_thread) {
133 threadpool_enqueue_job(pool, &kill_job);
134 pthread_cond_signal(&pool->available);
135 }
136
137 pthread_mutex_unlock(&pool->queue_lock);
138 }
139
140 void threadpool_enqueue_job(
threadpool_t *pool, threadpool_job *job) {
141 pool_queue_t *q = malloc(
sizeof(
pool_queue_t));
142 q->job = job;
143 q->next =
NULL;
144
145 if(pool->queue ==
NULL) {
146 pool->queue = q;
147 }
else {
148 pool_queue_t *last_elem = pool->queue;
149 while(last_elem->next !=
NULL) {
150 last_elem = last_elem->next;
151 }
152 last_elem->next = q;
153 }
154 pool->queue_len++;
155 }
156