add threadpool debug logging

2 months ago

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sun, 26 Jan 2025 23:09:43 +0100 (2 months ago)
changeset 569
70bca6190669
parent 568
3f8c587734aa
child 570
f95868a8ec37

add threadpool debug logging

src/server/public/nsapi.h file | annotate | diff | comparison | revisions
src/server/util/thrpool.c file | annotate | diff | comparison | revisions
src/server/util/thrpool.h file | annotate | diff | comparison | revisions
--- a/src/server/public/nsapi.h	Fri Jan 24 17:42:58 2025 +0100
+++ b/src/server/public/nsapi.h	Sun Jan 26 23:09:43 2025 +0100
@@ -1626,7 +1626,7 @@
 threadpool_t* threadpool_new(int min, int max);
 int threadpool_start(threadpool_t *pool);
 void* threadpool_func(void *data);
-threadpool_job* threadpool_get_job(threadpool_t *pool);
+threadpool_job* threadpool_get_job(threadpool_t *pool, int thread_index);
 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data);
 
 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event);
--- a/src/server/util/thrpool.c	Fri Jan 24 17:42:58 2025 +0100
+++ b/src/server/util/thrpool.c	Sun Jan 26 23:09:43 2025 +0100
@@ -36,7 +36,7 @@
 static threadpool_job kill_job;
 
 threadpool_t* threadpool_new(int min, int max) {
-    log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max);
+    log_ereport(LOG_INFORM, "new threadpool (min: %d, max: %d)", min, max);
     threadpool_t *pool = malloc(sizeof(threadpool_t));
     pool->queue = NULL;
     pool->queue_len = 0;
@@ -44,6 +44,11 @@
     pool->min_threads = min;
     pool->max_threads = max;
     pool->num_threads = 0;
+    pool->last_job = 0;
+    pool->last_thread = -1;
+    
+    pool->threads = calloc(max, sizeof(pthread_t));
+    pool->thrstatus = calloc(max, sizeof(int));
 
     pthread_mutex_init(&pool->queue_lock, NULL);
     pthread_mutex_init(&pool->avlbl_lock, NULL);
@@ -55,8 +60,7 @@
 int threadpool_start(threadpool_t *pool) {
     /* create pool threads */
     for(int i=0;i<pool->min_threads;i++) {
-        pthread_t t;
-        if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
+        if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) {
             log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno));
             return 1;
         }
@@ -67,9 +71,23 @@
 void* threadpool_func(void *data) {
     threadpool_t *pool = (threadpool_t*)data;
     
+    pthread_t thr_self = pthread_self();
+    int thr_index = -1;
+    for(int i=0;i<pool->max_threads;i++) {
+        if(pool->threads[i] == thr_self) {
+            thr_index = i;
+            break;
+        }
+    }
+    
+    if(thr_index == -1) {
+        log_ereport(LOG_CATASTROPHE, "threadpool: cannot find thread index for thread %ull\n", (unsigned long long)thr_self);
+        return NULL;
+    }
+    
     ws_atomic_inc32(&pool->num_threads);
     for(;;) {
-        threadpool_job *job = threadpool_get_job(pool);
+        threadpool_job *job = threadpool_get_job(pool, thr_index);
         if(job == &kill_job) {
             break;
         }
@@ -85,14 +103,25 @@
     return NULL;
 }
 
-threadpool_job* threadpool_get_job(threadpool_t *pool) {
-    pthread_mutex_lock(&pool->queue_lock);
+threadpool_job* threadpool_get_job(threadpool_t *pool, int thread_index) {
+    struct timespec timeout;
+    clock_gettime(CLOCK_REALTIME, &timeout);
+    timeout.tv_sec += 30;
+    
+    while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) {
+        log_ereport(LOG_INFORM, "threadpool_get_job: mutex timeout");
+        timeout.tv_sec += 30;
+    }
 
     threadpool_job *job = NULL;
     pool->num_idle++;
     while(job == NULL) {
         if(pool->queue_len == 0) {
-            pthread_cond_wait(&pool->available, &pool->queue_lock);
+            timeout.tv_sec += 30;
+            while(pthread_cond_timedwait(&pool->available, &pool->queue_lock, &timeout)) {
+                log_ereport(LOG_DEBUG, "threadpool_get_job: cond timeout: thread: %d queue: %u", thread_index, (unsigned int)pool->queue_len);
+                timeout.tv_sec += 60;
+            }
             continue;
         } else {
             pool_queue_t *q = pool->queue;
@@ -103,6 +132,9 @@
         }
     }
     pool->num_idle--;
+    
+    pool->last_thread = thread_index;
+    pool->last_job = time(NULL);
 
     pthread_mutex_unlock(&pool->queue_lock);
     return job;
@@ -119,7 +151,14 @@
     job->callback = func;
     job->data = data;
 
-    pthread_mutex_lock(&pool->queue_lock);
+    struct timespec timeout;
+    clock_gettime(CLOCK_REALTIME, &timeout);
+    timeout.tv_sec += 30;
+    
+    while(pthread_mutex_timedlock(&pool->queue_lock, &timeout)) {
+        log_ereport(LOG_INFORM, "threadpool_run: mutex timeout");
+        timeout.tv_sec += 30;
+    }
     threadpool_enqueue_job(pool, job);
 
     int create_thread = 0;
@@ -146,6 +185,26 @@
         pthread_cond_signal(&pool->available);
     }
     
+    // some diagnostics:
+    // if the queue has multiple elements, but the last job was started
+    // over a minute ago, print some diagnostic message, because
+    // this does look wrong
+    if(pool->queue_len > 5 && pool->last_job != 0) {
+        // reuse timeout sec value, because we don't need the most accurate
+        // time value here
+        time_t current = timeout.tv_sec;
+        if(pool->last_job + 60 < current) {
+            // looks like the threadpool is blocked
+            struct tm lastjob;
+            localtime_r(&pool->last_job, &lastjob);
+            log_ereport(
+                    LOG_WARN,
+                    "high threadpool wait time: queue: %u lastjob: %02d:%02d:%02d",
+                    (unsigned int)pool->queue_len,
+                    lastjob.tm_hour, lastjob.tm_min, lastjob.tm_sec);
+        }
+    }
+    
     pthread_mutex_unlock(&pool->queue_lock);
 }
 
--- a/src/server/util/thrpool.h	Fri Jan 24 17:42:58 2025 +0100
+++ b/src/server/util/thrpool.h	Sun Jan 26 23:09:43 2025 +0100
@@ -31,6 +31,7 @@
 
 #include "../public/nsapi.h"
 #include <pthread.h>
+#include <time.h>
 
 #ifdef	__cplusplus
 extern "C" {
@@ -47,6 +48,10 @@
     uint32_t        num_threads;
     int             min_threads;
     int             max_threads;
+    pthread_t       *threads;
+    int             *thrstatus;
+    time_t          last_job;
+    int             last_thread;
 };
 
 struct _threadpool_job {

mercurial