UNIXworkcode

1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 3 * 4 * Copyright 2025 Olaf Wintermann. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #ifndef _WIN32 30 31 #include <stdio.h> 32 #include <stdlib.h> 33 #include <unistd.h> 34 35 #include "message.h" 36 37 int uic_message_send_(UiMessageHandler *handler, cxstring msg) { 38 return handler->send(handler, msg); 39 } 40 41 UiMessageHandler* uic_simple_msg_handler(int in, int out, msg_received_callback callback) { 42 UiSimpleMessageHandler *handler = malloc(sizeof(UiSimpleMessageHandler)); 43 handler->handler.start = uic_simple_msg_handler_start; 44 handler->handler.stop = uic_simple_msg_handler_stop; 45 handler->handler.send = uic_simple_msg_handler_send; 46 handler->handler.callback = callback; 47 handler->in = in; 48 handler->out = out; 49 handler->outbuf = cxBufferCreate(NULL, 4096, NULL, CX_BUFFER_FREE_CONTENTS | CX_BUFFER_AUTO_EXTEND); 50 handler->stop = 0; 51 pthread_mutex_init(&handler->queue_lock, NULL); 52 pthread_mutex_init(&handler->avlbl_lock, NULL); 53 pthread_cond_init(&handler->available, NULL); 54 return (UiMessageHandler*)handler; 55 } 56 57 int uic_simple_msg_handler_start(UiMessageHandler *handler) { 58 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; 59 if(pthread_create(&sh->in_thread, NULL, uic_simple_msg_handler_in_thread, sh)) { 60 return 1; 61 } 62 if(pthread_create(&sh->out_thread, NULL, uic_simple_msg_handler_out_thread, sh)) { 63 return 1; 64 } 65 return 0; 66 } 67 68 int uic_simple_msg_handler_stop(UiMessageHandler *handler) { 69 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; 70 pthread_mutex_lock(&sh->queue_lock); 71 sh->stop = 0; 72 pthread_cond_signal(&sh->available); 73 pthread_mutex_unlock(&sh->queue_lock); 74 close(sh->in); 75 sh->in = -1; 76 77 pthread_join(sh->in_thread, NULL); 78 pthread_join(sh->out_thread, NULL); 79 80 return 0; 81 } 82 83 int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) { 84 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler; 85 pthread_mutex_lock(&sh->queue_lock); 86 char header[32]; 87 snprintf(header, 32, "%zu\n", msg.length); 88 cxBufferPutString(sh->outbuf, header); 89 cxBufferWrite(msg.ptr, 1, msg.length, sh->outbuf); 90 pthread_cond_signal(&sh->available); 91 pthread_mutex_unlock(&sh->queue_lock); 92 return 0; 93 } 94 95 #define HEADERBUF_SIZE 64 96 97 void* uic_simple_msg_handler_in_thread(void *data) { 98 UiSimpleMessageHandler *handler = data; 99 100 char *msg = NULL; 101 size_t msg_size = 0; 102 size_t msg_pos = 0; // currently received message length 103 104 char headerbuf[HEADERBUF_SIZE]; 105 size_t headerpos = 0; 106 107 char buf[2048]; 108 ssize_t r; 109 while((r = read(handler->in, buf, 2024)) > 0) { 110 char *buffer = buf; 111 size_t available = r; 112 113 while(available > 0) { 114 if(msg) { 115 // read message 116 size_t need = msg_size - msg_pos; 117 size_t cplen = r > need ? need : available; 118 memcpy(msg+msg_pos, buffer, cplen); 119 buffer += cplen; 120 available -= cplen; 121 msg_pos += cplen; 122 if(msg_pos == msg_size) { 123 // message complete 124 //fprintf(stderr, "send: %.*s\n", (int)msg_size, msg); 125 if(handler->handler.callback) { 126 handler->handler.callback(cx_strn(msg, msg_size)); 127 } 128 free(msg); 129 msg = NULL; 130 msg_size = 0; 131 msg_pos = 0; 132 } 133 } else { 134 size_t header_max = HEADERBUF_SIZE - headerpos - 1; 135 if(header_max > available) { 136 header_max = available; 137 } 138 // search for line break 139 int i; 140 int header_complete = 0; 141 for(i=0;i<header_max;i++) { 142 if(buffer[i] == '\n') { 143 header_complete = 1; 144 break; 145 } 146 } 147 i++; 148 memcpy(headerbuf+headerpos, buffer, i); 149 headerpos += i; 150 buffer += i; 151 available -= i; 152 153 if(header_complete) { 154 headerbuf[headerpos-1] = 0; // terminate buffer 155 char *end; 156 long length = strtol(headerbuf, &end, 10); 157 if(*end == '\0') { 158 //fprintf(stderr, "header: %d\n", (int)length); 159 msg = malloc(length); 160 msg_size = length; 161 headerpos = 0; 162 } else { 163 fprintf(stderr, "Error: invalid message {%s}\n", headerbuf); 164 } 165 } else if(headerpos+1 >= HEADERBUF_SIZE) { 166 fprintf(stderr, "Error: message header too big\n"); 167 exit(-1); 168 } 169 } 170 } 171 172 173 } 174 perror("error"); 175 fprintf(stderr, "stop simple_msg_handler_in_thread\n"); 176 177 return NULL; 178 } 179 180 void* uic_simple_msg_handler_out_thread(void *data) { 181 UiSimpleMessageHandler *handler = data; 182 CxBuffer *buffer = handler->outbuf; 183 184 pthread_mutex_lock(&handler->queue_lock); 185 186 for(;;) { 187 if(buffer->pos == 0) { 188 pthread_cond_wait(&handler->available, &handler->queue_lock); 189 continue; 190 } else { 191 size_t n = buffer->pos; 192 size_t pos = 0; 193 while(n > 0) { 194 ssize_t w = write(handler->out, buffer->space + pos, n); 195 if(w <= 0) { 196 fprintf(stderr, "Error: output error\n"); 197 break; 198 } 199 n -= w; 200 pos += w; 201 } 202 if(n > 0) { 203 break; // error 204 } 205 buffer->pos = 0; 206 } 207 } 208 209 pthread_mutex_unlock(&handler->queue_lock); 210 211 return NULL; 212 } 213 214 #endif 215