#ifndef _WIN32
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "message.h"
/*
 * Sends msg through the given handler by dispatching to the handler's
 * own send function pointer.
 *
 * Returns whatever the handler's send implementation returns.
 */
int uic_message_send_(UiMessageHandler *handler, cxstring msg) {
    int result = handler->send(handler, msg);
    return result;
}
/*
 * Creates a simple message handler that reads from the file descriptor
 * `in` and writes to the file descriptor `out`.
 *
 * The handler is only initialized here; no threads are started.
 *
 * in        file descriptor incoming data is read from
 * out       file descriptor queued outgoing data is written to
 * callback  stored as the handler's message callback
 *
 * Returns the new handler, or NULL if memory allocation fails.
 */
UiMessageHandler* uic_simple_msg_handler(
        int in,
        int out,
        msg_received_callback callback)
{
    UiSimpleMessageHandler *handler = malloc(sizeof *handler);
    if(!handler) {
        return NULL;
    }

    /* outgoing messages are queued in this buffer until they are written */
    CxBuffer *outbuf = cxBufferCreate(
            NULL,
            4096,
            NULL,
            CX_BUFFER_FREE_CONTENTS | CX_BUFFER_AUTO_EXTEND);
    if(!outbuf) {
        free(handler);
        return NULL;
    }

    handler->handler.start = uic_simple_msg_handler_start;
    handler->handler.stop = uic_simple_msg_handler_stop;
    handler->handler.send = uic_simple_msg_handler_send;
    handler->handler.callback = callback;
    handler->in = in;
    handler->out = out;
    handler->outbuf = outbuf;
    handler->stop = 0;

    pthread_mutex_init(&handler->queue_lock, NULL);
    pthread_mutex_init(&handler->avlbl_lock, NULL);
    pthread_cond_init(&handler->available, NULL);

    return (UiMessageHandler*)handler;
}
/*
 * Starts the reader thread and then the writer thread of a simple
 * message handler.
 *
 * The writer thread is only created if the reader thread was created
 * successfully.
 *
 * Returns 0 if both threads were created, 1 otherwise.
 */
int uic_simple_msg_handler_start(UiMessageHandler *handler) {
    UiSimpleMessageHandler *simple = (UiSimpleMessageHandler*)handler;

    int err = pthread_create(
            &simple->in_thread,
            NULL,
            uic_simple_msg_handler_in_thread,
            simple);
    if(err == 0) {
        err = pthread_create(
                &simple->out_thread,
                NULL,
                uic_simple_msg_handler_out_thread,
                simple);
    }

    return err != 0 ? 1 : 0;
}
/*
 * Requests shutdown of a simple message handler and waits for both of
 * its threads to terminate.
 *
 * The stop flag is raised and the `available` condition is signalled
 * while holding queue_lock. The input file descriptor is then closed
 * and both threads are joined.
 *
 * Always returns 0.
 */
int uic_simple_msg_handler_stop(UiMessageHandler *handler) {
    UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;

    pthread_mutex_lock(&sh->queue_lock);
    /* the constructor initializes stop to 0; a stop request must set it */
    sh->stop = 1;
    pthread_cond_signal(&sh->available);
    pthread_mutex_unlock(&sh->queue_lock);

    close(sh->in);
    sh->in = -1;

    pthread_join(sh->in_thread, NULL);
    pthread_join(sh->out_thread, NULL);

    return 0;
}
/*
 * Queues a message for sending.
 *
 * The message is appended to the output buffer as a decimal length
 * followed by a newline, followed by the raw message bytes. The
 * `available` condition is signalled afterwards.
 *
 * Always returns 0.
 */
int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) {
    UiSimpleMessageHandler *simple = (UiSimpleMessageHandler*)handler;

    /* the length prefix only depends on msg, so build it before locking */
    char prefix[32];
    snprintf(prefix, sizeof(prefix), "%zu\n", msg.length);

    pthread_mutex_lock(&simple->queue_lock);
    cxBufferPutString(simple->outbuf, prefix);
    cxBufferWrite(msg.ptr, 1, msg.length, simple->outbuf);
    pthread_cond_signal(&simple->available);
    pthread_mutex_unlock(&simple->queue_lock);

    return 0;
}
#define HEADERBUF_SIZE 64
/*
 * Reader thread of a simple message handler.
 *
 * Reads from handler->in until read() returns 0 or an error and splits
 * the byte stream into messages. Each message consists of a header line
 * (decimal payload length terminated by '\n') followed by exactly that
 * many payload bytes. Headers and payloads may be split across multiple
 * read() calls. For every complete message, the handler's callback (if
 * set) is invoked with the payload.
 *
 * A header line that is not a non-negative decimal number is reported
 * on stderr and discarded. A header that does not fit into
 * HEADERBUF_SIZE terminates the process. If a payload buffer cannot be
 * allocated, the thread stops.
 *
 * data  pointer to the UiSimpleMessageHandler
 *
 * Always returns NULL.
 */
void* uic_simple_msg_handler_in_thread(void *data) {
    UiSimpleMessageHandler *handler = data;

    /* payload of the message currently being received (NULL: expect header) */
    char *msg = NULL;
    size_t msg_size = 0;
    size_t msg_pos = 0;

    /* header bytes collected so far */
    char headerbuf[HEADERBUF_SIZE];
    size_t headerpos = 0;

    char buf[2048];
    ssize_t r;
    while((r = read(handler->in, buf, sizeof(buf))) > 0) {
        char *buffer = buf;
        size_t available = (size_t)r;

        while(available > 0) {
            if(!msg) {
                /* never collect more than fits in headerbuf (incl. terminator)
                 * and never look past the bytes that were actually read */
                size_t header_max = HEADERBUF_SIZE - headerpos - 1;
                if(header_max > available) {
                    header_max = available;
                }

                /* n: number of bytes consumed, including the newline if found */
                size_t n = 0;
                int header_complete = 0;
                while(n < header_max) {
                    if(buffer[n++] == '\n') {
                        header_complete = 1;
                        break;
                    }
                }

                memcpy(headerbuf + headerpos, buffer, n);
                headerpos += n;
                buffer += n;
                available -= n;

                if(!header_complete) {
                    if(headerpos + 1 >= HEADERBUF_SIZE) {
                        fprintf(stderr, "Error: message header too big\n");
                        exit(-1);
                    }
                    /* header continues in the next chunk */
                    continue;
                }

                /* replace the newline with the string terminator */
                headerbuf[headerpos - 1] = 0;
                /* the next header always starts with an empty buffer,
                 * whether or not this one turns out to be valid */
                headerpos = 0;

                char *end;
                long length = strtol(headerbuf, &end, 10);
                if(end == headerbuf || *end != '\0' || length < 0) {
                    fprintf(stderr, "Error: invalid message {%s}\n", headerbuf);
                    continue;
                }

                /* malloc(0) may return NULL, so always request at least 1 byte */
                msg = malloc(length > 0 ? (size_t)length : 1);
                if(!msg) {
                    fprintf(stderr, "Error: cannot allocate message buffer\n");
                    goto done;
                }
                msg_size = (size_t)length;
                msg_pos = 0;
                /* fall through: payload bytes may follow in the same chunk,
                 * and a zero-length message is complete right away */
            }

            /* copy as much payload as this chunk provides, but not more
             * than the message still needs */
            size_t need = msg_size - msg_pos;
            size_t cplen = available > need ? need : available;
            memcpy(msg + msg_pos, buffer, cplen);
            buffer += cplen;
            available -= cplen;
            msg_pos += cplen;

            if(msg_pos == msg_size) {
                if(handler->handler.callback) {
                    handler->handler.callback(cx_strn(msg, msg_size));
                }
                free(msg);
                msg = NULL;
                msg_size = 0;
                msg_pos = 0;
            }
        }
    }

done:
    /* errno is only meaningful if read() actually failed */
    if(r < 0) {
        perror("error");
    }
    /* drop a partially received message */
    free(msg);
    fprintf(stderr, "stop simple_msg_handler_in_thread\n");

    return NULL;
}
void* uic_simple_msg_handler_out_thread(
void *data) {
UiSimpleMessageHandler *handler = data;
CxBuffer *buffer = handler->outbuf;
pthread_mutex_lock(&handler->queue_lock);
for(;;) {
if(buffer->pos ==
0) {
pthread_cond_wait(&handler->available, &handler->queue_lock);
continue;
}
else {
size_t n = buffer->pos;
size_t pos =
0;
while(n >
0) {
ssize_t w = write(handler->out, buffer->space + pos, n);
if(w <=
0) {
fprintf(stderr,
"Error: output error\n");
break;
}
n -= w;
pos += w;
}
if(n >
0) {
break;
}
buffer->pos =
0;
}
}
pthread_mutex_unlock(&handler->queue_lock);
return NULL;
}
#endif