UNIXworkcode

1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 3 * 4 * Copyright 2013 Olaf Wintermann. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include <stdio.h> 30 #include <stdlib.h> 31 #include <unistd.h> 32 33 #include "atomic.h" 34 #include "thrpool.h" 35 36 static threadpool_job kill_job; 37 38 threadpool_t* threadpool_new(int min, int max) { 39 log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max); 40 threadpool_t *pool = malloc(sizeof(threadpool_t)); 41 pool->queue = NULL; 42 pool->queue_len = 0; 43 pool->num_idle = 0; 44 pool->min_threads = min; 45 pool->max_threads = max; 46 pool->num_threads = 0; 47 48 pthread_mutex_init(&pool->queue_lock, NULL); 49 pthread_mutex_init(&pool->avlbl_lock, NULL); 50 pthread_cond_init(&pool->available, NULL); 51 52 return pool; 53 } 54 55 int threadpool_start(threadpool_t *pool) { 56 /* create pool threads */ 57 for(int i=0;i<pool->min_threads;i++) { 58 pthread_t t; 59 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { 60 log_ereport(LOG_FAILURE, "threadpool_start: pthread_create failed: %s", strerror(errno)); 61 return 1; 62 } 63 } 64 return 0; 65 } 66 67 void* threadpool_func(void *data) { 68 threadpool_t *pool = (threadpool_t*)data; 69 70 ws_atomic_inc32(&pool->num_threads); 71 for(;;) { 72 threadpool_job *job = threadpool_get_job(pool); 73 if(job == &kill_job) { 74 break; 75 } 76 77 job->callback(job->data); 78 79 free(job); 80 } 81 uint32_t nthreads = ws_atomic_dec32(&pool->num_threads); 82 if(nthreads == 0) { 83 log_ereport(LOG_INFORM, "threadpool closed"); // TODO: log threadpool name 84 } 85 return NULL; 86 } 87 88 threadpool_job* threadpool_get_job(threadpool_t *pool) { 89 pthread_mutex_lock(&pool->queue_lock); 90 91 threadpool_job *job = NULL; 92 pool->num_idle++; 93 while(job == NULL) { 94 if(pool->queue_len == 0) { 95 pthread_cond_wait(&pool->available, &pool->queue_lock); 96 continue; 97 } else { 98 pool_queue_t *q = pool->queue; 99 job = q->job; 100 pool->queue = q->next; 101 pool->queue_len--; 102 free(q); 103 } 104 } 105 pool->num_idle--; 106 107 pthread_mutex_unlock(&pool->queue_lock); 108 return job; 109 } 110 111 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data) { 112 // TODO: handle errors 113 114 if(pool->num_threads == 0) { 115 threadpool_start(pool); 116 } 117 118 threadpool_job *job = malloc(sizeof(threadpool_job)); 119 job->callback = func; 120 job->data = data; 121 122 pthread_mutex_lock(&pool->queue_lock); 123 threadpool_enqueue_job(pool, job); 124 125 int create_thread = 0; 126 int destroy_thread = 0; 127 int diff = pool->queue_len - pool->num_idle; 128 if(diff > 0 && pool->num_threads < pool->max_threads) { 129 create_thread = 1; 130 } else if(diff < 0 && pool->num_threads > pool->min_threads) { 131 destroy_thread = 1; 132 } 133 134 //if(pool->queue_len == 1) { 135 pthread_cond_signal(&pool->available); 136 //} 137 138 if(create_thread) { 139 pthread_t t; 140 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { 141 log_ereport(LOG_FAILURE, "threadpool_run: pthread_create failed: %s", strerror(errno)); 142 } 143 } 144 if(destroy_thread) { 145 threadpool_enqueue_job(pool, &kill_job); 146 pthread_cond_signal(&pool->available); 147 } 148 149 pthread_mutex_unlock(&pool->queue_lock); 150 } 151 152 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { 153 pool_queue_t *q = malloc(sizeof(pool_queue_t)); 154 q->job = job; 155 q->next = NULL; 156 157 if(pool->queue == NULL) { 158 pool->queue = q; 159 } else { 160 pool_queue_t *last_elem = pool->queue; 161 while(last_elem->next != NULL) { 162 last_elem = last_elem->next; 163 } 164 last_elem->next = q; 165 } 166 pool->queue_len++; 167 } 168 169 void threadpool_shutdown(threadpool_t *pool) { 170 int nthreads = pool->max_threads; 171 for(int i=0;i<nthreads;i++) { 172 pthread_mutex_lock(&pool->queue_lock); 173 threadpool_enqueue_job(pool, &kill_job); 174 pthread_cond_signal(&pool->available); 175 pthread_mutex_unlock(&pool->queue_lock); 176 } 177 } 178