Thu, 05 Mar 2026 21:06:41 +0100
implement httpclient websocket IO
| src/server/proxy/httpclient.c | file | annotate | diff | comparison | revisions | |
| src/server/proxy/httpclient.h | file | annotate | diff | comparison | revisions |
--- a/src/server/proxy/httpclient.c Wed Mar 04 22:05:07 2026 +0100 +++ b/src/server/proxy/httpclient.c Thu Mar 05 21:06:41 2026 +0100 @@ -39,7 +39,9 @@ static int client_connected(EventHandler *ev, Event *event); static int client_io(EventHandler *ev, Event *event); +static int client_ws_io(EventHandler *ev, Event *event); static int client_process(HttpClient *client, Event *event); +static int client_ws_process(HttpClient *client, Event *event); static int client_finished(EventHandler *ev, Event *event); static int client_send_buf(HttpClient *client); @@ -318,6 +320,11 @@ return client_process(client, event); } +static int client_ws_io(EventHandler *ev, Event *event) { + HttpClient *client = event->cookie; + return client_ws_process(client, event); +} + static int client_process(HttpClient *client, Event *event) { client->last_event = event; if(client->stage < 0) { @@ -364,8 +371,7 @@ } int ret = 0; if(client->stage == 2) { - // websocket: write message buffer - ret = client_send_buf(client); + return client_ws_process(client, event); } if(client_read_response_body(client)) { ret = 1; @@ -378,6 +384,59 @@ return 0; } +static int client_ws_process(HttpClient *client, Event *event) { + // send available data from the transfer buffer + int ret = client_send_buf(client); + if(client->error) { + return 0; + } + // readiness notification + if(ret == 0 && client->ws_msg_ready) { + if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { + return 0; + } + } + + // read message + char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; + size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; + ssize_t r = -1; + while(available > 0) { + ssize_t r = net_read(client->stream, buf, available); + if(r <= 0) { + break; + } + client->transfer2_buffer_len += r; + if(client->ws_write) { + char *out = client->transfer2_buffer + client->transfer2_buffer_pos; + size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; + while(nbytes > 0) { + ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata); + if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { + break; + } else if(w <= 0) { + client->error = 1; + return 0; + } + client->transfer2_buffer_pos += w; + + // adjust buffer + out = client->transfer2_buffer + client->transfer2_buffer_pos; + nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; + } + } else { + // noop + client->transfer2_buffer_pos = client->transfer2_buffer_len; + } + + // adjust buffer + buf = client->transfer2_buffer + client->transfer2_buffer_pos; + available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; + } + + return r == 0 || client->error ? 0 : 1; +} + static int client_finished(EventHandler *ev, Event *event) { HttpClient *client = event->cookie; @@ -600,11 +659,14 @@ if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) { client->stage = 2; client->event.events = EVENT_POLLIN|EVENT_POLLOUT; + client->event.fn = client_ws_io; // prepare IO buffers for websockets + // transfer_buffer is used for outgoing traffic client->transfer_buffer_len = 0; client->transfer_buffer_pos = 0; + // transfer2_buffer is used for reading client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE); if(!client->transfer2_buffer) { client->error = 1;
--- a/src/server/proxy/httpclient.h Wed Mar 04 22:05:07 2026 +0100 +++ b/src/server/proxy/httpclient.h Thu Mar 05 21:06:41 2026 +0100 @@ -126,6 +126,8 @@ * * This function is called, when client->socketfd is ready to accept * new messages (http_client_add_message) + * + * If the callback returns 1, the websocket connection is terminated. */ int (*ws_msg_ready)(HttpClient *, void *); void *ws_msg_ready_userdata;