dynamic thread pool

Sun, 26 May 2013 22:05:41 +0200

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sun, 26 May 2013 22:05:41 +0200
changeset 67
50505dc3f8a6
parent 66
74babc0082b7
child 68
f5102a892ed4

dynamic thread pool

src/server/daemon/auth.c file | annotate | diff | comparison | revisions
src/server/daemon/protocol.c file | annotate | diff | comparison | revisions
src/server/daemon/sessionhandler.c file | annotate | diff | comparison | revisions
src/server/daemon/threadpools.c file | annotate | diff | comparison | revisions
src/server/daemon/vfs.c file | annotate | diff | comparison | revisions
src/server/public/nsapi.h file | annotate | diff | comparison | revisions
src/server/ucx/map.h file | annotate | diff | comparison | revisions
src/server/util/thrpool.c file | annotate | diff | comparison | revisions
src/server/util/thrpool.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/auth.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/daemon/auth.c	Sun May 26 22:05:41 2013 +0200
@@ -70,6 +70,7 @@
     time_t now = time(NULL);
     size_t slot = mapkey.hash%cache.size;
     
+    User *u = NULL;
     pthread_mutex_lock(&auth_cache_mutex);
     
     UserCacheElm *elm = cache.map[slot];
@@ -80,44 +81,36 @@
     if(elm) {
         // compare the key data to be sure it is the correct user
         int n = (mapkey.len > elm->key.len) ? elm->key.len : mapkey.len;
-        if (memcmp(elm->key.data, mapkey.data, n)) {
-            free(key);
-            pthread_mutex_unlock(&auth_cache_mutex);
-            return NULL;
+        if (!memcmp(elm->key.data, mapkey.data, n)) {
+            // elm is now the correct UserCacheElm
+            // TODO: use configuration for expire time
+            if(now - elm->created > 120) {
+                // cached user expired
+                // remove all users from the list from the first to this one
+                UserCacheElm *e = cache.head;
+                while(e) {
+                    if(e == elm) {
+                        break;
+                    }
+                    UserCacheElm *n = e->next_user;
+                    auth_cache_remove_from_map(e);
+                    e = n;
+                }
+                cache.head = elm->next_user;
+                if(cache.trail == elm) {
+                    cache.trail = NULL;
+                }
+                auth_cache_remove_from_map(elm);
+                u = NULL;
+            } else {
+                u = (User*)elm->user;
+            }
         }
-    } else {
-        free(key);
-        pthread_mutex_unlock(&auth_cache_mutex);
-        return NULL;
-    }
-    
-    // elm is now the correct UserCacheElm
-    // TODO: use configuration for expire time
-    if(now - elm->created > 120) {
-        // cached user expired
-        // remove all users from the list from the first to this one
-        UserCacheElm *e = cache.head;
-        while(e) {
-            if(e == elm) {
-                break;
-            }
-            UserCacheElm *n = e->next_user;
-            auth_cache_remove_from_map(e);
-            e = n;
-        }
-        cache.head = elm->next_user;
-        if(cache.trail == elm) {
-            cache.trail = NULL;
-        }
-        auth_cache_remove_from_map(elm);
-        free(key);
-        pthread_mutex_unlock(&auth_cache_mutex);
-        return NULL;
     }
     
     pthread_mutex_unlock(&auth_cache_mutex);
     free(key);
-    return (User*)elm->user;
+    return u;
 }
 
 void auth_cache_add(
--- a/src/server/daemon/protocol.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/daemon/protocol.c	Sun May 26 22:05:41 2013 +0200
@@ -285,9 +285,9 @@
     }
 
     // set socket blocking
-    int flags;
-    flags = fcntl(fd, F_GETFL, 0);
-    fcntl(fd, F_SETFL, flags ^ O_NONBLOCK);
+    //int flags;
+    //flags = fcntl(fd, F_GETFL, 0);
+    //fcntl(fd, F_SETFL, flags ^ O_NONBLOCK);
 
     // output buffer
     sbuf_t *out = sbuf_new(512);
--- a/src/server/daemon/sessionhandler.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/daemon/sessionhandler.c	Sun May 26 22:05:41 2013 +0200
@@ -45,7 +45,7 @@
 
 SessionHandler* create_basic_session_handler() {
     BasicSessionHandler *handler = malloc(sizeof(BasicSessionHandler));
-    handler->threadpool = threadpool_new(8);
+    handler->threadpool = threadpool_new(4, 8);
     handler->sh.enqueue_connection = basic_enq_conn;
 
 
@@ -239,7 +239,7 @@
     EventHttpIO *io = event->cookie;
     HttpParser  *parser  = io->parser;
     HTTPRequest *request = io->request;
-    
+      
     int r = handle_request(request, NULL);
     if(r != 0) {
         // TODO: error message
--- a/src/server/daemon/threadpools.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/daemon/threadpools.c	Sun May 26 22:05:41 2013 +0200
@@ -53,7 +53,7 @@
         /* TODO: reconfig thread pool */
         return 0;
     } else {
-        threadpool_t *tp = threadpool_new(cfg->min_threads);
+        threadpool_t *tp = threadpool_new(cfg->min_threads, cfg->max_threads);
         
         int ret = ucx_map_sstr_put(thread_pool_map, name, tp);
         
--- a/src/server/daemon/vfs.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/daemon/vfs.c	Sun May 26 22:05:41 2013 +0200
@@ -411,10 +411,10 @@
 }
 
 int sys_acl_check(VFSContext *ctx, uint32_t access_mask, SysACL *sysacl) {
+    if(sysacl) {
+        sysacl->acl = NULL;
+    }
     if(!ctx) {
-        if(sysacl) {
-            sysacl->acl = NULL;
-        }
         return 0;
     }
     
--- a/src/server/public/nsapi.h	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/public/nsapi.h	Sun May 26 22:05:41 2013 +0200
@@ -1350,8 +1350,6 @@
 
 
 
-/* new macro and function definitions begin */
-
 /* netbuf functions */
 NSAPI_PUBLIC netbuf *netbuf_open(SYS_NETFD sd, int sz);
 
@@ -1379,12 +1377,14 @@
 NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence);
 NSAPI_PUBLIC int system_fclose(SYS_FILE fd);
 
+/* new macro and function definitions begin */
+
 NSAPI_PUBLIC int util_errno2status(int errno_value); // new
 #define util_errno2status util_errno2status
 
 
 // threadpool
-threadpool_t* threadpool_new(int n);
+threadpool_t* threadpool_new(int min, int max);
 void* threadpool_func(void *data);
 threadpool_job* threadpool_get_job(threadpool_t *pool);
 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data);
--- a/src/server/ucx/map.h	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/ucx/map.h	Sun May 26 22:05:41 2013 +0200
@@ -61,7 +61,7 @@
 };
 
 struct UcxKey {
-    char   *data;
+    void   *data;
     size_t len;
     int    hash;
 };
--- a/src/server/util/thrpool.c	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/util/thrpool.c	Sun May 26 22:05:41 2013 +0200
@@ -29,21 +29,28 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+
+#include "atomic.h"
 #include "thrpool.h"
 
-
+static threadpool_job kill_job;
 
-threadpool_t* threadpool_new(int n) {
+threadpool_t* threadpool_new(int min, int max) {
+    printf("threadpool(%d, %d)\n", min, max);
     threadpool_t *pool = malloc(sizeof(threadpool_t));
     pool->queue = NULL;
     pool->queue_len = 0;
+    pool->num_idle = 0;
+    pool->min_threads = min;
+    pool->max_threads = max;
+    pool->num_threads = 0;
 
     pthread_mutex_init(&pool->queue_lock, NULL);
     pthread_mutex_init(&pool->avlbl_lock, NULL);
     pthread_cond_init(&pool->available, NULL);
 
     /* create pool threads */
-    for(int i=0;i<n;i++) {
+    for(int i=0;i<min;i++) {
         pthread_t t;
         if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
             perror("Error: threadpool_new: pthread_create");
@@ -56,10 +63,11 @@
 
 void* threadpool_func(void *data) {
     threadpool_t *pool = (threadpool_t*)data;
-
+    
+    ws_atomic_inc32(&pool->num_threads);
     for(;;) {
         threadpool_job *job = threadpool_get_job(pool);
-        if(job == NULL) {
+        if(job == &kill_job) {
             break;
         }
 
@@ -67,6 +75,7 @@
 
         free(job);
     }
+    ws_atomic_dec32(&pool->num_threads);
     return NULL;
 }
 
@@ -74,6 +83,7 @@
     pthread_mutex_lock(&pool->queue_lock);
 
     threadpool_job *job = NULL;
+    pool->num_idle++;
     while(job == NULL) {
         if(pool->queue_len == 0) {
             pthread_cond_wait(&pool->available, &pool->queue_lock);
@@ -86,9 +96,9 @@
             free(q);
         }
     }
+    pool->num_idle--;
 
     pthread_mutex_unlock(&pool->queue_lock);
-
     return job;
 }
 
@@ -97,11 +107,41 @@
     job->callback = func;
     job->data = data;
 
+    pthread_mutex_lock(&pool->queue_lock);
+    threadpool_enqueue_job(pool, job);
+
+    int create_thread = 0;
+    int destroy_thread = 0;
+    int diff = pool->queue_len - pool->num_idle;
+    if(diff > 0 && pool->num_threads < pool->max_threads) {
+        create_thread = 1;
+    } else if(diff < 0 && pool->num_threads > pool->min_threads) {
+        destroy_thread = 1;
+    }
+    
+    //if(pool->queue_len == 1) {
+    pthread_cond_signal(&pool->available);
+    //}
+    
+    if(create_thread) {
+        pthread_t t;
+        if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
+            perror("Error: threadpool_run: pthread_create");
+        }
+    }
+    if(destroy_thread) {
+        threadpool_enqueue_job(pool, &kill_job);
+        pthread_cond_signal(&pool->available);
+    }
+    
+    pthread_mutex_unlock(&pool->queue_lock);
+}
+
+void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) {
     pool_queue_t *q = malloc(sizeof(pool_queue_t));
     q->job = job;
     q->next = NULL;
-
-    pthread_mutex_lock(&pool->queue_lock);
+    
     if(pool->queue == NULL) {
         pool->queue = q;
     } else {
@@ -112,11 +152,4 @@
         last_elem->next = q;
     }
     pool->queue_len++;
-
-    if(pool->queue_len == 1) {
-        pthread_cond_signal(&pool->available);
-    }
-
-    pthread_mutex_unlock(&pool->queue_lock);
-
 }
--- a/src/server/util/thrpool.h	Sun May 26 12:12:07 2013 +0200
+++ b/src/server/util/thrpool.h	Sun May 26 22:05:41 2013 +0200
@@ -42,7 +42,11 @@
     pthread_mutex_t avlbl_lock;
     pthread_cond_t  available;
     pool_queue_t    *queue;
-    int             queue_len;
+    uint32_t        queue_len;
+    uint32_t        num_idle;
+    uint32_t        num_threads;
+    int             min_threads;
+    int             max_threads;
 };
 
 struct _threadpool_job {
@@ -55,6 +59,8 @@
     pool_queue_t     *next;
 };
 
+void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job);
+
 #ifdef	__cplusplus
 }
 #endif

mercurial