# HG changeset patch # User Olaf Wintermann # Date 1369598741 -7200 # Node ID 50505dc3f8a68e587d5c25dacbc3f7a981729ef0 # Parent 74babc0082b710590a89c8f140097c4c922bf117 dynamic thread pool diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/daemon/auth.c --- a/src/server/daemon/auth.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/daemon/auth.c Sun May 26 22:05:41 2013 +0200 @@ -70,6 +70,7 @@ time_t now = time(NULL); size_t slot = mapkey.hash%cache.size; + User *u = NULL; pthread_mutex_lock(&auth_cache_mutex); UserCacheElm *elm = cache.map[slot]; @@ -80,44 +81,36 @@ if(elm) { // compare the key data to be sure it is the correct user int n = (mapkey.len > elm->key.len) ? elm->key.len : mapkey.len; - if (memcmp(elm->key.data, mapkey.data, n)) { - free(key); - pthread_mutex_unlock(&auth_cache_mutex); - return NULL; + if (!memcmp(elm->key.data, mapkey.data, n)) { + // elm is now the correct UserCacheElm + // TODO: use configuration for expire time + if(now - elm->created > 120) { + // cached user expired + // remove all users from the list from the first to this one + UserCacheElm *e = cache.head; + while(e) { + if(e == elm) { + break; + } + UserCacheElm *n = e->next_user; + auth_cache_remove_from_map(e); + e = n; + } + cache.head = elm->next_user; + if(cache.trail == elm) { + cache.trail = NULL; + } + auth_cache_remove_from_map(elm); + u = NULL; + } else { + u = (User*)elm->user; + } } - } else { - free(key); - pthread_mutex_unlock(&auth_cache_mutex); - return NULL; - } - - // elm is now the correct UserCacheElm - // TODO: use configuration for expire time - if(now - elm->created > 120) { - // cached user expired - // remove all users from the list from the first to this one - UserCacheElm *e = cache.head; - while(e) { - if(e == elm) { - break; - } - UserCacheElm *n = e->next_user; - auth_cache_remove_from_map(e); - e = n; - } - cache.head = elm->next_user; - if(cache.trail == elm) { - cache.trail = NULL; - } - auth_cache_remove_from_map(elm); - free(key); - pthread_mutex_unlock(&auth_cache_mutex); - return NULL; } pthread_mutex_unlock(&auth_cache_mutex); free(key); - return (User*)elm->user; + return u; } void auth_cache_add( diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/daemon/protocol.c --- a/src/server/daemon/protocol.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/daemon/protocol.c Sun May 26 22:05:41 2013 +0200 @@ -285,9 +285,9 @@ } // set socket blocking - int flags; - flags = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, flags ^ O_NONBLOCK); + //int flags; + //flags = fcntl(fd, F_GETFL, 0); + //fcntl(fd, F_SETFL, flags ^ O_NONBLOCK); // output buffer sbuf_t *out = sbuf_new(512); diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/daemon/sessionhandler.c --- a/src/server/daemon/sessionhandler.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/daemon/sessionhandler.c Sun May 26 22:05:41 2013 +0200 @@ -45,7 +45,7 @@ SessionHandler* create_basic_session_handler() { BasicSessionHandler *handler = malloc(sizeof(BasicSessionHandler)); - handler->threadpool = threadpool_new(8); + handler->threadpool = threadpool_new(4, 8); handler->sh.enqueue_connection = basic_enq_conn; @@ -239,7 +239,7 @@ EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; - + int r = handle_request(request, NULL); if(r != 0) { // TODO: error message diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/daemon/threadpools.c --- a/src/server/daemon/threadpools.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/daemon/threadpools.c Sun May 26 22:05:41 2013 +0200 @@ -53,7 +53,7 @@ /* TODO: reconfig thread pool */ return 0; } else { - threadpool_t *tp = threadpool_new(cfg->min_threads); + threadpool_t *tp = threadpool_new(cfg->min_threads, cfg->max_threads); int ret = ucx_map_sstr_put(thread_pool_map, name, tp); diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/daemon/vfs.c --- a/src/server/daemon/vfs.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/daemon/vfs.c Sun May 26 22:05:41 2013 +0200 @@ -411,10 +411,10 @@ } int sys_acl_check(VFSContext *ctx, uint32_t access_mask, SysACL *sysacl) { + if(sysacl) { + sysacl->acl = NULL; + } if(!ctx) { - if(sysacl) { - sysacl->acl = NULL; - } return 0; } diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/public/nsapi.h --- a/src/server/public/nsapi.h Sun May 26 12:12:07 2013 +0200 +++ b/src/server/public/nsapi.h Sun May 26 22:05:41 2013 +0200 @@ -1350,8 +1350,6 @@ -/* new macro and function definitions begin */ - /* netbuf functions */ NSAPI_PUBLIC netbuf *netbuf_open(SYS_NETFD sd, int sz); @@ -1379,12 +1377,14 @@ NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence); NSAPI_PUBLIC int system_fclose(SYS_FILE fd); +/* new macro and function definitions begin */ + NSAPI_PUBLIC int util_errno2status(int errno_value); // new #define util_errno2status util_errno2status // threadpool -threadpool_t* threadpool_new(int n); +threadpool_t* threadpool_new(int min, int max); void* threadpool_func(void *data); threadpool_job* threadpool_get_job(threadpool_t *pool); void threadpool_run(threadpool_t *pool, job_callback_f func, void *data); diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/ucx/map.h --- a/src/server/ucx/map.h Sun May 26 12:12:07 2013 +0200 +++ b/src/server/ucx/map.h Sun May 26 22:05:41 2013 +0200 @@ -61,7 +61,7 @@ }; struct UcxKey { - char *data; + void *data; size_t len; int hash; }; diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/util/thrpool.c --- a/src/server/util/thrpool.c Sun May 26 12:12:07 2013 +0200 +++ b/src/server/util/thrpool.c Sun May 26 22:05:41 2013 +0200 @@ -29,21 +29,28 @@ #include #include #include + +#include "atomic.h" #include "thrpool.h" - +static threadpool_job kill_job; -threadpool_t* threadpool_new(int n) { +threadpool_t* threadpool_new(int min, int max) { + printf("threadpool(%d, %d)\n", min, max); threadpool_t *pool = malloc(sizeof(threadpool_t)); pool->queue = NULL; pool->queue_len = 0; + pool->num_idle = 0; + pool->min_threads = min; + pool->max_threads = max; + pool->num_threads = 0; pthread_mutex_init(&pool->queue_lock, NULL); pthread_mutex_init(&pool->avlbl_lock, NULL); pthread_cond_init(&pool->available, NULL); /* create pool threads */ - for(int i=0;inum_threads); for(;;) { threadpool_job *job = threadpool_get_job(pool); - if(job == NULL) { + if(job == &kill_job) { break; } @@ -67,6 +75,7 @@ free(job); } + ws_atomic_dec32(&pool->num_threads); return NULL; } @@ -74,6 +83,7 @@ pthread_mutex_lock(&pool->queue_lock); threadpool_job *job = NULL; + pool->num_idle++; while(job == NULL) { if(pool->queue_len == 0) { pthread_cond_wait(&pool->available, &pool->queue_lock); @@ -86,9 +96,9 @@ free(q); } } + pool->num_idle--; pthread_mutex_unlock(&pool->queue_lock); - return job; } @@ -97,11 +107,41 @@ job->callback = func; job->data = data; + pthread_mutex_lock(&pool->queue_lock); + threadpool_enqueue_job(pool, job); + + int create_thread = 0; + int destroy_thread = 0; + int diff = pool->queue_len - pool->num_idle; + if(diff > 0 && pool->num_threads < pool->max_threads) { + create_thread = 1; + } else if(diff < 0 && pool->num_threads > pool->min_threads) { + destroy_thread = 1; + } + + //if(pool->queue_len == 1) { + pthread_cond_signal(&pool->available); + //} + + if(create_thread) { + pthread_t t; + if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { + perror("Error: threadpool_run: pthread_create"); + } + } + if(destroy_thread) { + threadpool_enqueue_job(pool, &kill_job); + pthread_cond_signal(&pool->available); + } + + pthread_mutex_unlock(&pool->queue_lock); +} + +void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { pool_queue_t *q = malloc(sizeof(pool_queue_t)); q->job = job; q->next = NULL; - - pthread_mutex_lock(&pool->queue_lock); + if(pool->queue == NULL) { pool->queue = q; } else { @@ -112,11 +152,4 @@ last_elem->next = q; } pool->queue_len++; - - if(pool->queue_len == 1) { - pthread_cond_signal(&pool->available); - } - - pthread_mutex_unlock(&pool->queue_lock); - } diff -r 74babc0082b7 -r 50505dc3f8a6 src/server/util/thrpool.h --- a/src/server/util/thrpool.h Sun May 26 12:12:07 2013 +0200 +++ b/src/server/util/thrpool.h Sun May 26 22:05:41 2013 +0200 @@ -42,7 +42,11 @@ pthread_mutex_t avlbl_lock; pthread_cond_t available; pool_queue_t *queue; - int queue_len; + uint32_t queue_len; + uint32_t num_idle; + uint32_t num_threads; + int min_threads; + int max_threads; }; struct _threadpool_job { @@ -55,6 +59,8 @@ pool_queue_t *next; }; +void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job); + #ifdef __cplusplus } #endif