ui/common/message.c

Mon, 08 Dec 2025 18:11:54 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Mon, 08 Dec 2025 18:11:54 +0100
changeset 981
1d47e71f26b6
parent 953
c98404829cd3
child 982
9102a53c5385
permissions
-rw-r--r--

add server base structs

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2025 Olaf Wintermann. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _WIN32

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include "message.h"

/*
 * Creates a simple message handler that reads from the file descriptor
 * 'in' and writes to the file descriptor 'out'.
 *
 * The handler is only initialized here; no threads are started. The
 * 'callback' is stored in the handler and may be NULL.
 *
 * Returns the new handler, or NULL if the handler struct or its output
 * buffer could not be allocated.
 */
UiMessageHandler* uic_simple_msg_handler(int in, int out, msg_received_callback callback) {
    UiSimpleMessageHandler *handler = malloc(sizeof(UiSimpleMessageHandler));
    if(!handler) {
        return NULL;
    }
    handler->handler.start = uic_simple_msg_handler_start;
    handler->handler.stop = uic_simple_msg_handler_stop;
    handler->handler.send = uic_simple_msg_handler_send;
    handler->handler.callback = callback;
    handler->in = in;
    handler->out = out;
    // outgoing data is collected in an auto-extending buffer (initial capacity 4096)
    handler->outbuf = cxBufferCreate(NULL, 4096, NULL, CX_BUFFER_FREE_CONTENTS | CX_BUFFER_AUTO_EXTEND);
    if(!handler->outbuf) {
        // without an output buffer the handler is unusable
        free(handler);
        return NULL;
    }
    handler->stop = 0;
    pthread_mutex_init(&handler->queue_lock, NULL);
    pthread_mutex_init(&handler->avlbl_lock, NULL);
    pthread_cond_init(&handler->available, NULL);
    return (UiMessageHandler*)handler;
}

/*
 * Starts the input thread and then the output thread of a simple
 * message handler.
 *
 * Returns 0 if both threads were created and 1 if either pthread_create
 * call failed. The output thread is not created when the input thread
 * could not be created.
 */
int uic_simple_msg_handler_start(UiMessageHandler *handler) {
    UiSimpleMessageHandler *simple = (UiSimpleMessageHandler*)handler;

    int err = pthread_create(
            &simple->in_thread,
            NULL,
            uic_simple_msg_handler_in_thread,
            simple);
    if(err == 0) {
        err = pthread_create(
                &simple->out_thread,
                NULL,
                uic_simple_msg_handler_out_thread,
                simple);
    }

    // collapse any pthread error code to the 0/1 result
    return err != 0 ? 1 : 0;
}

/*
 * Requests shutdown of a simple message handler and waits for both of
 * its threads.
 *
 * The stop flag is raised and the 'available' condition is signaled
 * while holding queue_lock. The input file descriptor is then closed
 * and set to -1, and both threads are joined.
 *
 * Always returns 0.
 */
int uic_simple_msg_handler_stop(UiMessageHandler *handler) {
    UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
    pthread_mutex_lock(&sh->queue_lock);
    // the flag is initialized to 0, so stopping must set it to non-zero
    sh->stop = 1;
    pthread_cond_signal(&sh->available);
    pthread_mutex_unlock(&sh->queue_lock);
    close(sh->in);
    sh->in = -1;
    
    pthread_join(sh->in_thread, NULL);
    pthread_join(sh->out_thread, NULL);
    
    return 0;
}

/*
 * Queues 'msg' for sending by appending its bytes to the output buffer
 * and signaling the 'available' condition. The buffer is modified while
 * holding queue_lock.
 *
 * Returns 0 if the whole message was appended. Returns 1 if the buffer
 * accepted fewer bytes than requested; in that case the partial data is
 * discarded again and nothing is signaled.
 */
int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) {
    UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
    int ret = 0;
    pthread_mutex_lock(&sh->queue_lock);
    size_t start = sh->outbuf->pos;
    size_t written = cxBufferWrite(msg.ptr, 1, msg.length, sh->outbuf);
    if(written == msg.length) {
        pthread_cond_signal(&sh->available);
    } else {
        // never leave a truncated message in the queue
        sh->outbuf->pos = start;
        ret = 1;
    }
    pthread_mutex_unlock(&sh->queue_lock);
    return ret;
}

// size of the header line buffer, including the terminating 0 byte
#define HEADERBUF_SIZE 64

/*
 * Input thread of the simple message handler.
 *
 * Reads from handler->in until read() returns 0 or an error. The stream
 * is parsed as a sequence of messages, each consisting of a header line
 * (decimal payload length, terminated by '\n') followed by exactly that
 * many payload bytes. Every complete payload is passed to the handler
 * callback, if one is set, and freed afterwards.
 *
 * A header line that is not a non-negative decimal number is reported
 * on stderr and skipped. A header that does not fit into HEADERBUF_SIZE
 * terminates the process with exit(-1). If the payload buffer cannot be
 * allocated, the thread stops reading.
 *
 * 'data' is the UiSimpleMessageHandler. Always returns NULL.
 */
void* uic_simple_msg_handler_in_thread(void *data) {
    UiSimpleMessageHandler *handler = data;
    
    char *msg = NULL;      // payload buffer, non-NULL while a payload is being received
    size_t msg_size = 0;   // payload length announced by the header
    size_t msg_pos = 0;    // currently received message length
    
    char headerbuf[HEADERBUF_SIZE];
    size_t headerpos = 0;  // number of header bytes collected so far
    
    char buf[2048];
    ssize_t r = 0;
    int failed = 0;
    while(!failed && (r = read(handler->in, buf, sizeof(buf))) > 0) {
        char *buffer = buf;
        size_t available = (size_t)r;
        
        while(available > 0) {
            if(msg) {
                // read message: copy no more than what is left in this chunk
                size_t need = msg_size - msg_pos;
                size_t cplen = available > need ? need : available;
                memcpy(msg+msg_pos, buffer, cplen);
                buffer += cplen;
                available -= cplen;
                msg_pos += cplen;
                if(msg_pos == msg_size) {
                    // message complete
                    if(handler->handler.callback) {
                        handler->handler.callback(cx_strn(msg, msg_size));
                    }
                    free(msg);
                    msg = NULL;
                    msg_size = 0;
                    msg_pos = 0;
                }
                continue;
            }
            
            // read header: keep one byte of headerbuf for the terminator
            size_t header_max = HEADERBUF_SIZE - headerpos - 1;
            if(header_max > available) {
                header_max = available;
            }
            // search for line break; len counts the consumed bytes,
            // including the line break if one was found
            size_t len = 0;
            int header_complete = 0;
            while(len < header_max) {
                if(buffer[len++] == '\n') {
                    header_complete = 1;
                    break;
                }
            }
            memcpy(headerbuf+headerpos, buffer, len);
            headerpos += len;
            buffer += len;
            available -= len;
            
            if(!header_complete) {
                if(headerpos+1 >= HEADERBUF_SIZE) {
                    fprintf(stderr, "Error: message header too big\n");
                    exit(-1);
                }
                // header continues in the next chunk
                continue;
            }
            
            headerbuf[headerpos-1] = 0; // replace the line break with the terminator
            headerpos = 0;              // the next header starts from scratch
            
            char *end;
            long length = strtol(headerbuf, &end, 10);
            if(end == headerbuf || *end != '\0' || length < 0) {
                fprintf(stderr, "Error: invalid message {%s}\n", headerbuf);
                continue;
            }
            if(length == 0) {
                // empty payload: nothing to receive, deliver immediately
                if(handler->handler.callback) {
                    handler->handler.callback(cx_strn("", 0));
                }
                continue;
            }
            
            msg = malloc((size_t)length);
            if(!msg) {
                // without a buffer the payload cannot be consumed and the
                // stream position would be lost, therefore stop reading
                fprintf(stderr, "Error: cannot allocate message buffer\n");
                failed = 1;
                break;
            }
            msg_size = (size_t)length;
            msg_pos = 0;
        }
    }
    if(r < 0) {
        // errno is only meaningful when read() failed
        perror("error");
    }
    free(msg); // incomplete message, if any
    fprintf(stderr, "stop simple_msg_handler_in_thread\n");
    
    return NULL;
}

void* uic_simple_msg_handler_out_thread(void *data) {
    UiSimpleMessageHandler *handler = data;
    CxBuffer *buffer = handler->outbuf;
    
    pthread_mutex_lock(&handler->queue_lock);
    
    for(;;) {
        if(buffer->pos == 0) {
            pthread_cond_wait(&handler->available, &handler->queue_lock);
            continue;
        } else {
            size_t n = buffer->pos;
            size_t pos = 0;
            while(n > 0) {
                ssize_t w = write(handler->out, buffer->space + pos, n);
                if(w <= 0) {
                    fprintf(stderr, "Error: output error\n");
                    break;
                }
                n -= w;
                pos += w;
            }
            if(n > 0) {
                break; // error
            }
            buffer->pos = 0;
        }
    }
    
    pthread_mutex_unlock(&handler->queue_lock);
    
    return NULL;
}

#endif

mercurial