src/server/util/thrpool.c

changeset 570
f95868a8ec37
parent 569
70bca6190669
child 576
5c31cc844c68
equal deleted inserted replaced
569:70bca6190669 570:f95868a8ec37
58 } 58 }
59 59
60 int threadpool_start(threadpool_t *pool) { 60 int threadpool_start(threadpool_t *pool) {
61 /* create pool threads */ 61 /* create pool threads */
62 for(int i=0;i<pool->min_threads;i++) { 62 for(int i=0;i<pool->min_threads;i++) {
63 if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) { 63 if (pthread_create(&pool->threads[i], NULL, threadpool_func, pool) != 0) {
64 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); 64 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno));
65 return 1; 65 return 1;
66 } else {
67 log_ereport(LOG_DEBUG, "thread started: %lu", (unsigned long)pool->threads[i]);
68 pthread_detach(pool->threads[i]);
66 } 69 }
67 } 70 }
68 return 0; 71 return 0;
69 } 72 }
70 73
73 76
74 pthread_t thr_self = pthread_self(); 77 pthread_t thr_self = pthread_self();
75 int thr_index = -1; 78 int thr_index = -1;
76 for(int i=0;i<pool->max_threads;i++) { 79 for(int i=0;i<pool->max_threads;i++) {
77 if(pool->threads[i] == thr_self) { 80 if(pool->threads[i] == thr_self) {
78 thr_index = i; 81 thr_index = i; // TODO: this is stupid, transfer the thread index per data
79 break; 82 break;
80 } 83 }
81 } 84 }
82 85
83 if(thr_index == -1) { 86 if(thr_index == -1) {
223 last_elem->next = q; 226 last_elem->next = q;
224 } 227 }
225 pool->queue_len++; 228 pool->queue_len++;
226 } 229 }
227 230
228 void threadpool_shutdown(threadpool_t *pool) { 231 void threadpool_shutdown(threadpool_t *pool, int timeout) {
232 struct timespec ts;
233
229 int nthreads = pool->max_threads; 234 int nthreads = pool->max_threads;
230 for(int i=0;i<nthreads;i++) { 235 for(int i=0;i<nthreads;i++) {
231 pthread_mutex_lock(&pool->queue_lock); 236 clock_gettime(CLOCK_REALTIME, &ts);
237 ts.tv_sec += timeout;
238 if(pthread_mutex_timedlock(&pool->queue_lock, &ts)) {
239 log_ereport(LOG_FAILURE, "failed to shutdown threadpool: timeout");
240 return;
241 }
232 threadpool_enqueue_job(pool, &kill_job); 242 threadpool_enqueue_job(pool, &kill_job);
233 pthread_cond_signal(&pool->available); 243 pthread_cond_signal(&pool->available);
234 pthread_mutex_unlock(&pool->queue_lock); 244 pthread_mutex_unlock(&pool->queue_lock);
235 } 245 }
236 } 246
247 // not the nicest way to wait for threads to shutdown, but it is very
248 // simple and good enough for the webserver shutdown
249 sleep(1);
250 // check if all threads are closed
251 time_t t = time(NULL);
252 time_t end = t + timeout;
253 int i = 0;
254 while(t < end || i < 2) {
255 uint32_t num_threads = pool->num_threads;
256 if(num_threads == 0) {
257 break;
258 }
259
260 log_ereport(LOG_VERBOSE, "threadpool_shutdown: wait for thread shutdown: %u threads still running", (unsigned int)num_threads);
261 sleep(5);
262 i++;
263 t = time(NULL);
264 }
265
266 if(pool->num_threads) {
267 log_ereport(LOG_VERBOSE, "threadpool_shutdown successful");
268 } else if(t > end) {
269 log_ereport(LOG_WARN, "threadpool_shutdown: timeout");
270 }
271 }
272

mercurial