add support for websockets in the reverse proxy

Sat, 07 Mar 2026 15:29:11 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 07 Mar 2026 15:29:11 +0100
changeset 720
8c7d08d3be2e
parent 719
c4c2b8e8ddc5
child 721
482f4c153636

add support for websockets in the reverse proxy

src/server/daemon/protocol.c file | annotate | diff | comparison | revisions
src/server/proxy/httpclient.c file | annotate | diff | comparison | revisions
src/server/safs/proxy.c file | annotate | diff | comparison | revisions
--- a/src/server/daemon/protocol.c	Fri Mar 06 18:50:23 2026 +0100
+++ b/src/server/daemon/protocol.c	Sat Mar 07 15:29:11 2026 +0100
@@ -340,7 +340,7 @@
     if(ctlen && enc) {
         pblock_removekey(pb_key_transfer_encoding, rq->srvhdrs);
     }
-    if(!ctlen) {
+    if(!ctlen && rq->status_num != 101) {
         // set transfer-encoding header
         if(!enc) {
             pblock_kvinsert(
@@ -370,8 +370,10 @@
         cxBufferWrite("Connection: keep-alive\r\n", 1, 24, &writer->buf);
         pblock_kvinsert(pb_key_connection, "keep-alive", 10, rq->srvhdrs);
     } else {
-        cxBufferWrite("Connection: close\r\n", 1, 19, &writer->buf);
-        pblock_kvinsert(pb_key_connection, "close", 5, rq->srvhdrs);
+        if(!pblock_findkeyval(pb_key_connection, rq->srvhdrs)) {
+            cxBufferWrite("Connection: close\r\n", 1, 19, &writer->buf);
+            pblock_kvinsert(pb_key_connection, "close", 5, rq->srvhdrs);
+        }
     }
 
     // response header end
--- a/src/server/proxy/httpclient.c	Fri Mar 06 18:50:23 2026 +0100
+++ b/src/server/proxy/httpclient.c	Sat Mar 07 15:29:11 2026 +0100
@@ -243,7 +243,7 @@
 }
 
 int http_client_process(HttpClient *client, Event *event) {
-    int ret = client_process(client, event);
+    int ret = client->stage != 2 ? client_process(client, event) : client_ws_process(client, event);
     if(ret && client->error == 0 && client->event.fn == NULL) {
         if(client_start_poll(client)) {
             client->error = 1;
@@ -429,11 +429,18 @@
     }
     // readiness notification
     if(ret == 0) {
+        client->transfer_buffer_pos = 0;
+        client->transfer_buffer_len = 0;
         if(client->ws_msg_ready && client->ws_msg_ready(client, client->ws_msg_ready_userdata)) {
             return 0;
         }
-        client->transfer_buffer_pos = 0;
-        client->transfer_buffer_len = 0;
+        
+        if(client->transfer_buffer_pos < client->transfer_buffer_len) {
+            // ws_msg_ready has added data to the transfer buffer -> flush buffer again
+            if(client_send_buf(client) && client->error) {
+                return 0;
+            }
+        }
     }
     
     // flush transfer2 buffer
@@ -482,6 +489,10 @@
 // sends the content of the transfer buffer to client->socketfd
 static int client_send_buf(HttpClient *client) {
     size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
+    if(nbytes == 0) {
+        return 0;
+    }
+    //printf("reqbuf:\n\n%.*s\n\n", (int)nbytes, client->transfer_buffer + client->transfer_buffer_pos);
     ssize_t w;
     while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) {
         client->transfer_buffer_pos += w;
--- a/src/server/safs/proxy.c	Fri Mar 06 18:50:23 2026 +0100
+++ b/src/server/safs/proxy.c	Sat Mar 07 15:29:11 2026 +0100
@@ -67,6 +67,14 @@
     Event event;
     
     /*
+     * websocket read buffer
+     */
+    char *read_buf;
+    size_t read_buf_alloc;
+    size_t read_buf_size;
+    size_t read_buf_pos;
+    
+    /*
      * Has the response started (proxy_response_start called)
      */
     int response_started;
@@ -80,6 +88,7 @@
 static void proxy_unref(ProxyRequest *proxy) {
     if(--proxy->ref == 0) {
         http_client_free(proxy->client);
+        free(proxy->read_buf);
         free(proxy);
     }
 }
@@ -87,6 +96,11 @@
 static int proxy_response_start(HttpClient *client, int status, char *message, void *userdata) {
     ProxyRequest *proxy = userdata;
     
+    if(status == 101) {
+        pblock_fr("connection", proxy->response_header_rewrite, TRUE);
+        proxy->rq->rq_attr.keep_alive = 0;
+    }
+    
     HeaderArray *headers = client->response_headers;
     while(headers) {
         for(int i=0;i<headers->len;i++) {
@@ -117,7 +131,7 @@
     protocol_status(proxy->sn, proxy->rq, status, message);
     protocol_start_response(proxy->sn, proxy->rq);
     proxy->response_started = 1;
-    
+      
     return 0;
 }
 
@@ -227,6 +241,75 @@
     return ret;
 }
 
+static ssize_t proxy_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) {
+    ProxyRequest *proxy = userdata;
+    ssize_t w = net_write(proxy->sn->csd, buf, nbytes);
+    if(w < 0) {
+        IOStream *stream = proxy->sn->csd;
+        return stream->io_errno == EWOULDBLOCK ? HTTP_CLIENT_CALLBACK_WOULD_BLOCK : HTTP_CLIENT_CALLBACK_ERROR;
+    }
+    return w;
+}
+
+static int proxy_ws_msg_ready(HttpClient *client, void *userdata) {
+    ProxyRequest *proxy = userdata;
+    IOStream *st = proxy->sn->csd;
+    
+    if(proxy->read_buf == NULL) {
+        // proxy not initialized for websockets yet
+        
+        st->setmode(st, IO_MODE_RAW);
+        
+        proxy->read_buf_alloc = 1024*64;
+        proxy->read_buf = malloc(proxy->read_buf_alloc);
+        if(!proxy->read_buf) {
+            return 1;
+        }
+        
+        proxy->event.cookie = proxy;
+        proxy->event.fn = proxy_request_read_event;
+        proxy->event.finish = proxy_request_read_finished;
+        if(event_pollin(client->ev, st, &proxy->event)) {
+            return 1;
+        }
+        proxy->ref++;
+    }
+    
+    // is there still data in the read buffer, that needs to be transfered to
+    // the HttpClient message buffer?
+    while(proxy->read_buf_pos < proxy->read_buf_size) {
+        char *msg = proxy->read_buf + proxy->read_buf_pos;
+        size_t msglen = proxy->read_buf_size - proxy->read_buf_pos;
+        int n = http_client_add_message(client, msg, msglen);
+        if(n <= 0) {
+            return 0; // message buffer not ready
+        }
+        proxy->read_buf_pos += n;
+    }
+    // readbuf flushed, reset
+    proxy->read_buf_size = 0;
+    proxy->read_buf_pos = 0;
+    
+    while(proxy->read_buf_size < proxy->read_buf_alloc) {
+        ssize_t r = net_read(proxy->sn->csd, proxy->read_buf + proxy->read_buf_size, proxy->read_buf_alloc - proxy->read_buf_size);
+        if(r <= 0) {
+            if(proxy->read_buf_size > 0) {
+                // we have read some data, the next proxy_ws_msg_ready will
+                // transfer this to the message buffer
+                return proxy_ws_msg_ready(client, userdata);
+            } else {
+                // no data read, return
+                return st->io_errno == EWOULDBLOCK ? 0 : 1; // return 1 in case of an error
+            }
+        }
+        proxy->read_buf_size += r;
+    }
+    
+    // do this again, to add the now filled buffer to the HttpClient message buffer
+    // http_client_add_message or net_read should fail at some point
+    return proxy_ws_msg_ready(client, userdata);
+}
+
 int http_reverse_proxy_service(pblock *param, Session *sn, Request *rq) {
     EventHandler *ev = sn->ev;
     const char *method = pblock_findkeyval(pb_key_method, rq->reqpb);
@@ -270,12 +353,12 @@
     
     char srvhost_static[256];
     char *srvhost;
-    if(srv_url.hostlen < 255) {
+    if(srv_url.hostlen + 10 < 256) {
         memcpy(srvhost_static, srv_url.host, srv_url.hostlen);
         srvhost_static[srv_url.hostlen] = 0;
         srvhost = srvhost_static;
     } else {
-        srvhost = pool_malloc(sn->pool, srv_url.hostlen + 1);
+        srvhost = pool_malloc(sn->pool, srv_url.hostlen + 10);
         if(!srvhost) {
             return REQ_ABORTED;
         }
@@ -323,12 +406,16 @@
     proxy->request_header_rewrite = pblock_create_pool(sn->pool, 16);
     proxy->response_header_rewrite = pblock_create_pool(sn->pool, 16);
     proxy->response_started = 0;
+    proxy->read_buf = NULL;
+    proxy->read_buf_alloc = 0;
+    proxy->read_buf_size = 0;
+    proxy->read_buf_pos = 0;
     proxy->ref = 1;
     
     // Some request/response headers should be removed or altered
     // An empty string means, the header should be removed
     pblock_nvinsert("host", "", proxy->request_header_rewrite);
-    pblock_nvinsert("connection", "", proxy->request_header_rewrite);
+    //pblock_nvinsert("connection", "", proxy->request_header_rewrite);
     pblock_nvinsert("transfer-encoding", "", proxy->request_header_rewrite);
     pblock_nvinsert("content-length", "", proxy->request_header_rewrite);
     pblock_nvinsert("server", "", proxy->response_header_rewrite);
@@ -380,6 +467,19 @@
         return REQ_ABORTED;
     }
     
+    // add host header
+    if(!((srv_url.scheme_num == WS_URI_HTTP && srv_url.port == 80) || 
+         (srv_url.scheme_num == WS_URI_HTTPS && srv_url.port == 443)))
+    {
+        // we have reserved enough space for the port
+        srvhost[srv_url.hostlen] = ':';
+        snprintf(srvhost + srv_url.hostlen, 8, "%d", (int)srv_url.port);
+    }
+    if(http_client_add_request_header(client, cx_mutstr("host"), cx_mutstr(srvhost))) {
+        http_client_free(client);
+        return REQ_ABORTED;
+    }
+    
     // add request headers to the client
     CxIterator i = pblock_iterator(rq->headers);
     cx_foreach(pb_entry*, entry, i) {
@@ -434,6 +534,10 @@
     client->response_body_write_userdata = proxy;
     client->response_finished = proxy_response_finished;
     client->response_finished_userdata = proxy;
+    client->ws_write = proxy_ws_write;
+    client->ws_write_userdata = proxy;
+    client->ws_msg_ready = proxy_ws_msg_ready;
+    client->ws_msg_ready_userdata = proxy;
     
     net_setnonblock(sn->csd, 1);
     if(http_client_start(client)) {

mercurial