ui/common/threadpool.c

changeset 950
39641cf150eb
parent 949
ef8f13c8c08f
child 955
ea9a999b4fc8
equal deleted inserted replaced
949:ef8f13c8c08f 950:39641cf150eb
38 38
39 static threadpool_job kill_job; 39 static threadpool_job kill_job;
40 40
41 UiThreadpool* threadpool_new(int min, int max) { 41 UiThreadpool* threadpool_new(int min, int max) {
42 UiThreadpool *pool = malloc(sizeof(UiThreadpool)); 42 UiThreadpool *pool = malloc(sizeof(UiThreadpool));
43 pool->queue = NULL; 43 pool->queue = ui_queue_create();
44 pool->queue_len = 0;
45 pool->num_idle = 0; 44 pool->num_idle = 0;
46 pool->min_threads = min; 45 pool->min_threads = min;
47 pool->max_threads = max; 46 pool->max_threads = max;
48
49 pthread_mutex_init(&pool->queue_lock, NULL);
50 pthread_mutex_init(&pool->avlbl_lock, NULL);
51 pthread_cond_init(&pool->available, NULL);
52 47
53 return pool; 48 return pool;
54 } 49 }
55 50
56 int threadpool_start(UiThreadpool *pool) { 51 int threadpool_start(UiThreadpool *pool) {
91 } 86 }
92 return NULL; 87 return NULL;
93 } 88 }
94 89
95 threadpool_job* threadpool_get_job(UiThreadpool *pool) { 90 threadpool_job* threadpool_get_job(UiThreadpool *pool) {
96 pthread_mutex_lock(&pool->queue_lock); 91 threadpool_job *job = ui_queue_get_wait(pool->queue);
97
98 threadpool_job *job = NULL;
99 pool->num_idle++;
100 while(job == NULL) {
101 if(pool->queue_len == 0) {
102 pthread_cond_wait(&pool->available, &pool->queue_lock);
103 continue;
104 } else {
105 pool_queue_t *q = pool->queue;
106 job = q->job;
107 pool->queue = q->next;
108 pool->queue_len--;
109 free(q);
110 }
111 }
112 pool->num_idle--;
113
114 pthread_mutex_unlock(&pool->queue_lock);
115 return job; 92 return job;
116 } 93 }
117 94
118 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) { 95 void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) {
119 // TODO: handle errors
120
121 threadpool_job *job = malloc(sizeof(threadpool_job)); 96 threadpool_job *job = malloc(sizeof(threadpool_job));
122 job->callback = func; 97 job->callback = func;
123 job->data = data; 98 job->data = data;
124 99 ui_queue_put(pool->queue, job);
125 pthread_mutex_lock(&pool->queue_lock); 100 }
126 threadpool_enqueue_job(pool, job);
127
128 int create_thread = 0;
129 int destroy_thread = 0;
130 int diff = pool->queue_len - pool->num_idle;
131
132 //if(pool->queue_len == 1) {
133 pthread_cond_signal(&pool->available);
134 //}
135
136 pthread_mutex_unlock(&pool->queue_lock);
137 }
138
139 void threadpool_enqueue_job(UiThreadpool *pool, threadpool_job *job) {
140 pool_queue_t *q = malloc(sizeof(pool_queue_t));
141 q->job = job;
142 q->next = NULL;
143
144 if(pool->queue == NULL) {
145 pool->queue = q;
146 } else {
147 pool_queue_t *last_elem = pool->queue;
148 while(last_elem->next != NULL) {
149 last_elem = last_elem->next;
150 }
151 last_elem->next = q;
152 }
153 pool->queue_len++;
154 }
155
156
157
158 101
159 102
160 103
161 UiThreadpool* ui_threadpool_create(int nthreads) { 104 UiThreadpool* ui_threadpool_create(int nthreads) {
162 UiThreadpool *pool = threadpool_new(nthreads, nthreads); 105 UiThreadpool *pool = threadpool_new(nthreads, nthreads);
200 job->finish_callback = f; 143 job->finish_callback = f;
201 job->finish_data = fd; 144 job->finish_data = fd;
202 threadpool_run(pool, ui_threadpool_job_func, job); 145 threadpool_run(pool, ui_threadpool_job_func, job);
203 } 146 }
204 147
148 /* --------------------------------- Queue --------------------------------- */
149
150 UiQueue* ui_queue_create(void) {
151 UiQueue *queue = calloc(1, sizeof(UiQueue));
152 pthread_mutex_init(&queue->lock, NULL);
153 pthread_mutex_init(&queue->avlbl_lock, NULL);
154 pthread_cond_init(&queue->available, NULL);
155 return queue;
156 }
157
158 void ui_queue_free(UiQueue *queue) {
159 // TODO
160 }
161
162 void ui_queue_put(UiQueue *queue, void *data) {
163 // create queue element
164 UiQueueElm *elm = malloc(sizeof(UiQueueElm));
165 elm->data = data;
166 elm->next = NULL;
167
168 pthread_mutex_lock(&queue->lock);
169
170 // put queue element at the end of the linked list
171 if(queue->elements) {
172 UiQueueElm *end = queue->elements;
173 while(end->next) {
174 end = end->next;
175 }
176 end->next = elm;
177 } else {
178 queue->elements = elm;
179 }
180 queue->length++;
181
182 // signal new available data
183 pthread_cond_signal(&queue->available);
184
185 pthread_mutex_unlock(&queue->lock);
186 }
187
188 void* ui_queue_get_wait(UiQueue *queue) {
189 pthread_mutex_lock(&queue->lock);
190
191 void *data = NULL;
192 while(data == NULL) {
193 if(queue->length == 0) {
194 pthread_cond_wait(&queue->available, &queue->lock);
195 continue;
196 } else {
197 UiQueueElm *q = queue->elements;
198 data = q->data;
199 queue->elements = q->next;
200 queue->length--;
201 free(q);
202 }
203 }
204
205 pthread_mutex_unlock(&queue->lock);
206 return data;
207 }
205 208
206 #endif 209 #endif
207 210

mercurial