| 38 |
38 |
| 39 static threadpool_job kill_job; |
39 static threadpool_job kill_job; |
| 40 |
40 |
| 41 UiThreadpool* threadpool_new(int min, int max) { |
41 UiThreadpool* threadpool_new(int min, int max) { |
| 42 UiThreadpool *pool = malloc(sizeof(UiThreadpool)); |
42 UiThreadpool *pool = malloc(sizeof(UiThreadpool)); |
| 43 pool->queue = NULL; |
43 pool->queue = ui_queue_create(); |
| 44 pool->queue_len = 0; |
|
| 45 pool->num_idle = 0; |
44 pool->num_idle = 0; |
| 46 pool->min_threads = min; |
45 pool->min_threads = min; |
| 47 pool->max_threads = max; |
46 pool->max_threads = max; |
| 48 |
47 |
| 49 pthread_mutex_init(&pool->queue_lock, NULL); |
|
| 50 pthread_mutex_init(&pool->avlbl_lock, NULL); |
|
| 51 pthread_cond_init(&pool->available, NULL); |
|
| 52 |
|
| 53 return pool; |
48 return pool; |
| 54 } |
49 } |
| 55 |
50 |
| 56 int threadpool_start(UiThreadpool *pool) { |
51 int threadpool_start(UiThreadpool *pool) { |
| |
52 pool->nthreads = pool->min_threads; |
| |
53 pool->threads = calloc(pool->max_threads, sizeof(pthread_t)); |
| 57 /* create pool threads */ |
54 /* create pool threads */ |
| 58 for(int i=0;i<pool->min_threads;i++) { |
55 for(int i=0;i<pool->nthreads;i++) { |
| 59 pthread_t t; |
56 if (pthread_create(&pool->threads[i], NULL, threadpool_func, pool) != 0) { |
| 60 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) { |
|
| 61 fprintf(stderr, "uic: threadpool_start: pthread_create failed: %s", strerror(errno)); |
57 fprintf(stderr, "uic: threadpool_start: pthread_create failed: %s", strerror(errno)); |
| 62 return 1; |
58 return 1; |
| 63 } |
59 } |
| 64 } |
60 } |
| 65 return 0; |
61 return 0; |
| |
62 } |
| |
63 |
| |
64 int threadpool_join(UiThreadpool *pool) { |
| |
65 int err = 0; |
| |
66 for(int i=0;i<pool->nthreads;i++) { |
| |
67 if(pthread_join(pool->threads[i], NULL)) { |
| |
68 err = 1; |
| |
69 } |
| |
70 } |
| |
71 return err; |
| 66 } |
72 } |
| 67 |
73 |
| 68 void* threadpool_func(void *data) { |
74 void* threadpool_func(void *data) { |
| 69 UiThreadpool *pool = (UiThreadpool*)data; |
75 UiThreadpool *pool = (UiThreadpool*)data; |
| 70 |
76 |
| 80 } |
86 } |
| 81 return NULL; |
87 return NULL; |
| 82 } |
88 } |
| 83 |
89 |
| 84 threadpool_job* threadpool_get_job(UiThreadpool *pool) { |
90 threadpool_job* threadpool_get_job(UiThreadpool *pool) { |
| 85 pthread_mutex_lock(&pool->queue_lock); |
91 threadpool_job *job = ui_queue_get_wait(pool->queue); |
| 86 |
|
| 87 threadpool_job *job = NULL; |
|
| 88 pool->num_idle++; |
|
| 89 while(job == NULL) { |
|
| 90 if(pool->queue_len == 0) { |
|
| 91 pthread_cond_wait(&pool->available, &pool->queue_lock); |
|
| 92 continue; |
|
| 93 } else { |
|
| 94 pool_queue_t *q = pool->queue; |
|
| 95 job = q->job; |
|
| 96 pool->queue = q->next; |
|
| 97 pool->queue_len--; |
|
| 98 free(q); |
|
| 99 } |
|
| 100 } |
|
| 101 pool->num_idle--; |
|
| 102 |
|
| 103 pthread_mutex_unlock(&pool->queue_lock); |
|
| 104 return job; |
92 return job; |
| 105 } |
93 } |
| 106 |
94 |
| 107 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) { |
95 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) { |
| 108 // TODO: handle errors |
|
| 109 |
|
| 110 threadpool_job *job = malloc(sizeof(threadpool_job)); |
96 threadpool_job *job = malloc(sizeof(threadpool_job)); |
| 111 job->callback = func; |
97 job->callback = func; |
| 112 job->data = data; |
98 job->data = data; |
| 113 |
99 ui_queue_put(pool->queue, job); |
| 114 pthread_mutex_lock(&pool->queue_lock); |
100 } |
| 115 threadpool_enqueue_job(pool, job); |
|
| 116 |
|
| 117 int create_thread = 0; |
|
| 118 int destroy_thread = 0; |
|
| 119 int diff = pool->queue_len - pool->num_idle; |
|
| 120 |
|
| 121 //if(pool->queue_len == 1) { |
|
| 122 pthread_cond_signal(&pool->available); |
|
| 123 //} |
|
| 124 |
|
| 125 pthread_mutex_unlock(&pool->queue_lock); |
|
| 126 } |
|
| 127 |
|
| 128 void threadpool_enqueue_job(UiThreadpool *pool, threadpool_job *job) { |
|
| 129 pool_queue_t *q = malloc(sizeof(pool_queue_t)); |
|
| 130 q->job = job; |
|
| 131 q->next = NULL; |
|
| 132 |
|
| 133 if(pool->queue == NULL) { |
|
| 134 pool->queue = q; |
|
| 135 } else { |
|
| 136 pool_queue_t *last_elem = pool->queue; |
|
| 137 while(last_elem->next != NULL) { |
|
| 138 last_elem = last_elem->next; |
|
| 139 } |
|
| 140 last_elem->next = q; |
|
| 141 } |
|
| 142 pool->queue_len++; |
|
| 143 } |
|
| 144 |
|
| 145 |
|
| 146 |
|
| 147 |
101 |
| 148 |
102 |
| 149 |
103 |
| 150 UiThreadpool* ui_threadpool_create(int nthreads) { |
104 UiThreadpool* ui_threadpool_create(int nthreads) { |
| 151 UiThreadpool *pool = threadpool_new(nthreads, nthreads); |
105 UiThreadpool *pool = threadpool_new(nthreads, nthreads); |
| 189 job->finish_callback = f; |
143 job->finish_callback = f; |
| 190 job->finish_data = fd; |
144 job->finish_data = fd; |
| 191 threadpool_run(pool, ui_threadpool_job_func, job); |
145 threadpool_run(pool, ui_threadpool_job_func, job); |
| 192 } |
146 } |
| 193 |
147 |
| |
148 /* --------------------------------- Queue --------------------------------- */ |
| |
149 |
| |
150 UiQueue* ui_queue_create(void) { |
| |
151 UiQueue *queue = calloc(1, sizeof(UiQueue)); |
| |
152 pthread_mutex_init(&queue->lock, NULL); |
| |
153 pthread_mutex_init(&queue->avlbl_lock, NULL); |
| |
154 pthread_cond_init(&queue->available, NULL); |
| |
155 return queue; |
| |
156 } |
| |
157 |
| |
158 void ui_queue_free(UiQueue *queue) { |
| |
159 // The queue must be empty, we could free UiQueueElm, |
| |
160 // but not the payload data |
| |
161 pthread_mutex_destroy(&queue->lock); |
| |
162 pthread_mutex_destroy(&queue->avlbl_lock); |
| |
163 pthread_cond_destroy(&queue->available); |
| |
164 free(queue); |
| |
165 } |
| |
166 |
| |
167 void ui_queue_put(UiQueue *queue, void *data) { |
| |
168 // create queue element |
| |
169 UiQueueElm *elm = malloc(sizeof(UiQueueElm)); |
| |
170 elm->data = data; |
| |
171 elm->next = NULL; |
| |
172 |
| |
173 pthread_mutex_lock(&queue->lock); |
| |
174 |
| |
175 // put queue element at the end of the linked list |
| |
176 if(queue->elements) { |
| |
177 UiQueueElm *end = queue->elements; |
| |
178 while(end->next) { |
| |
179 end = end->next; |
| |
180 } |
| |
181 end->next = elm; |
| |
182 } else { |
| |
183 queue->elements = elm; |
| |
184 } |
| |
185 queue->length++; |
| |
186 |
| |
187 // signal new available data |
| |
188 pthread_cond_signal(&queue->available); |
| |
189 |
| |
190 pthread_mutex_unlock(&queue->lock); |
| |
191 } |
| |
192 |
| |
193 void* ui_queue_get_wait(UiQueue *queue) { |
| |
194 pthread_mutex_lock(&queue->lock); |
| |
195 |
| |
196 void *data = NULL; |
| |
197 while(data == NULL) { |
| |
198 if(queue->length == 0) { |
| |
199 pthread_cond_wait(&queue->available, &queue->lock); |
| |
200 continue; |
| |
201 } else { |
| |
202 UiQueueElm *q = queue->elements; |
| |
203 data = q->data; |
| |
204 queue->elements = q->next; |
| |
205 queue->length--; |
| |
206 free(q); |
| |
207 } |
| |
208 } |
| |
209 |
| |
210 pthread_mutex_unlock(&queue->lock); |
| |
211 return data; |
| |
212 } |
| 194 |
213 |
| 195 #endif |
214 #endif |
| 196 |
215 |