ui/common/threadpool.c

Sat, 10 Jan 2026 19:44:10 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 10 Jan 2026 19:44:10 +0100
changeset 1042
f3e2811ecf3a
parent 955
ea9a999b4fc8
permissions
-rw-r--r--

add function for buffering mainthread calls

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2024 Olaf Wintermann. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _WIN32

#include "threadpool.h"
#include "context.h"
#include <cx/linked_list.h>

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

/*
 * Sentinel job. threadpool_func compares every dequeued job pointer with the
 * address of this object and leaves its loop on a match; only the address
 * matters, the fields are never read in the code visible here.
 * NOTE(review): nothing in this file puts &kill_job into a queue - confirm
 * where (or whether) worker shutdown is triggered.
 */
static threadpool_job kill_job;



/* Locked around every access to mainthread_call_buffer; set up in uic_init_threads. */
static pthread_mutex_t mc_buffer_mutex;
/* List of UiMainCall entries waiting to be executed; created in uic_init_threads. */
static CxList *mainthread_call_buffer;
/*
 * Buffering flag: written by ui_buffer_mainthread_calls and returned by
 * uic_mainthread_calls_is_buffered. It is accessed without mc_buffer_mutex.
 * NOTE(review): volatile does not make accesses atomic or ordered between
 * threads; if this flag is read from worker threads, an atomic type may be
 * the better fit - confirm.
 */
static volatile int mainthread_call_buffered = 0;

/* One buffered call: a function together with the argument it is invoked with. */
typedef struct UiMainCall {
    ui_threadfunc func; /* function to invoke; entries with a NULL func are skipped on execution */
    void *data;         /* passed unchanged as the only argument of func */
} UiMainCall;

/*
 * Prepares the state used for buffering main-thread calls: creates the list
 * that stores UiMainCall entries and initializes the mutex that protects it.
 * The other buffering functions in this file use both objects without
 * checking them, so this has to run first.
 */
void uic_init_threads(void) {
    mainthread_call_buffer = cxLinkedListCreate(NULL, sizeof(UiMainCall));
    pthread_mutex_init(&mc_buffer_mutex, NULL);
}

/*
 * Returns the current value of the buffering flag, i.e. the value most
 * recently stored by ui_buffer_mainthread_calls (0 before the first call).
 */
int uic_mainthread_calls_is_buffered(void) {
    const int buffering_enabled = mainthread_call_buffered;
    return buffering_enabled;
}

/*
 * Appends a call to the buffer of pending main-thread calls.
 *
 * func: function to run when the buffer is executed
 * data: argument handed to func
 *
 * The entry lives on the stack; the buffer was created with
 * sizeof(UiMainCall) as element size, so cxListAdd is expected to store a
 * copy of it. The list is only touched while mc_buffer_mutex is held.
 */
void uic_add_buffered_mainthread_call(ui_threadfunc func, void *data) {
    UiMainCall entry = { .func = func, .data = data };

    pthread_mutex_lock(&mc_buffer_mutex);
    cxListAdd(mainthread_call_buffer, &entry);
    pthread_mutex_unlock(&mc_buffer_mutex);
}


/*
 * Switches buffering of main-thread calls on or off.
 *
 * enable_buffering: stored as the new value of the buffering flag. When it
 * is false, all calls collected so far are executed right away via
 * ui_exec_buffered_mainthread_calls.
 */
void ui_buffer_mainthread_calls(UiBool enable_buffering) {
    mainthread_call_buffered = enable_buffering;
    if(enable_buffering) {
        return;
    }
    // buffering was turned off: run everything that has been collected
    ui_exec_buffered_mainthread_calls();
}
void ui_exec_buffered_mainthread_calls(void) {
    pthread_mutex_lock(&mc_buffer_mutex);
    CxIterator i = cxListIterator(mainthread_call_buffer);
    cx_foreach(UiMainCall *, call, i) {
        if(call->func) {
            call->func(call->data);
        }
    }
    cxListClear(mainthread_call_buffer);
    pthread_mutex_unlock(&mc_buffer_mutex);
}



UiThreadpool* threadpool_new(int min, int max) {
    UiThreadpool *pool = malloc(sizeof(UiThreadpool));
    pool->queue = ui_queue_create();
    pool->num_idle = 0;
    pool->min_threads = min;
    pool->max_threads = max;

    return pool;
}

/*
 * Allocates the thread array and starts min_threads worker threads, each
 * running threadpool_func with the pool as argument.
 *
 * Returns 0 on success and 1 if the array could not be allocated or a thread
 * could not be created. On failure pool->nthreads is set to the number of
 * threads that were actually created, so threadpool_join only joins valid
 * thread ids.
 */
int threadpool_start(UiThreadpool *pool) {
    pool->nthreads = pool->min_threads;

    /* room for max_threads entries, but never fewer than we are about to fill */
    size_t capacity = 0;
    if(pool->max_threads > 0) {
        capacity = (size_t)pool->max_threads;
    }
    if(pool->nthreads > 0 && (size_t)pool->nthreads > capacity) {
        capacity = (size_t)pool->nthreads;
    }

    pool->threads = calloc(capacity, sizeof(pthread_t));
    if(!pool->threads && capacity > 0) {
        pool->nthreads = 0;
        fprintf(stderr, "uic: threadpool_start: calloc failed\n");
        return 1;
    }

    /* create pool threads */
    for(int i=0;i<pool->nthreads;i++) {
        /* pthread_create reports errors via its return value, not errno */
        int err = pthread_create(&pool->threads[i], NULL, threadpool_func, pool);
        if(err != 0) {
            fprintf(stderr, "uic: threadpool_start: pthread_create failed: %s\n", strerror(err));
            pool->nthreads = i; /* only threads[0..i-1] are valid */
            return 1;
        }
    }
    return 0;
}

/*
 * Joins the first pool->nthreads threads of the pool. A failing join does
 * not stop the loop; the remaining threads are still joined.
 *
 * Returns 0 if every pthread_join succeeded, 1 if at least one failed.
 */
int threadpool_join(UiThreadpool *pool) {
    int failed = 0;
    for(int t = 0; t < pool->nthreads; t++) {
        if(pthread_join(pool->threads[t], NULL) != 0) {
            failed = 1;
        }
    }
    return failed;
}

/*
 * Worker thread entry point. data is the UiThreadpool the thread belongs to.
 *
 * Repeatedly takes a job from the pool, calls its callback with the job data
 * and frees the job. The loop ends when the job taken is the kill_job
 * sentinel, which is neither executed nor freed. Always returns NULL.
 */
void* threadpool_func(void *data) {
    UiThreadpool *pool = data;

    threadpool_job *next;
    while((next = threadpool_get_job(pool)) != &kill_job) {
        next->callback(next->data);
        free(next);
    }
    return NULL;
}

/*
 * Takes the next job from the pool's queue, blocking in ui_queue_get_wait
 * until one is available.
 */
threadpool_job* threadpool_get_job(UiThreadpool *pool) {
    return ui_queue_get_wait(pool->queue);
}

/*
 * Queues a job for execution by one of the pool's worker threads.
 *
 * func: callback the worker calls
 * data: argument passed to func
 *
 * The job object is heap-allocated here and freed by threadpool_func after
 * the callback has returned.
 */
void threadpool_run(UiThreadpool *pool, job_callback_f func, void *data) {
    threadpool_job *queued = malloc(sizeof *queued);
    queued->data = data;
    queued->callback = func;
    ui_queue_put(pool->queue, queued);
}



/*
 * Creates a pool whose minimum and maximum thread count are both nthreads
 * and starts it. The pool is returned regardless of whether starting the
 * threads succeeded.
 */
UiThreadpool* ui_threadpool_create(int nthreads) {
    UiThreadpool *created = threadpool_new(nthreads, nthreads);
    (void)threadpool_start(created); // TODO: check return value
    return created;
}

/*
 * Currently a no-op: the body is empty, so the pool, its queue and its
 * threads are left untouched.
 * NOTE(review): presumably meant to stop the workers and release the pool -
 * confirm the intended teardown before relying on it.
 */
void ui_threadpool_destroy(UiThreadpool* pool) {
    
}

static int ui_threadpool_job_finish(void *data) {
    UiJob *job = data;
    UiEvent event;
    event.obj = job->obj;
    event.window = job->obj->window;
    event.document = job->obj->ctx->document;
    event.intval = 0;
    event.eventdata = NULL;
    event.eventdatatype = 0;
    job->finish_callback(&event, job->finish_data);
    free(job);
    return 0;
}

/*
 * Pool callback that executes a UiJob. data is the UiJob.
 *
 * Runs the job function with the job data. If it returns 0 and a finish
 * callback is set, the job is handed to ui_call_mainthread together with
 * ui_threadpool_job_finish, which frees it. In every other case the job is
 * freed here. Always returns NULL.
 */
static void* ui_threadpool_job_func(void *data) {
    UiJob *work = data;

    if (work->job_func(work->job_data) || !work->finish_callback) {
        // job function reported non-zero or nobody wants a notification
        free(work);
        return NULL;
    }

    ui_call_mainthread(ui_threadpool_job_finish, work);
    return NULL;
}

/*
 * Schedules a job on the pool.
 *
 * pool: pool whose workers run the job
 * obj:  object used to fill the event given to the finish callback
 * tf:   job function executed by a worker thread
 * td:   argument passed to tf
 * f:    finish callback; may be NULL, in which case no completion step runs
 * fd:   argument passed to f
 *
 * The UiJob allocated here is freed by ui_threadpool_job_func or
 * ui_threadpool_job_finish.
 */
void ui_threadpool_job(UiThreadpool* pool, UiObject* obj, ui_threadfunc tf, void* td, ui_callback f, void* fd) {
    UiJob *request = malloc(sizeof *request);

    request->obj = obj;

    request->job_func = tf;
    request->job_data = td;

    request->finish_callback = f;
    request->finish_data = fd;

    threadpool_run(pool, ui_threadpool_job_func, request);
}

/* --------------------------------- Queue --------------------------------- */

/*
 * Creates an empty queue: zero-initialized storage plus the two mutexes and
 * the condition variable of the queue.
 *
 * Returns the new queue, or NULL if the allocation or the initialization of
 * one of the synchronization objects failed. On failure everything that was
 * already set up is released again.
 */
UiQueue* ui_queue_create(void) {
    UiQueue *queue = calloc(1, sizeof *queue);
    if(!queue) {
        return NULL;
    }

    if(pthread_mutex_init(&queue->lock, NULL) != 0) {
        goto fail_alloc;
    }
    if(pthread_mutex_init(&queue->avlbl_lock, NULL) != 0) {
        goto fail_lock;
    }
    if(pthread_cond_init(&queue->available, NULL) != 0) {
        goto fail_avlbl_lock;
    }
    return queue;

fail_avlbl_lock:
    pthread_mutex_destroy(&queue->avlbl_lock);
fail_lock:
    pthread_mutex_destroy(&queue->lock);
fail_alloc:
    free(queue);
    return NULL;
}

/*
 * Destroys the queue's condition variable and mutexes and frees the queue
 * object itself.
 *
 * Elements still in the queue are not released: the queue has to be empty
 * when this is called. The list nodes could be freed here, but the payload
 * they carry could not.
 */
void ui_queue_free(UiQueue *queue) {
    pthread_cond_destroy(&queue->available);
    pthread_mutex_destroy(&queue->avlbl_lock);
    pthread_mutex_destroy(&queue->lock);
    free(queue);
}

/*
 * Appends data to the end of the queue and signals the condition variable so
 * that a thread blocked in ui_queue_get_wait can pick it up.
 *
 * The end of the list is found by walking it from the head, so the cost of a
 * put grows with the number of queued elements.
 */
void ui_queue_put(UiQueue *queue, void *data) {
    // the node is prepared before the lock is taken
    UiQueueElm *node = malloc(sizeof *node);
    node->data = data;
    node->next = NULL;

    pthread_mutex_lock(&queue->lock);

    // advance to the slot that holds the terminating NULL and link the node there
    UiQueueElm **slot = &queue->elements;
    while(*slot) {
        slot = &(*slot)->next;
    }
    *slot = node;
    queue->length++;

    // wake up a waiting consumer
    pthread_cond_signal(&queue->available);

    pthread_mutex_unlock(&queue->lock);
}

/*
 * Removes the first element of the queue and returns its payload, blocking
 * on the queue's condition variable while the queue is empty.
 *
 * An element whose payload is NULL is removed and dropped, and the function
 * keeps waiting; the return value is therefore never NULL.
 */
void* ui_queue_get_wait(UiQueue *queue) {
    void *payload = NULL;

    pthread_mutex_lock(&queue->lock);
    while(!payload) {
        // sleep until there is something to take
        while(queue->length == 0) {
            pthread_cond_wait(&queue->available, &queue->lock);
        }

        // unlink the head node and keep only its payload
        UiQueueElm *head = queue->elements;
        queue->elements = head->next;
        queue->length--;
        payload = head->data;
        free(head);
    }
    pthread_mutex_unlock(&queue->lock);

    return payload;
}

#endif

mercurial