src/server/util/thrpool.c

changeset 569
70bca6190669
parent 556
b036ccad4b49
child 570
f95868a8ec37
equal deleted inserted replaced
568:3f8c587734aa 569:70bca6190669
34 #include "thrpool.h" 34 #include "thrpool.h"
35 35
36 static threadpool_job kill_job; 36 static threadpool_job kill_job;
37 37
38 threadpool_t* threadpool_new(int min, int max) { 38 threadpool_t* threadpool_new(int min, int max) {
39 log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max); 39 log_ereport(LOG_INFORM, "new threadpool (min: %d, max: %d)", min, max);
40 threadpool_t *pool = malloc(sizeof(threadpool_t)); 40 threadpool_t *pool = malloc(sizeof(threadpool_t));
41 pool->queue = NULL; 41 pool->queue = NULL;
42 pool->queue_len = 0; 42 pool->queue_len = 0;
43 pool->num_idle = 0; 43 pool->num_idle = 0;
44 pool->min_threads = min; 44 pool->min_threads = min;
45 pool->max_threads = max; 45 pool->max_threads = max;
46 pool->num_threads = 0; 46 pool->num_threads = 0;
47 pool->last_job = 0;
48 pool->last_thread = -1;
49
50 pool->threads = calloc(max, sizeof(pthread_t));
51 pool->thrstatus = calloc(max, sizeof(int));
47 52
48 pthread_mutex_init(&pool->queue_lock, NULL); 53 pthread_mutex_init(&pool->queue_lock, NULL);
49 pthread_mutex_init(&pool->avlbl_lock, NULL); 54 pthread_mutex_init(&pool->avlbl_lock, NULL);
50 pthread_cond_init(&pool->available, NULL); 55 pthread_cond_init(&pool->available, NULL);
51 56
53 } 58 }
54 59
55 int threadpool_start(threadpool_t *pool) { 60 int threadpool_start(threadpool_t *pool) {
56 /* create pool threads */ 61 /* create pool threads */
57 for(int i=0;i<pool->min_threads;i++) { 62 for(int i=0;i<pool->min_threads;i++) {
58 pthread_t t; 63 if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) {
59 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
60 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); 64 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno));
61 return 1; 65 return 1;
62 } 66 }
63 } 67 }
64 return 0; 68 return 0;
65 } 69 }
66 70
67 void* threadpool_func(void *data) { 71 void* threadpool_func(void *data) {
68 threadpool_t *pool = (threadpool_t*)data; 72 threadpool_t *pool = (threadpool_t*)data;
69 73
74 pthread_t thr_self = pthread_self();
75 int thr_index = -1;
76 for(int i=0;i<pool->max_threads;i++) {
77 if(pool->threads[i] == thr_self) {
78 thr_index = i;
79 break;
80 }
81 }
82
83 if(thr_index == -1) {
84 log_ereport(LOG_CATASTROPHE, "threadpool: cannot find thread index for thread %ull\n", (unsigned long long)thr_self);
85 return NULL;
86 }
87
70 ws_atomic_inc32(&pool->num_threads); 88 ws_atomic_inc32(&pool->num_threads);
71 for(;;) { 89 for(;;) {
72 threadpool_job *job = threadpool_get_job(pool); 90 threadpool_job *job = threadpool_get_job(pool, thr_index);
73 if(job == &kill_job) { 91 if(job == &kill_job) {
74 break; 92 break;
75 } 93 }
76 94
77 job->callback(job->data); 95 job->callback(job->data);
83 log_ereport(LOG_INFORM, "threadpool closed"); // TODO: log threadpool name 101 log_ereport(LOG_INFORM, "threadpool closed"); // TODO: log threadpool name
84 } 102 }
85 return NULL; 103 return NULL;
86 } 104 }
87 105
88 threadpool_job* threadpool_get_job(threadpool_t *pool) { 106 threadpool_job* threadpool_get_job(threadpool_t *pool, int thread_index) {
89 pthread_mutex_lock(&pool->queue_lock); 107 struct timespec timeout;
108 clock_gettime(CLOCK_REALTIME, &timeout);
109 timeout.tv_sec += 30;
110
111 while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) {
112 log_ereport(LOG_INFORM, "threadpool_get_job: mutex timeout");
113 timeout.tv_sec += 30;
114 }
90 115
91 threadpool_job *job = NULL; 116 threadpool_job *job = NULL;
92 pool->num_idle++; 117 pool->num_idle++;
93 while(job == NULL) { 118 while(job == NULL) {
94 if(pool->queue_len == 0) { 119 if(pool->queue_len == 0) {
95 pthread_cond_wait(&pool->available, &pool->queue_lock); 120 timeout.tv_sec += 30;
121 while(pthread_cond_timedwait(&pool->available, &pool->queue_lock, &timeout)) {
122 log_ereport(LOG_DEBUG, "threadpool_get_job: cond timeout: thread: %d queue: %u", thread_index, (unsigned int)pool->queue_len);
123 timeout.tv_sec += 60;
124 }
96 continue; 125 continue;
97 } else { 126 } else {
98 pool_queue_t *q = pool->queue; 127 pool_queue_t *q = pool->queue;
99 job = q->job; 128 job = q->job;
100 pool->queue = q->next; 129 pool->queue = q->next;
101 pool->queue_len--; 130 pool->queue_len--;
102 free(q); 131 free(q);
103 } 132 }
104 } 133 }
105 pool->num_idle--; 134 pool->num_idle--;
135
136 pool->last_thread = thread_index;
137 pool->last_job = time(NULL);
106 138
107 pthread_mutex_unlock(&pool->queue_lock); 139 pthread_mutex_unlock(&pool->queue_lock);
108 return job; 140 return job;
109 } 141 }
110 142
117 149
118 threadpool_job *job = malloc(sizeof(threadpool_job)); 150 threadpool_job *job = malloc(sizeof(threadpool_job));
119 job->callback = func; 151 job->callback = func;
120 job->data = data; 152 job->data = data;
121 153
122 pthread_mutex_lock(&pool->queue_lock); 154 struct timespec timeout;
155 clock_gettime(CLOCK_REALTIME, &timeout);
156 timeout.tv_sec += 30;
157
158 while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) {
159 log_ereport(LOG_INFORM, "threadpool_run: mutex timeout");
160 timeout.tv_sec += 30;
161 }
123 threadpool_enqueue_job(pool, job); 162 threadpool_enqueue_job(pool, job);
124 163
125 int create_thread = 0; 164 int create_thread = 0;
126 int destroy_thread = 0; 165 int destroy_thread = 0;
127 int diff = pool->queue_len - pool->num_idle; 166 int diff = pool->queue_len - pool->num_idle;
144 if(destroy_thread) { 183 if(destroy_thread) {
145 threadpool_enqueue_job(pool, &kill_job); 184 threadpool_enqueue_job(pool, &kill_job);
146 pthread_cond_signal(&pool->available); 185 pthread_cond_signal(&pool->available);
147 } 186 }
148 187
188 // some diagnostics:
189 // if the queue has multiple elements, but the last job was started
190 // over a minute ago, print some diagnostic message, because
191 // this does look wrong
192 if(pool->queue_len > 5 && pool->last_job != 0) {
193 // reuse timeout sec value, because we don't need the most accurate
194 // time value here
195 time_t current = timeout.tv_sec;
196 if(pool->last_job + 60 < current) {
197 // looks like the threadpool is blocked
198 struct tm lastjob;
199 localtime_r(&pool->last_job, &lastjob);
200 log_ereport(
201 LOG_WARN,
202 "high threadpool wait time: queue: %u lastjob: %02d:%02d:%02d",
203 (unsigned int)pool->queue_len,
204 lastjob.tm_hour, lastjob.tm_min, lastjob.tm_sec);
205 }
206 }
207
149 pthread_mutex_unlock(&pool->queue_lock); 208 pthread_mutex_unlock(&pool->queue_lock);
150 } 209 }
151 210
152 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { 211 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) {
153 pool_queue_t *q = malloc(sizeof(pool_queue_t)); 212 pool_queue_t *q = malloc(sizeof(pool_queue_t));

mercurial