# HG changeset patch # User Olaf Wintermann # Date 1771755527 -3600 # Node ID fea7c3d74cc6c6e9db5656fa4e44caf9d60efbf3 # Parent 3ddfd45d4e4738297171bd6a04dee2422dc083c4 prepare httpclient for websockets diff -r 3ddfd45d4e47 -r fea7c3d74cc6 src/server/proxy/httpclient.c --- a/src/server/proxy/httpclient.c Sun Feb 22 10:05:37 2026 +0100 +++ b/src/server/proxy/httpclient.c Sun Feb 22 11:18:47 2026 +0100 @@ -41,7 +41,7 @@ static int client_io(EventHandler *ev, Event *event); static int client_finished(EventHandler *ev, Event *event); -static int client_send_request(HttpClient *client); +static int client_send_buf(HttpClient *client); static int client_send_request_body(HttpClient *client); static int client_read_response_header(HttpClient *client); static int client_read_response_body(HttpClient *client); @@ -89,6 +89,8 @@ client->stream->st.free(&client->stream->st); } free(client->buffer.inbuf); + free(client->transfer_buffer); + free(client->transfer2_buffer); free(client->addr); free(client->method); free(client->uri); @@ -217,6 +219,26 @@ return ret; } +int http_client_process(HttpClient *client) { + return client_io(client->ev, &client->event); +} + +size_t http_client_message_buf_size_available(HttpClient *client) { + return client->transfer_buffer_alloc - client->transfer_buffer_len; +} + +int http_client_add_message(HttpClient *client, const void *buf, size_t size) { + size_t available = http_client_message_buf_size_available(client); + if(available == 0) { + return HTTP_CLIENT_CALLBACK_WOULD_BLOCK; + } + if(size > available) { + size = available; + } + memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size); + return size; +} + static int create_req_buffer(HttpClient *client) { CxBuffer buf; if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) { @@ -264,7 +286,7 @@ HttpClient *client = event->cookie; if(client->stage == 0) { if(client->transfer_buffer_pos < client->transfer_buffer_len) { - if(client_send_request(client)) { + if(client_send_buf(client)) { return client->error == 0; } } @@ -275,9 +297,6 @@ return client->error == 0; } } - - client->transfer_buffer_pos = 0; - client->transfer_buffer_len = 0; } // writing complete, switch to read events @@ -287,7 +306,16 @@ if(client_read_response_header(client)) { return client->error == 0; } + int ret = 0; + if(client->stage == 2) { + // websocket: write message buffer + ret = client_send_buf(client); + } if(client_read_response_body(client)) { + ret = 1; + } + + if(ret) { return client->error == 0; } @@ -308,7 +336,8 @@ return 0; } -static int client_send_request(HttpClient *client) { +// sends the content of the transfer buffer to client->socketfd +static int client_send_buf(HttpClient *client) { size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; ssize_t w; while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) { @@ -387,7 +416,7 @@ client->req_contentlength_pos += r; client->transfer_buffer_pos = startpos; client->transfer_buffer_len = rbody_buf_offset + r; - if(client_send_request(client)) { + if(client_send_buf(client)) { return 1; } } @@ -398,7 +427,7 @@ client->transfer_buffer_pos = 0; client->transfer_buffer_len = 5; client->request_body_terminated = 1; - if(client_send_request(client)) { + if(client_send_buf(client)) { return 1; } @@ -479,19 +508,29 @@ HeaderArray *headers = client->parser->headers; long long contentlength = 0; int chunkedtransferenc = 0; + cxmutstr hdr_connection = CX_NULLSTR; + cxmutstr hdr_upgrade = CX_NULLSTR; while(headers) { for(int i=0;ilen;i++) { - if(!cx_strcasecmp(headers->headers[i].name, "content-length")) { - if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) { + cxmutstr header = headers->headers[i].name; + cxmutstr hvalue = headers->headers[i].value; + if(!cx_strcasecmp(header, "content-length")) { + if(!cx_strtoll(hvalue, &contentlength, 10)) { headers = NULL; break; } - } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) { - if(!cx_strcmp(headers->headers[i].value, "chunked")) { + } else if(!cx_strcasecmp(header, "transfer-encoding")) { + if(!cx_strcmp(hvalue, "chunked")) { chunkedtransferenc = 1; headers = NULL; break; } + } else if(!cx_strcasecmp(header, "connection")) { + hdr_connection = hvalue; + } + + if(client->statuscode == 101 && !cx_strcasecmp(header, "upgrade")) { + hdr_upgrade = hvalue; } } @@ -500,7 +539,30 @@ } } - if(contentlength > 0 || chunkedtransferenc) { + if(client->statuscode == 101) { + if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) { + client->stage = 2; + client->event.events = EVENT_POLLIN|EVENT_POLLOUT; + + // prepare IO buffers for websockets + client->transfer_buffer_len = 0; + client->transfer_buffer_pos = 0; + + client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE); + if(!client->transfer2_buffer) { + client->error = 1; + return 1; + } + client->transfer2_buffer_alloc = HTTP_CLIENT_BUFFER_SIZE; + client->transfer2_buffer_len = 0; + client->transfer2_buffer_pos = 0; + } else { + // error: unknown protocol + log_ereport(LOG_FAILURE, "http-client: unknown protocol upgrade: %.*s", (int)hdr_upgrade.length, hdr_upgrade.ptr); + client->error = 1; + return 1; + } + } else if(contentlength > 0 || chunkedtransferenc) { IOStream *fd = Sysstream_new(NULL, client->socketfd); if(!fd) { client->error = 1; @@ -510,6 +572,15 @@ if(!http) { fd->free(fd); } + + // we can reuse the already allocated transfer_bufer for transfer2 + client->transfer2_buffer = client->transfer_buffer; + client->transfer2_buffer_alloc = client->transfer_buffer_alloc; + client->transfer2_buffer_len = 0; + client->transfer_buffer_pos = 0; + client->transfer_buffer = NULL; + client->transfer_buffer_alloc = 0; + if(contentlength > 0) { http->max_read = contentlength; httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); @@ -527,16 +598,16 @@ // returns 0 success // 1 would block or error static int client_write_response(HttpClient *client) { - while(client->transfer_buffer_pos < client->transfer_buffer_len) { - char *buf = client->transfer_buffer + client->transfer_buffer_pos; - size_t len = client->transfer_buffer_len - client->transfer_buffer_pos; + while(client->transfer2_buffer_pos < client->transfer2_buffer_len) { + char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; + size_t len = client->transfer2_buffer_len - client->transfer2_buffer_pos; int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata); if(ret > 0) { - client->transfer_buffer_pos += ret; + client->transfer2_buffer_pos += ret; } else if(ret == 0) { // EOF? // check if the write is incomplete, which would be an error - client->error == client->transfer_buffer_pos < client->transfer_buffer_len; + client->error == client->transfer2_buffer_pos < client->transfer2_buffer_len; return client->error; } else { if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { @@ -560,13 +631,13 @@ return 1; } - char *buf = client->transfer_buffer; - size_t nbytes = client->transfer_buffer_alloc; + char *buf = client->transfer2_buffer; + size_t nbytes = client->transfer2_buffer_alloc; ssize_t r; while((r = net_read(&client->stream->st, buf, nbytes)) > 0) { - client->transfer_buffer_len = r; - client->transfer_buffer_pos = 0; + client->transfer2_buffer_len = r; + client->transfer2_buffer_pos = 0; if(client->response_body_write) { if(client_write_response(client)) { return 1; @@ -613,7 +684,7 @@ // test client_send_request - int ret = client_send_request(client); + int ret = client_send_buf(client); // It is very likely that the first client_send_request call doesn't // fully write the request buffer to the socket // In that case it returns 1 but without the error flag @@ -630,7 +701,7 @@ ssize_t r = read(sock, tmpbuf, 1024); CX_TEST_ASSERT(r >= 0); cxBufferWrite(tmpbuf, 1, r, &buf); - ret = client_send_request(client); + ret = client_send_buf(client); CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error)); writes++; diff -r 3ddfd45d4e47 -r fea7c3d74cc6 src/server/proxy/httpclient.h --- a/src/server/proxy/httpclient.h Sun Feb 22 10:05:37 2026 +0100 +++ b/src/server/proxy/httpclient.h Sun Feb 22 11:18:47 2026 +0100 @@ -110,6 +110,26 @@ void *response_body_write_userdata; /* + * Websocket write callback function + * + * ssize_t ws_write(HttpClient *client, void *buf, size_t size, void *userdata) + * + * Return: number of processed bytes, + * HTTP_CLIENT_CALLBACK_WOULD_BLOCK or HTTP_CLIENT_CALLBACK_ERROR. + */ + ssize_t (*ws_write)(HttpClient *, void *, size_t, void *); + void *ws_write_userdata; + + /* + * Websocket message IO available + * + * This function is called, when client->socketfd is ready to accept + * new messages (http_client_add_message) + */ + int (*ws_msg_ready)(HttpClient *, void *); + void *ws_msg_ready_userdata; + + /* * Response finished callback * * After this callback, the client object is no longer used. The callback @@ -125,17 +145,25 @@ HttpParser *parser; netbuf buffer; + // transfer_buffer: buffer for sending data to socketfd char *transfer_buffer; size_t transfer_buffer_alloc; size_t transfer_buffer_len; size_t transfer_buffer_pos; + // transfer2_buffer: buffer for response_body_write or ws_write + char *transfer2_buffer; + size_t transfer2_buffer_alloc; + size_t transfer2_buffer_len; + size_t transfer2_buffer_pos; + size_t req_contentlength_pos; - int stage; // 0: request, 1: response + int stage; // 0: request, 1: response, 2: websocket int request_body_complete; int request_body_terminated; int response_header_complete; + int keep_alive; Event event; }; @@ -180,8 +208,24 @@ */ int http_client_enable_chunked_transfer_encoding(HttpClient *client); +/* + * Start request processing + */ int http_client_start(HttpClient *client); +/* + * Handle HttpClient IO and process the request/response + */ +int http_client_process(HttpClient *client); + +/* + * Adds message data, that will be sent to client->socketfd. This function + * should only be called when processing websockets. + */ +int http_client_add_message(HttpClient *client, const void *buf, size_t size); + +size_t http_client_message_buf_size_available(HttpClient *client); + void http_client_add_tests(CxTestSuite *suite);