34 #include "thrpool.h" |
34 #include "thrpool.h" |
35 |
35 |
36 static threadpool_job kill_job; |
36 static threadpool_job kill_job; |
37 |
37 |
38 threadpool_t* threadpool_new(int min, int max) { |
38 threadpool_t* threadpool_new(int min, int max) { |
39 log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max); |
39 log_ereport(LOG_INFORM, "new threadpool (min: %d, max: %d)", min, max); |
40 threadpool_t *pool = malloc(sizeof(threadpool_t)); |
40 threadpool_t *pool = malloc(sizeof(threadpool_t)); |
41 pool->queue = NULL; |
41 pool->queue = NULL; |
42 pool->queue_len = 0; |
42 pool->queue_len = 0; |
43 pool->num_idle = 0; |
43 pool->num_idle = 0; |
44 pool->min_threads = min; |
44 pool->min_threads = min; |
45 pool->max_threads = max; |
45 pool->max_threads = max; |
46 pool->num_threads = 0; |
46 pool->num_threads = 0; |
|
47 pool->last_job = 0; |
|
48 pool->last_thread = -1; |
|
49 |
|
50 pool->threads = calloc(max, sizeof(pthread_t)); |
|
51 pool->thrstatus = calloc(max, sizeof(int)); |
47 |
52 |
48 pthread_mutex_init(&pool->queue_lock, NULL); |
53 pthread_mutex_init(&pool->queue_lock, NULL); |
49 pthread_mutex_init(&pool->avlbl_lock, NULL); |
54 pthread_mutex_init(&pool->avlbl_lock, NULL); |
50 pthread_cond_init(&pool->available, NULL); |
55 pthread_cond_init(&pool->available, NULL); |
51 |
56 |
53 } |
58 } |
54 |
59 |
55 int threadpool_start(threadpool_t *pool) { |
60 int threadpool_start(threadpool_t *pool) { |
56 /* create pool threads */ |
61 /* create pool threads */ |
57 for(int i=0;i<pool->min_threads;i++) { |
62 for(int i=0;i<pool->min_threads;i++) { |
58 pthread_t t; |
63 if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) { |
59 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { |
|
60 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); |
64 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); |
61 return 1; |
65 return 1; |
62 } |
66 } |
63 } |
67 } |
64 return 0; |
68 return 0; |
65 } |
69 } |
66 |
70 |
67 void* threadpool_func(void *data) { |
71 void* threadpool_func(void *data) { |
68 threadpool_t *pool = (threadpool_t*)data; |
72 threadpool_t *pool = (threadpool_t*)data; |
69 |
73 |
|
74 pthread_t thr_self = pthread_self(); |
|
75 int thr_index = -1; |
|
76 for(int i=0;i<pool->max_threads;i++) { |
|
77 if(pool->threads[i] == thr_self) { |
|
78 thr_index = i; |
|
79 break; |
|
80 } |
|
81 } |
|
82 |
|
83 if(thr_index == -1) { |
|
84 log_ereport(LOG_CATASTROPHE, "threadpool: cannot find thread index for thread %ull\n", (unsigned long long)thr_self); |
|
85 return NULL; |
|
86 } |
|
87 |
70 ws_atomic_inc32(&pool->num_threads); |
88 ws_atomic_inc32(&pool->num_threads); |
71 for(;;) { |
89 for(;;) { |
72 threadpool_job *job = threadpool_get_job(pool); |
90 threadpool_job *job = threadpool_get_job(pool, thr_index); |
73 if(job == &kill_job) { |
91 if(job == &kill_job) { |
74 break; |
92 break; |
75 } |
93 } |
76 |
94 |
77 job->callback(job->data); |
95 job->callback(job->data); |
83 log_ereport(LOG_INFORM, "threadpool closed"); // TODO: log threadpool name |
101 log_ereport(LOG_INFORM, "threadpool closed"); // TODO: log threadpool name |
84 } |
102 } |
85 return NULL; |
103 return NULL; |
86 } |
104 } |
87 |
105 |
88 threadpool_job* threadpool_get_job(threadpool_t *pool) { |
106 threadpool_job* threadpool_get_job(threadpool_t *pool, int thread_index) { |
89 pthread_mutex_lock(&pool->queue_lock); |
107 struct timespec timeout; |
|
108 clock_gettime(CLOCK_REALTIME, &timeout); |
|
109 timeout.tv_sec += 30; |
|
110 |
|
111 while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) { |
|
112 log_ereport(LOG_INFORM, "threadpool_get_job: mutex timeout"); |
|
113 timeout.tv_sec += 30; |
|
114 } |
90 |
115 |
91 threadpool_job *job = NULL; |
116 threadpool_job *job = NULL; |
92 pool->num_idle++; |
117 pool->num_idle++; |
93 while(job == NULL) { |
118 while(job == NULL) { |
94 if(pool->queue_len == 0) { |
119 if(pool->queue_len == 0) { |
95 pthread_cond_wait(&pool->available, &pool->queue_lock); |
120 timeout.tv_sec += 30; |
|
121 while(pthread_cond_timedwait(&pool->available, &pool->queue_lock, &timeout)) { |
|
122 log_ereport(LOG_DEBUG, "threadpool_get_job: cond timeout: thread: %d queue: %u", thread_index, (unsigned int)pool->queue_len); |
|
123 timeout.tv_sec += 60; |
|
124 } |
96 continue; |
125 continue; |
97 } else { |
126 } else { |
98 pool_queue_t *q = pool->queue; |
127 pool_queue_t *q = pool->queue; |
99 job = q->job; |
128 job = q->job; |
100 pool->queue = q->next; |
129 pool->queue = q->next; |
101 pool->queue_len--; |
130 pool->queue_len--; |
102 free(q); |
131 free(q); |
103 } |
132 } |
104 } |
133 } |
105 pool->num_idle--; |
134 pool->num_idle--; |
|
135 |
|
136 pool->last_thread = thread_index; |
|
137 pool->last_job = time(NULL); |
106 |
138 |
107 pthread_mutex_unlock(&pool->queue_lock); |
139 pthread_mutex_unlock(&pool->queue_lock); |
108 return job; |
140 return job; |
109 } |
141 } |
110 |
142 |
117 |
149 |
118 threadpool_job *job = malloc(sizeof(threadpool_job)); |
150 threadpool_job *job = malloc(sizeof(threadpool_job)); |
119 job->callback = func; |
151 job->callback = func; |
120 job->data = data; |
152 job->data = data; |
121 |
153 |
122 pthread_mutex_lock(&pool->queue_lock); |
154 struct timespec timeout; |
|
155 clock_gettime(CLOCK_REALTIME, &timeout); |
|
156 timeout.tv_sec += 30; |
|
157 |
|
158 while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) { |
|
159 log_ereport(LOG_INFORM, "threadpool_run: mutex timeout"); |
|
160 timeout.tv_sec += 30; |
|
161 } |
123 threadpool_enqueue_job(pool, job); |
162 threadpool_enqueue_job(pool, job); |
124 |
163 |
125 int create_thread = 0; |
164 int create_thread = 0; |
126 int destroy_thread = 0; |
165 int destroy_thread = 0; |
127 int diff = pool->queue_len - pool->num_idle; |
166 int diff = pool->queue_len - pool->num_idle; |
144 if(destroy_thread) { |
183 if(destroy_thread) { |
145 threadpool_enqueue_job(pool, &kill_job); |
184 threadpool_enqueue_job(pool, &kill_job); |
146 pthread_cond_signal(&pool->available); |
185 pthread_cond_signal(&pool->available); |
147 } |
186 } |
148 |
187 |
|
188 // some diagnostics: |
|
189 // if the queue has multiple elements, but the last job was started |
|
190 // over a minute ago, print some diagnostic message, because |
|
191 // this does look wrong |
|
192 if(pool->queue_len > 5 && pool->last_job != 0) { |
|
193 // reuse timeout sec value, because we don't need the most accurate |
|
194 // time value here |
|
195 time_t current = timeout.tv_sec; |
|
196 if(pool->last_job + 60 < current) { |
|
197 // looks like the threadpool is blocked |
|
198 struct tm lastjob; |
|
199 localtime_r(&pool->last_job, &lastjob); |
|
200 log_ereport( |
|
201 LOG_WARN, |
|
202 "high threadpool wait time: queue: %u lastjob: %02d:%02d:%02d", |
|
203 (unsigned int)pool->queue_len, |
|
204 lastjob.tm_hour, lastjob.tm_min, lastjob.tm_sec); |
|
205 } |
|
206 } |
|
207 |
149 pthread_mutex_unlock(&pool->queue_lock); |
208 pthread_mutex_unlock(&pool->queue_lock); |
150 } |
209 } |
151 |
210 |
152 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { |
211 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { |
153 pool_queue_t *q = malloc(sizeof(pool_queue_t)); |
212 pool_queue_t *q = malloc(sizeof(pool_queue_t)); |