1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #ifndef _WIN32
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <unistd.h>
34
35 #include "message.h"
36
37 int uic_message_send_(UiMessageHandler *handler, cxstring msg) {
38 return handler->send(handler, msg);
39 }
40
41 UiMessageHandler* uic_simple_msg_handler(
int in,
int out, msg_received_callback callback) {
42 UiSimpleMessageHandler *handler = malloc(
sizeof(UiSimpleMessageHandler));
43 handler->handler.start = uic_simple_msg_handler_start;
44 handler->handler.stop = uic_simple_msg_handler_stop;
45 handler->handler.send = uic_simple_msg_handler_send;
46 handler->handler.callback = callback;
47 handler->in = in;
48 handler->out = out;
49 handler->outbuf = cxBufferCreate(
NULL,
4096,
NULL,
CX_BUFFER_FREE_CONTENTS |
CX_BUFFER_AUTO_EXTEND);
50 handler->stop =
0;
51 pthread_mutex_init(&handler->queue_lock,
NULL);
52 pthread_mutex_init(&handler->avlbl_lock,
NULL);
53 pthread_cond_init(&handler->available,
NULL);
54 return (UiMessageHandler*)handler;
55 }
56
57 int uic_simple_msg_handler_start(UiMessageHandler *handler) {
58 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
59 if(pthread_create(&sh->in_thread,
NULL, uic_simple_msg_handler_in_thread, sh)) {
60 return 1;
61 }
62 if(pthread_create(&sh->out_thread,
NULL, uic_simple_msg_handler_out_thread, sh)) {
63 return 1;
64 }
65 return 0;
66 }
67
68 int uic_simple_msg_handler_stop(UiMessageHandler *handler) {
69 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
70 pthread_mutex_lock(&sh->queue_lock);
71 sh->stop =
0;
72 pthread_cond_signal(&sh->available);
73 pthread_mutex_unlock(&sh->queue_lock);
74 close(sh->in);
75 sh->in =
-1;
76
77 pthread_join(sh->in_thread,
NULL);
78 pthread_join(sh->out_thread,
NULL);
79
80 return 0;
81 }
82
83 int uic_simple_msg_handler_send(UiMessageHandler *handler, cxstring msg) {
84 UiSimpleMessageHandler *sh = (UiSimpleMessageHandler*)handler;
85 pthread_mutex_lock(&sh->queue_lock);
86 char header[
32];
87 snprintf(header,
32,
"%zu\n", msg.length);
88 cxBufferPutString(sh->outbuf, header);
89 cxBufferWrite(msg.ptr,
1, msg.length, sh->outbuf);
90 pthread_cond_signal(&sh->available);
91 pthread_mutex_unlock(&sh->queue_lock);
92 return 0;
93 }
94
95 #define HEADERBUF_SIZE 64
96
97 void* uic_simple_msg_handler_in_thread(
void *data) {
98 UiSimpleMessageHandler *handler = data;
99
100 char *msg =
NULL;
101 size_t msg_size =
0;
102 size_t msg_pos =
0;
103
104 char headerbuf[
HEADERBUF_SIZE];
105 size_t headerpos =
0;
106
107 char buf[
2048];
108 ssize_t r;
109 while((r = read(handler->in, buf,
2024)) >
0) {
110 char *buffer = buf;
111 size_t available = r;
112
113 while(available >
0) {
114 if(msg) {
115
116 size_t need = msg_size - msg_pos;
117 size_t cplen = r > need ? need : available;
118 memcpy(msg+msg_pos, buffer, cplen);
119 buffer += cplen;
120 available -= cplen;
121 msg_pos += cplen;
122 if(msg_pos == msg_size) {
123
124
125 if(handler->handler.callback) {
126 handler->handler.callback(cx_strn(msg, msg_size));
127 }
128 free(msg);
129 msg =
NULL;
130 msg_size =
0;
131 msg_pos =
0;
132 }
133 }
else {
134 size_t header_max =
HEADERBUF_SIZE - headerpos -
1;
135 if(header_max > available) {
136 header_max = available;
137 }
138
139 int i;
140 int header_complete =
0;
141 for(i=
0;i<header_max;i++) {
142 if(buffer[i] ==
'\n') {
143 header_complete =
1;
144 break;
145 }
146 }
147 i++;
148 memcpy(headerbuf+headerpos, buffer, i);
149 headerpos += i;
150 buffer += i;
151 available -= i;
152
153 if(header_complete) {
154 headerbuf[headerpos
-1] =
0;
155 char *end;
156 long length = strtol(headerbuf, &end,
10);
157 if(*end ==
'\0') {
158
159 msg = malloc(length);
160 msg_size = length;
161 headerpos =
0;
162 }
else {
163 fprintf(stderr,
"Error: invalid message {%s}\n", headerbuf);
164 }
165 }
else if(headerpos
+1 >=
HEADERBUF_SIZE) {
166 fprintf(stderr,
"Error: message header too big\n");
167 exit(
-1);
168 }
169 }
170 }
171
172
173 }
174 perror(
"error");
175 fprintf(stderr,
"stop simple_msg_handler_in_thread\n");
176
177 return NULL;
178 }
179
180 void* uic_simple_msg_handler_out_thread(
void *data) {
181 UiSimpleMessageHandler *handler = data;
182 CxBuffer *buffer = handler->outbuf;
183
184 pthread_mutex_lock(&handler->queue_lock);
185
186 for(;;) {
187 if(buffer->pos ==
0) {
188 pthread_cond_wait(&handler->available, &handler->queue_lock);
189 continue;
190 }
else {
191 size_t n = buffer->pos;
192 size_t pos =
0;
193 while(n >
0) {
194 ssize_t w = write(handler->out, buffer->space + pos, n);
195 if(w <=
0) {
196 fprintf(stderr,
"Error: output error\n");
197 break;
198 }
199 n -= w;
200 pos += w;
201 }
202 if(n >
0) {
203 break;
204 }
205 buffer->pos =
0;
206 }
207 }
208
209 pthread_mutex_unlock(&handler->queue_lock);
210
211 return NULL;
212 }
213
214 #endif
215