improve threadpool_shutdown cleanup

8 weeks ago

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Wed, 29 Jan 2025 20:40:19 +0100 (8 weeks ago)
changeset 570
f95868a8ec37
parent 569
70bca6190669
child 571
0fa595aefd0f

improve threadpool_shutdown cleanup

src/server/daemon/threadpools.c file | annotate | diff | comparison | revisions
src/server/daemon/threadpools.h file | annotate | diff | comparison | revisions
src/server/daemon/webserver.c file | annotate | diff | comparison | revisions
src/server/util/thrpool.c file | annotate | diff | comparison | revisions
src/server/util/thrpool.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/threadpools.c	Sun Jan 26 23:09:43 2025 +0100
+++ b/src/server/daemon/threadpools.c	Wed Jan 29 20:40:19 2025 +0100
@@ -148,14 +148,14 @@
 }
 
 
-void shutdown_threadpools(void) {
+void shutdown_threadpools(int timeout) {
     log_ereport(LOG_INFORM, "shutdown threadpools");
     CxIterator i = cxMapIteratorValues(thread_pool_map);
     cx_foreach(threadpool_t*, tp, i) {
-        threadpool_shutdown(tp);
+        threadpool_shutdown(tp, timeout);
     }
     i = cxMapIteratorValues(io_pool_map);
     cx_foreach(threadpool_t*, tp, i) {
-        threadpool_shutdown(tp);
+        threadpool_shutdown(tp, timeout);
     }
 }
--- a/src/server/daemon/threadpools.h	Sun Jan 26 23:09:43 2025 +0100
+++ b/src/server/daemon/threadpools.h	Wed Jan 29 20:40:19 2025 +0100
@@ -54,7 +54,7 @@
 threadpool_t* get_default_iopool();
 threadpool_t* get_iopool(cxstring name);
 
-void shutdown_threadpools(void);
+void shutdown_threadpools(int timeout);
 
 #ifdef	__cplusplus
 }
--- a/src/server/daemon/webserver.c	Sun Jan 26 23:09:43 2025 +0100
+++ b/src/server/daemon/webserver.c	Wed Jan 29 20:40:19 2025 +0100
@@ -290,7 +290,7 @@
         re = re->next;
     }
     
-    shutdown_threadpools();
+    shutdown_threadpools(60);
     
     shutdown_eventhandlers_wait();
     
--- a/src/server/util/thrpool.c	Sun Jan 26 23:09:43 2025 +0100
+++ b/src/server/util/thrpool.c	Wed Jan 29 20:40:19 2025 +0100
@@ -60,9 +60,12 @@
 int threadpool_start(threadpool_t *pool) {
     /* create pool threads */
     for(int i=0;i<pool->min_threads;i++) {
-        if (pthread_create(pool->threads + i, NULL, threadpool_func, pool) != 0) {
+        if (pthread_create(&pool->threads[i], NULL, threadpool_func, pool) != 0) {
             log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno));
             return 1;
+        } else {
+            log_ereport(LOG_DEBUG, "thread started: %lu", (unsigned long)pool->threads[i]);
+            pthread_detach(pool->threads[i]);
         }
     }
     return 0;
@@ -75,7 +78,7 @@
     int thr_index = -1;
     for(int i=0;i<pool->max_threads;i++) {
         if(pool->threads[i] == thr_self) {
-            thr_index = i;
+            thr_index = i; // TODO: this is stupid, trasfer the thread index per data
             break;
         }
     }
@@ -225,12 +228,45 @@
     pool->queue_len++;
 }
 
-void threadpool_shutdown(threadpool_t *pool) {
+void threadpool_shutdown(threadpool_t *pool, int timeout) {
+    struct timespec ts;
+    
     int nthreads = pool->max_threads;
     for(int i=0;i<nthreads;i++) {
-        pthread_mutex_lock(&pool->queue_lock);
+        clock_gettime(CLOCK_REALTIME, &ts);
+        ts.tv_sec += timeout;
+        if(pthread_mutex_timedlock(&pool->queue_lock, &ts)) {
+            log_ereport(LOG_FAILURE, "failed to shutdown threadpool: timeout");
+            return;
+        }
         threadpool_enqueue_job(pool, &kill_job);
         pthread_cond_signal(&pool->available);
         pthread_mutex_unlock(&pool->queue_lock);
     }
+    
+    // not the nicest way to wait for threads to shutdown, but it is very
+    // simple and good enough for the webserver shutdown
+    sleep(1);
+    // check if all threads are closed
+    time_t t = time(NULL);
+    time_t end = t + timeout;
+    int i = 0;
+    while(t < end || i < 2) {
+        uint32_t num_threads = pool->num_threads;
+        if(num_threads == 0) {
+            break;
+        }
+        
+        log_ereport(LOG_VERBOSE, "threadpool_shutdown: wait for thread shutdown: %u threads still running", (unsigned int)num_threads);
+        sleep(5);
+        i++;
+        t = time(NULL);
+    }
+    
+    if(pool->num_threads) {
+        log_ereport(LOG_VERBOSE, "threadpool_shutdown successful");
+    } else if(t > end) {
+        log_ereport(LOG_WARN, "threadpool_shutdown: timeout");
+    }
 }
+
--- a/src/server/util/thrpool.h	Sun Jan 26 23:09:43 2025 +0100
+++ b/src/server/util/thrpool.h	Wed Jan 29 20:40:19 2025 +0100
@@ -66,7 +66,7 @@
 
 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job);
 
-void threadpool_shutdown(threadpool_t *pool);
+void threadpool_shutdown(threadpool_t *pool, int timeout);
 
 #ifdef	__cplusplus
 }

mercurial