Mon, 08 Dec 2025 18:11:54 +0100
add server base structs
/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * * Copyright 2025 Olaf Wintermann. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #ifndef _WIN32 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include "message.h" UiMessageHandler* uic_simple_msg_handler(int in, int out, msg_received_callback callback) { UiSimpleMessageHandler *handler = malloc(sizeof(UiSimpleMessageHandler)); handler->handler.start = uic_simple_msg_handler_start; handler->handler.stop = uic_simple_msg_handler_stop; handler->handler.send = uic_simple_msg_handler_send; handler->handler.callback = callback; handler->in = in; handler->out = out; handler->outbuf = cxBufferCreate(NULL, 4096, NULL, CX_BUFFER_FREE_CONTENTS | CX_BUFFER_AUTO_EXTEND); handler->stop = 0; pthread_mutex_init(&handler->queue_lock, NULL); pthread_mutex_init(&handler->avlbl_lock, NULL); pthread_cond_init(&handler->available, NULL); return (UiMessageHandler*)handler; } int uic_simple_msg_handler_start(UiMessageHandler *handler) { UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; if(pthread_create(&sh->in_thread, NULL, uic_simple_msg_handler_in_thread, sh)) { return 1; } if(pthread_create(&sh->out_thread, NULL, uic_simple_msg_handler_out_thread, sh)) { return 1; } return 0; } int uic_simple_msg_handler_stop(UiMessageHandler *handler) { UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; pthread_mutex_lock(&sh->queue_lock); sh->stop = 0; pthread_cond_signal(&sh->available); pthread_mutex_unlock(&sh->queue_lock); close(sh->in); sh->in = -1; pthread_join(sh->in_thread, NULL); pthread_join(sh->out_thread, NULL); return 0; } int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) { UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; pthread_mutex_lock(&sh->queue_lock); cxBufferWrite(msg.ptr, 1, msg.length, sh->outbuf); pthread_cond_signal(&sh->available); pthread_mutex_unlock(&sh->queue_lock); return 0; } #define HEADERBUF_SIZE 64 void* uic_simple_msg_handler_in_thread(void *data) { UiSimpleMessageHandler *handler = data; char *msg = NULL; size_t msg_size = 0; size_t msg_pos = 0; // currently received message length char headerbuf[HEADERBUF_SIZE]; size_t headerpos = 0; char buf[2048]; ssize_t r; while((r = read(handler->in, buf, 2024)) > 0) { char *buffer = buf; size_t available = r; while(available > 0) { if(msg) { // read message size_t need = msg_size - msg_pos; size_t cplen = r > need ? need : available; memcpy(msg+msg_pos, buffer, cplen); buffer += cplen; available -= cplen; msg_pos += cplen; if(msg_pos == msg_size) { // message complete //fprintf(stderr, "send: %.*s\n", (int)msg_size, msg); if(handler->handler.callback) { handler->handler.callback(cx_strn(msg, msg_size)); } free(msg); msg = NULL; msg_size = 0; msg_pos = 0; } } else { size_t header_max = HEADERBUF_SIZE - headerpos - 1; if(header_max > available) { header_max = available; } // search for line break int i; int header_complete = 0; for(i=0;i<header_max;i++) { if(buffer[i] == '\n') { header_complete = 1; break; } } i++; memcpy(headerbuf+headerpos, buffer, i); headerpos += i; buffer += i; available -= i; if(header_complete) { headerbuf[headerpos-1] = 0; // terminate buffer char *end; long length = strtol(headerbuf, &end, 10); if(*end == '\0') { //fprintf(stderr, "header: %d\n", (int)length); msg = malloc(length); msg_size = length; headerpos = 0; } else { fprintf(stderr, "Error: invalid message {%s}\n", headerbuf); } } else if(headerpos+1 >= HEADERBUF_SIZE) { fprintf(stderr, "Error: message header too big\n"); exit(-1); } } } } perror("error"); fprintf(stderr, "stop simple_msg_handler_in_thread\n"); return NULL; } void* uic_simple_msg_handler_out_thread(void *data) { UiSimpleMessageHandler *handler = data; CxBuffer *buffer = handler->outbuf; pthread_mutex_lock(&handler->queue_lock); for(;;) { if(buffer->pos == 0) { pthread_cond_wait(&handler->available, &handler->queue_lock); continue; } else { size_t n = buffer->pos; size_t pos = 0; while(n > 0) { ssize_t w = write(handler->out, buffer->space + pos, n); if(w <= 0) { fprintf(stderr, "Error: output error\n"); break; } n -= w; pos += w; } if(n > 0) { break; // error } buffer->pos = 0; } } pthread_mutex_unlock(&handler->queue_lock); return NULL; } #endif