UNIXworkcode

1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 3 * 4 * Copyright 2013 Olaf Wintermann. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include <stdio.h> 30 #include <stdlib.h> 31 #include <unistd.h> 32 33 #include "atomic.h" 34 #include "thrpool.h" 35 36 static threadpool_job kill_job; 37 38 threadpool_t* threadpool_new(int min, int max) { 39 log_ereport(LOG_VERBOSE, "new threadpool (min: %d, max: %d)", min, max); 40 threadpool_t *pool = malloc(sizeof(threadpool_t)); 41 pool->queue = NULL; 42 pool->queue_len = 0; 43 pool->num_idle = 0; 44 pool->min_threads = min; 45 pool->max_threads = max; 46 pool->num_threads = 0; 47 48 pthread_mutex_init(&pool->queue_lock, NULL); 49 pthread_mutex_init(&pool->avlbl_lock, NULL); 50 pthread_cond_init(&pool->available, NULL); 51 52 /* create pool threads */ 53 for(int i=0;i<min;i++) { 54 pthread_t t; 55 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { 56 perror("Error: threadpool_new: pthread_create"); 57 return NULL; 58 } 59 } 60 61 return pool; 62 } 63 64 void* threadpool_func(void *data) { 65 threadpool_t *pool = (threadpool_t*)data; 66 67 ws_atomic_inc32(&pool->num_threads); 68 for(;;) { 69 threadpool_job *job = threadpool_get_job(pool); 70 if(job == &kill_job) { 71 break; 72 } 73 74 job->callback(job->data); 75 76 free(job); 77 } 78 ws_atomic_dec32(&pool->num_threads); 79 return NULL; 80 } 81 82 threadpool_job* threadpool_get_job(threadpool_t *pool) { 83 pthread_mutex_lock(&pool->queue_lock); 84 85 threadpool_job *job = NULL; 86 pool->num_idle++; 87 while(job == NULL) { 88 if(pool->queue_len == 0) { 89 pthread_cond_wait(&pool->available, &pool->queue_lock); 90 continue; 91 } else { 92 pool_queue_t *q = pool->queue; 93 job = q->job; 94 pool->queue = q->next; 95 pool->queue_len--; 96 free(q); 97 } 98 } 99 pool->num_idle--; 100 101 pthread_mutex_unlock(&pool->queue_lock); 102 return job; 103 } 104 105 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data) { 106 threadpool_job *job = malloc(sizeof(threadpool_job)); 107 job->callback = func; 108 job->data = data; 109 110 pthread_mutex_lock(&pool->queue_lock); 111 threadpool_enqueue_job(pool, job); 112 113 int create_thread = 0; 114 int destroy_thread = 0; 115 int diff = pool->queue_len - pool->num_idle; 116 if(diff > 0 && pool->num_threads < pool->max_threads) { 117 create_thread = 1; 118 } else if(diff < 0 && pool->num_threads > pool->min_threads) { 119 destroy_thread = 1; 120 } 121 122 //if(pool->queue_len == 1) { 123 pthread_cond_signal(&pool->available); 124 //} 125 126 if(create_thread) { 127 pthread_t t; 128 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { 129 perror("Error: threadpool_run: pthread_create"); 130 } 131 } 132 if(destroy_thread) { 133 threadpool_enqueue_job(pool, &kill_job); 134 pthread_cond_signal(&pool->available); 135 } 136 137 pthread_mutex_unlock(&pool->queue_lock); 138 } 139 140 void threadpool_enqueue_job(threadpool_t *pool, threadpool_job *job) { 141 pool_queue_t *q = malloc(sizeof(pool_queue_t)); 142 q->job = job; 143 q->next = NULL; 144 145 if(pool->queue == NULL) { 146 pool->queue = q; 147 } else { 148 pool_queue_t *last_elem = pool->queue; 149 while(last_elem->next != NULL) { 150 last_elem = last_elem->next; 151 } 152 last_elem->next = q; 153 } 154 pool->queue_len++; 155 } 156