ui/common/threadpool.c

changeset 115
e57ca2747782
parent 108
77254bd6dccb
equal deleted inserted replaced
114:3da24640513a 115:e57ca2747782
38 38
39 static threadpool_job kill_job; 39 static threadpool_job kill_job;
40 40
41 UiThreadpool* threadpool_new(int min, int max) { 41 UiThreadpool* threadpool_new(int min, int max) {
42 UiThreadpool *pool = malloc(sizeof(UiThreadpool)); 42 UiThreadpool *pool = malloc(sizeof(UiThreadpool));
43 pool->queue = NULL; 43 pool->queue = ui_queue_create();
44 pool->queue_len = 0;
45 pool->num_idle = 0; 44 pool->num_idle = 0;
46 pool->min_threads = min; 45 pool->min_threads = min;
47 pool->max_threads = max; 46 pool->max_threads = max;
48 47
49 pthread_mutex_init(&pool->queue_lock, NULL);
50 pthread_mutex_init(&pool->avlbl_lock, NULL);
51 pthread_cond_init(&pool->available, NULL);
52
53 return pool; 48 return pool;
54 } 49 }
55 50
56 int threadpool_start(UiThreadpool *pool) { 51 int threadpool_start(UiThreadpool *pool) {
52 pool->nthreads = pool->min_threads;
53 pool->threads = calloc(pool->max_threads, sizeof(pthread_t));
57 /* create pool threads */ 54 /* create pool threads */
58 for(int i=0;i<pool->min_threads;i++) { 55 for(int i=0;i<pool->nthreads;i++) {
59 pthread_t t; 56 if (pthread_create(&pool->threads[i], NULL, threadpool_func, pool) != 0) {
60 if (pthread_create(&t, NULL, threadpool_func, pool) != 0) {
61 fprintf(stderr, "uic: threadpool_start: pthread_create failed: %s", strerror(errno)); 57 fprintf(stderr, "uic: threadpool_start: pthread_create failed: %s", strerror(errno));
62 return 1; 58 return 1;
63 } 59 }
64 } 60 }
65 return 0; 61 return 0;
62 }
63
64 int threadpool_join(UiThreadpool *pool) {
65 int err = 0;
66 for(int i=0;i<pool->nthreads;i++) {
67 if(pthread_join(pool->threads[i], NULL)) {
68 err = 1;
69 }
70 }
71 return err;
66 } 72 }
67 73
68 void* threadpool_func(void *data) { 74 void* threadpool_func(void *data) {
69 UiThreadpool *pool = (UiThreadpool*)data; 75 UiThreadpool *pool = (UiThreadpool*)data;
70 76
80 } 86 }
81 return NULL; 87 return NULL;
82 } 88 }
83 89
84 threadpool_job* threadpool_get_job(UiThreadpool *pool) { 90 threadpool_job* threadpool_get_job(UiThreadpool *pool) {
85 pthread_mutex_lock(&pool->queue_lock); 91 threadpool_job *job = ui_queue_get_wait(pool->queue);
86
87 threadpool_job *job = NULL;
88 pool->num_idle++;
89 while(job == NULL) {
90 if(pool->queue_len == 0) {
91 pthread_cond_wait(&pool->available, &pool->queue_lock);
92 continue;
93 } else {
94 pool_queue_t *q = pool->queue;
95 job = q->job;
96 pool->queue = q->next;
97 pool->queue_len--;
98 free(q);
99 }
100 }
101 pool->num_idle--;
102
103 pthread_mutex_unlock(&pool->queue_lock);
104 return job; 92 return job;
105 } 93 }
106 94
107 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) { 95 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) {
108 // TODO: handle errors
109
110 threadpool_job *job = malloc(sizeof(threadpool_job)); 96 threadpool_job *job = malloc(sizeof(threadpool_job));
111 job->callback = func; 97 job->callback = func;
112 job->data = data; 98 job->data = data;
113 99 ui_queue_put(pool->queue, job);
114 pthread_mutex_lock(&pool->queue_lock); 100 }
115 threadpool_enqueue_job(pool, job);
116
117 int create_thread = 0;
118 int destroy_thread = 0;
119 int diff = pool->queue_len - pool->num_idle;
120
121 //if(pool->queue_len == 1) {
122 pthread_cond_signal(&pool->available);
123 //}
124
125 pthread_mutex_unlock(&pool->queue_lock);
126 }
127
128 void threadpool_enqueue_job(UiThreadpool *pool, threadpool_job *job) {
129 pool_queue_t *q = malloc(sizeof(pool_queue_t));
130 q->job = job;
131 q->next = NULL;
132
133 if(pool->queue == NULL) {
134 pool->queue = q;
135 } else {
136 pool_queue_t *last_elem = pool->queue;
137 while(last_elem->next != NULL) {
138 last_elem = last_elem->next;
139 }
140 last_elem->next = q;
141 }
142 pool->queue_len++;
143 }
144
145
146
147 101
148 102
149 103
150 UiThreadpool* ui_threadpool_create(int nthreads) { 104 UiThreadpool* ui_threadpool_create(int nthreads) {
151 UiThreadpool *pool = threadpool_new(nthreads, nthreads); 105 UiThreadpool *pool = threadpool_new(nthreads, nthreads);
189 job->finish_callback = f; 143 job->finish_callback = f;
190 job->finish_data = fd; 144 job->finish_data = fd;
191 threadpool_run(pool, ui_threadpool_job_func, job); 145 threadpool_run(pool, ui_threadpool_job_func, job);
192 } 146 }
193 147
148 /* --------------------------------- Queue --------------------------------- */
149
150 UiQueue* ui_queue_create(void) {
151 UiQueue *queue = calloc(1, sizeof(UiQueue));
152 pthread_mutex_init(&queue->lock, NULL);
153 pthread_mutex_init(&queue->avlbl_lock, NULL);
154 pthread_cond_init(&queue->available, NULL);
155 return queue;
156 }
157
158 void ui_queue_free(UiQueue *queue) {
159 // The queue must be empty, we could free UiQueueElm,
160 // but not the payload data
161 pthread_mutex_destroy(&queue->lock);
162 pthread_mutex_destroy(&queue->avlbl_lock);
163 pthread_cond_destroy(&queue->available);
164 free(queue);
165 }
166
167 void ui_queue_put(UiQueue *queue, void *data) {
168 // create queue element
169 UiQueueElm *elm = malloc(sizeof(UiQueueElm));
170 elm->data = data;
171 elm->next = NULL;
172
173 pthread_mutex_lock(&queue->lock);
174
175 // put queue element at the end of the linked list
176 if(queue->elements) {
177 UiQueueElm *end = queue->elements;
178 while(end->next) {
179 end = end->next;
180 }
181 end->next = elm;
182 } else {
183 queue->elements = elm;
184 }
185 queue->length++;
186
187 // signal new available data
188 pthread_cond_signal(&queue->available);
189
190 pthread_mutex_unlock(&queue->lock);
191 }
192
193 void* ui_queue_get_wait(UiQueue *queue) {
194 pthread_mutex_lock(&queue->lock);
195
196 void *data = NULL;
197 while(data == NULL) {
198 if(queue->length == 0) {
199 pthread_cond_wait(&queue->available, &queue->lock);
200 continue;
201 } else {
202 UiQueueElm *q = queue->elements;
203 data = q->data;
204 queue->elements = q->next;
205 queue->length--;
206 free(q);
207 }
208 }
209
210 pthread_mutex_unlock(&queue->lock);
211 return data;
212 }
194 213
195 #endif 214 #endif
196 215

mercurial