Sat, 07 Mar 2026 15:29:11 +0100
add support for websockets in the reverse proxy
--- a/src/server/daemon/protocol.c Fri Mar 06 18:50:23 2026 +0100 +++ b/src/server/daemon/protocol.c Sat Mar 07 15:29:11 2026 +0100 @@ -340,7 +340,7 @@ if(ctlen && enc) { pblock_removekey(pb_key_transfer_encoding, rq->srvhdrs); } - if(!ctlen) { + if(!ctlen && rq->status_num != 101) { // set transfer-encoding header if(!enc) { pblock_kvinsert( @@ -370,8 +370,10 @@ cxBufferWrite("Connection: keep-alive\r\n", 1, 24, &writer->buf); pblock_kvinsert(pb_key_connection, "keep-alive", 10, rq->srvhdrs); } else { - cxBufferWrite("Connection: close\r\n", 1, 19, &writer->buf); - pblock_kvinsert(pb_key_connection, "close", 5, rq->srvhdrs); + if(!pblock_findkeyval(pb_key_connection, rq->srvhdrs)) { + cxBufferWrite("Connection: close\r\n", 1, 19, &writer->buf); + pblock_kvinsert(pb_key_connection, "close", 5, rq->srvhdrs); + } } // response header end
--- a/src/server/proxy/httpclient.c Fri Mar 06 18:50:23 2026 +0100 +++ b/src/server/proxy/httpclient.c Sat Mar 07 15:29:11 2026 +0100 @@ -243,7 +243,7 @@ } int http_client_process(HttpClient *client, Event *event) { - int ret = client_process(client, event); + int ret = client->stage != 2 ? client_process(client, event) : client_ws_process(client, event); if(ret && client->error == 0 && client->event.fn == NULL) { if(client_start_poll(client)) { client->error = 1; @@ -429,11 +429,18 @@ } // readiness notification if(ret == 0) { + client->transfer_buffer_pos = 0; + client->transfer_buffer_len = 0; if(client->ws_msg_ready && client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { return 0; } - client->transfer_buffer_pos = 0; - client->transfer_buffer_len = 0; + + if(client->transfer_buffer_pos < client->transfer_buffer_len) { + // ws_msg_ready has added data to the transfer buffer -> flush buffer again + if(client_send_buf(client) && client->error) { + return 0; + } + } } // flush transfer2 buffer @@ -482,6 +489,10 @@ // sends the content of the transfer buffer to client->socketfd static int client_send_buf(HttpClient *client) { size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; + if(nbytes == 0) { + return 0; + } + //printf("reqbuf:\n\n%.*s\n\n", (int)nbytes, client->transfer_buffer + client->transfer_buffer_pos); ssize_t w; while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) { client->transfer_buffer_pos += w;
--- a/src/server/safs/proxy.c Fri Mar 06 18:50:23 2026 +0100 +++ b/src/server/safs/proxy.c Sat Mar 07 15:29:11 2026 +0100 @@ -67,6 +67,14 @@ Event event; /* + * websocket read buffer + */ + char *read_buf; + size_t read_buf_alloc; + size_t read_buf_size; + size_t read_buf_pos; + + /* * Has the response started (proxy_response_start called) */ int response_started; @@ -80,6 +88,7 @@ static void proxy_unref(ProxyRequest *proxy) { if(--proxy->ref == 0) { http_client_free(proxy->client); + free(proxy->read_buf); free(proxy); } } @@ -87,6 +96,11 @@ static int proxy_response_start(HttpClient *client, int status, char *message, void *userdata) { ProxyRequest *proxy = userdata; + if(status == 101) { + pblock_fr("connection", proxy->response_header_rewrite, TRUE); + proxy->rq->rq_attr.keep_alive = 0; + } + HeaderArray *headers = client->response_headers; while(headers) { for(int i=0;i<headers->len;i++) { @@ -117,7 +131,7 @@ protocol_status(proxy->sn, proxy->rq, status, message); protocol_start_response(proxy->sn, proxy->rq); proxy->response_started = 1; - + return 0; } @@ -227,6 +241,75 @@ return ret; } +static ssize_t proxy_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) { + ProxyRequest *proxy = userdata; + ssize_t w = net_write(proxy->sn->csd, buf, nbytes); + if(w < 0) { + IOStream *stream = proxy->sn->csd; + return stream->io_errno == EWOULDBLOCK ? HTTP_CLIENT_CALLBACK_WOULD_BLOCK : HTTP_CLIENT_CALLBACK_ERROR; + } + return w; +} + +static int proxy_ws_msg_ready(HttpClient *client, void *userdata) { + ProxyRequest *proxy = userdata; + IOStream *st = proxy->sn->csd; + + if(proxy->read_buf == NULL) { + // proxy not initialized for websockets yet + + st->setmode(st, IO_MODE_RAW); + + proxy->read_buf_alloc = 1024*64; + proxy->read_buf = malloc(proxy->read_buf_alloc); + if(!proxy->read_buf) { + return 1; + } + + proxy->event.cookie = proxy; + proxy->event.fn = proxy_request_read_event; + proxy->event.finish = proxy_request_read_finished; + if(event_pollin(client->ev, st, &proxy->event)) { + return 1; + } + proxy->ref++; + } + + // is there still data in the read buffer, that needs to be transfered to + // the HttpClient message buffer? + while(proxy->read_buf_pos < proxy->read_buf_size) { + char *msg = proxy->read_buf + proxy->read_buf_pos; + size_t msglen = proxy->read_buf_size - proxy->read_buf_pos; + int n = http_client_add_message(client, msg, msglen); + if(n <= 0) { + return 0; // message buffer not ready + } + proxy->read_buf_pos += n; + } + // readbuf flushed, reset + proxy->read_buf_size = 0; + proxy->read_buf_pos = 0; + + while(proxy->read_buf_size < proxy->read_buf_alloc) { + ssize_t r = net_read(proxy->sn->csd, proxy->read_buf + proxy->read_buf_size, proxy->read_buf_alloc - proxy->read_buf_size); + if(r <= 0) { + if(proxy->read_buf_size > 0) { + // we have read some data, the next proxy_ws_msg_ready will + // transfer this to the message buffer + return proxy_ws_msg_ready(client, userdata); + } else { + // no data read, return + return st->io_errno == EWOULDBLOCK ? 0 : 1; // return 1 in case of an error + } + } + proxy->read_buf_size += r; + } + + // do this again, to add the now filled buffer to the HttpClient message buffer + // http_client_add_message or net_read should fail at some point + return proxy_ws_msg_ready(client, userdata); +} + int http_reverse_proxy_service(pblock *param, Session *sn, Request *rq) { EventHandler *ev = sn->ev; const char *method = pblock_findkeyval(pb_key_method, rq->reqpb); @@ -270,12 +353,12 @@ char srvhost_static[256]; char *srvhost; - if(srv_url.hostlen < 255) { + if(srv_url.hostlen + 10 < 256) { memcpy(srvhost_static, srv_url.host, srv_url.hostlen); srvhost_static[srv_url.hostlen] = 0; srvhost = srvhost_static; } else { - srvhost = pool_malloc(sn->pool, srv_url.hostlen + 1); + srvhost = pool_malloc(sn->pool, srv_url.hostlen + 10); if(!srvhost) { return REQ_ABORTED; } @@ -323,12 +406,16 @@ proxy->request_header_rewrite = pblock_create_pool(sn->pool, 16); proxy->response_header_rewrite = pblock_create_pool(sn->pool, 16); proxy->response_started = 0; + proxy->read_buf = NULL; + proxy->read_buf_alloc = 0; + proxy->read_buf_size = 0; + proxy->read_buf_pos = 0; proxy->ref = 1; // Some request/response headers should be removed or altered // An empty string means, the header should be removed pblock_nvinsert("host", "", proxy->request_header_rewrite); - pblock_nvinsert("connection", "", proxy->request_header_rewrite); + //pblock_nvinsert("connection", "", proxy->request_header_rewrite); pblock_nvinsert("transfer-encoding", "", proxy->request_header_rewrite); pblock_nvinsert("content-length", "", proxy->request_header_rewrite); pblock_nvinsert("server", "", proxy->response_header_rewrite); @@ -380,6 +467,19 @@ return REQ_ABORTED; } + // add host header + if(!((srv_url.scheme_num == WS_URI_HTTP && srv_url.port == 80) || + (srv_url.scheme_num == WS_URI_HTTPS && srv_url.port == 443))) + { + // we have reserved enough space for the port + srvhost[srv_url.hostlen] = ':'; + snprintf(srvhost + srv_url.hostlen, 8, "%d", (int)srv_url.port); + } + if(http_client_add_request_header(client, cx_mutstr("host"), cx_mutstr(srvhost))) { + http_client_free(client); + return REQ_ABORTED; + } + // add request headers to the client CxIterator i = pblock_iterator(rq->headers); cx_foreach(pb_entry*, entry, i) { @@ -434,6 +534,10 @@ client->response_body_write_userdata = proxy; client->response_finished = proxy_response_finished; client->response_finished_userdata = proxy; + client->ws_write = proxy_ws_write; + client->ws_write_userdata = proxy; + client->ws_msg_ready = proxy_ws_msg_ready; + client->ws_msg_ready_userdata = proxy; net_setnonblock(sn->csd, 1); if(http_client_start(client)) {