2 months ago
add threadpool debug logging
--- a/src/server/public/nsapi.h Fri Jan 24 17:42:58 2025 +0100 +++ b/src/server/public/nsapi.h Sun Jan 26 23:09:43 2025 +0100 @@ -1626,7 +1626,7 @@ threadpool_t* threadpool_new(int min, int max); int threadpool_start(threadpool_t *pool); void* threadpool_func(void *data); -threadpool_job* threadpool_get_job(threadpool_t *pool); +threadpool_job* threadpool_get_job(threadpool_t *pool, int thread_index); void threadpool_run(threadpool_t *pool, job_callback_f func, void *data); int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event);
--- a/src/server/util/thrpool.c Fri Jan 24 17:42:58 2025 +0100 +++ b/src/server/util/thrpool.c Sun Jan 26 23:09:43 2025 +0100 @@ -36,7 +36,7 @@ static threadpool_job kill_job; threadpool_t* threadpool_new(int min, int max) { - log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max); + log_ereport(LOG_INFORM, "new threadpool (min: %d, max: %d)", min, max); threadpool_t *pool = malloc(sizeof(threadpool_t)); pool->queue = NULL; pool->queue_len = 0; @@ -44,6 +44,11 @@ pool->min_threads = min; pool->max_threads = max; pool->num_threads = 0; + pool->last_job = 0; + pool->last_thread = -1; + + pool->threads = calloc(max, sizeof(pthread_t)); + pool->thrstatus = calloc(max, sizeof(int)); pthread_mutex_init(&pool->queue_lock, NULL); pthread_mutex_init(&pool->avlbl_lock, NULL); @@ -55,8 +60,7 @@ int threadpool_start(threadpool_t *pool) { /* create pool threads */ for(int i=0;i<pool->min_threads;i++) { - pthread_t t; - if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { + if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) { log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); return 1; } @@ -67,9 +71,23 @@ void* threadpool_func(void *data) { threadpool_t *pool = (threadpool_t*)data; + pthread_t thr_self = pthread_self(); + int thr_index = -1; + for(int i=0;i<pool->max_threads;i++) { + if(pthread_equal(pool->threads[i], thr_self)) { + thr_index = i; + break; + } + } + + if(thr_index == -1) { + log_ereport(LOG_CATASTROPHE, "threadpool: cannot find thread index for thread %llu\n", (unsigned long long)thr_self); + return NULL; + } + ws_atomic_inc32(&pool->num_threads); for(;;) { - threadpool_job *job = threadpool_get_job(pool); + threadpool_job *job = threadpool_get_job(pool, thr_index); if(job == &kill_job) { break; } @@ -85,14 +103,25 @@ return NULL; } -threadpool_job* threadpool_get_job(threadpool_t *pool) { - pthread_mutex_lock(&pool->queue_lock); +threadpool_job* threadpool_get_job(threadpool_t *pool, int 
thread_index) { + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec += 30; + + while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) { + log_ereport(LOG_INFORM, "threadpool_get_job: mutex timeout"); + timeout.tv_sec += 30; + } threadpool_job *job = NULL; pool->num_idle++; while(job == NULL) { if(pool->queue_len == 0) { - pthread_cond_wait(&pool->available, &pool->queue_lock); + timeout.tv_sec += 30; + while(pthread_cond_timedwait(&pool->available, &pool->queue_lock, &timeout)) { + log_ereport(LOG_DEBUG, "threadpool_get_job: cond timeout: thread: %d queue: %u", thread_index, (unsigned int)pool->queue_len); + timeout.tv_sec += 60; + } continue; } else { pool_queue_t *q = pool->queue; @@ -103,6 +132,9 @@ } } pool->num_idle--; + + pool->last_thread = thread_index; + pool->last_job = time(NULL); pthread_mutex_unlock(&pool->queue_lock); return job; @@ -119,7 +151,14 @@ job->callback = func; job->data = data; - pthread_mutex_lock(&pool->queue_lock); + struct timespec timeout; + clock_gettime(CLOCK_REALTIME, &timeout); + timeout.tv_sec += 30; + + while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) { + log_ereport(LOG_INFORM, "threadpool_run: mutex timeout"); + timeout.tv_sec += 30; + } threadpool_enqueue_job(pool, job); int create_thread = 0; @@ -146,6 +185,26 @@ pthread_cond_signal(&pool->available); } + // some diagnostics: + // if the queue has multiple elements, but the last job was started + // over a minute ago, print some diagnostic message, because + // this does look wrong + if(pool->queue_len > 5 && pool->last_job != 0) { + // reuse timeout sec value, because we don't need the most accurate + // time value here + time_t current = timeout.tv_sec; + if(pool->last_job + 60 < current) { + // looks like the threadpool is blocked + struct tm lastjob; + localtime_r(&pool->last_job, &lastjob); + log_ereport( + LOG_WARN, + "high threadpool wait time: queue: %u lastjob: %02d:%02d:%02d", + (unsigned int)pool->queue_len, + 
lastjob.tm_hour, lastjob.tm_min, lastjob.tm_sec); + } + } + pthread_mutex_unlock(&pool->queue_lock); }
--- a/src/server/util/thrpool.h Fri Jan 24 17:42:58 2025 +0100 +++ b/src/server/util/thrpool.h Sun Jan 26 23:09:43 2025 +0100 @@ -31,6 +31,7 @@ #include "../public/nsapi.h" #include <pthread.h> +#include <time.h> #ifdef __cplusplus extern "C" { @@ -47,6 +48,10 @@ uint32_t num_threads; int min_threads; int max_threads; + pthread_t *threads; + int *thrstatus; + time_t last_job; + int last_thread; }; struct _threadpool_job {