prepare httpclient for websockets

Sun, 22 Feb 2026 11:18:47 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sun, 22 Feb 2026 11:18:47 +0100
changeset 698
fea7c3d74cc6
parent 697
3ddfd45d4e47
child 699
d794871da099

prepare httpclient for websockets

src/server/proxy/httpclient.c file | annotate | diff | comparison | revisions
src/server/proxy/httpclient.h file | annotate | diff | comparison | revisions
--- a/src/server/proxy/httpclient.c	Sun Feb 22 10:05:37 2026 +0100
+++ b/src/server/proxy/httpclient.c	Sun Feb 22 11:18:47 2026 +0100
@@ -41,7 +41,7 @@
 static int client_io(EventHandler *ev, Event *event);
 static int client_finished(EventHandler *ev, Event *event);
 
-static int client_send_request(HttpClient *client);
+static int client_send_buf(HttpClient *client);
 static int client_send_request_body(HttpClient *client);
 static int client_read_response_header(HttpClient *client);
 static int client_read_response_body(HttpClient *client);
@@ -89,6 +89,8 @@
         client->stream->st.free(&client->stream->st);
     }
     free(client->buffer.inbuf);
+    free(client->transfer_buffer);
+    free(client->transfer2_buffer);
     free(client->addr);
     free(client->method);
     free(client->uri);
@@ -217,6 +219,26 @@
     return ret;
 }
 
+int http_client_process(HttpClient *client) {
+    return client_io(client->ev, &client->event);
+}
+
+size_t http_client_message_buf_size_available(HttpClient *client) {
+    return client->transfer_buffer_alloc - client->transfer_buffer_len;
+}
+
+int http_client_add_message(HttpClient *client, const void *buf, size_t size) {
+    size_t available = http_client_message_buf_size_available(client);
+    if(available == 0) {
+        return HTTP_CLIENT_CALLBACK_WOULD_BLOCK;
+    }
+    if(size > available) {
+        size = available;
+    }
+    memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size);
+    return size;
+}
+
 static int create_req_buffer(HttpClient *client) {
     CxBuffer buf;
     if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) {
@@ -264,7 +286,7 @@
     HttpClient *client = event->cookie;
     if(client->stage == 0) {
         if(client->transfer_buffer_pos < client->transfer_buffer_len) {
-            if(client_send_request(client)) {
+            if(client_send_buf(client)) {
                 return client->error == 0;
             }
         }
@@ -275,9 +297,6 @@
                 return client->error == 0;
             }
         }
-        
-        client->transfer_buffer_pos = 0;
-        client->transfer_buffer_len = 0;
     }
     
     // writing complete, switch to read events
@@ -287,7 +306,16 @@
     if(client_read_response_header(client)) {
         return client->error == 0;
     }
+    int ret = 0;
+    if(client->stage == 2) {
+        // websocket: write message buffer
+        ret = client_send_buf(client);
+    }
     if(client_read_response_body(client)) {
+        ret = 1;
+    }
+    
+    if(ret) {
         return client->error == 0;
     }
     
@@ -308,7 +336,8 @@
     return 0;
 }
 
-static int client_send_request(HttpClient *client) {
+// sends the content of the transfer buffer to client->socketfd
+static int client_send_buf(HttpClient *client) {
     size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
     ssize_t w;
     while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) {
@@ -387,7 +416,7 @@
         client->req_contentlength_pos += r;
         client->transfer_buffer_pos = startpos;
         client->transfer_buffer_len = rbody_buf_offset + r;
-        if(client_send_request(client)) {
+        if(client_send_buf(client)) {
             return 1;
         }
     }
@@ -398,7 +427,7 @@
         client->transfer_buffer_pos = 0;
         client->transfer_buffer_len = 5;
         client->request_body_terminated = 1;
-        if(client_send_request(client)) {
+        if(client_send_buf(client)) {
             return 1;
         }
         
@@ -479,19 +508,29 @@
     HeaderArray *headers = client->parser->headers;
     long long contentlength = 0;
     int chunkedtransferenc = 0;
+    cxmutstr hdr_connection = CX_NULLSTR;
+    cxmutstr hdr_upgrade = CX_NULLSTR;
     while(headers) {
         for(int i=0;i<headers->len;i++) {
-            if(!cx_strcasecmp(headers->headers[i].name, "content-length")) {
-                if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) {
+            cxmutstr header = headers->headers[i].name;
+            cxmutstr hvalue = headers->headers[i].value;
+            if(!cx_strcasecmp(header, "content-length")) {
+                if(!cx_strtoll(hvalue, &contentlength, 10)) {
                     headers = NULL;
                     break;
                 }
-            } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) {
-                if(!cx_strcmp(headers->headers[i].value, "chunked")) {
+            } else if(!cx_strcasecmp(header, "transfer-encoding")) {
+                if(!cx_strcmp(hvalue, "chunked")) {
                     chunkedtransferenc = 1;
                     headers = NULL;
                     break;
                 }
+            } else if(!cx_strcasecmp(header, "connection")) {
+                hdr_connection = hvalue;
+            }
+            
+            if(client->statuscode == 101 && !cx_strcasecmp(header, "upgrade")) {
+                hdr_upgrade = hvalue;
             }
         }
         
@@ -500,7 +539,30 @@
         }
     }
     
-    if(contentlength > 0 || chunkedtransferenc) {
+    if(client->statuscode == 101) {
+        if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) {
+            client->stage = 2;
+            client->event.events = EVENT_POLLIN|EVENT_POLLOUT;
+            
+            // prepare IO buffers for websockets
+            client->transfer_buffer_len = 0;
+            client->transfer_buffer_pos = 0;
+            
+            client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE);
+            if(!client->transfer2_buffer) {
+                client->error = 1;
+                return 1;
+            }
+            client->transfer2_buffer_alloc = HTTP_CLIENT_BUFFER_SIZE;
+            client->transfer2_buffer_len = 0;
+            client->transfer2_buffer_pos = 0;
+        } else {
+            // error: unknown protocol
+            log_ereport(LOG_FAILURE, "http-client: unknown protocol upgrade: %.*s", (int)hdr_upgrade.length, hdr_upgrade.ptr);
+            client->error = 1;
+            return 1;
+        }
+    } else if(contentlength > 0 || chunkedtransferenc) {
         IOStream *fd = Sysstream_new(NULL, client->socketfd);
         if(!fd) {
             client->error = 1;
@@ -510,6 +572,15 @@
         if(!http) {
             fd->free(fd);
         }
+        
+        // we can reuse the already allocated transfer_bufer for transfer2
+        client->transfer2_buffer = client->transfer_buffer;
+        client->transfer2_buffer_alloc = client->transfer_buffer_alloc;
+        client->transfer2_buffer_len = 0;
+        client->transfer_buffer_pos = 0;
+        client->transfer_buffer = NULL;
+        client->transfer_buffer_alloc = 0;
+        
         if(contentlength > 0) {
             http->max_read = contentlength;
             httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos);
@@ -527,16 +598,16 @@
 // returns 0 success
 //         1 would block or error
 static int client_write_response(HttpClient *client) {
-    while(client->transfer_buffer_pos < client->transfer_buffer_len) {
-        char *buf = client->transfer_buffer + client->transfer_buffer_pos;
-        size_t len = client->transfer_buffer_len - client->transfer_buffer_pos;
+    while(client->transfer2_buffer_pos < client->transfer2_buffer_len) {
+        char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
+        size_t len = client->transfer2_buffer_len - client->transfer2_buffer_pos;
         int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata);
         if(ret > 0) {
-           client->transfer_buffer_pos += ret; 
+           client->transfer2_buffer_pos += ret; 
         } else if(ret == 0) {
             // EOF?
             // check if the write is incomplete, which would be an error
-            client->error == client->transfer_buffer_pos < client->transfer_buffer_len;
+            client->error == client->transfer2_buffer_pos < client->transfer2_buffer_len;
             return client->error;
         } else {
             if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
@@ -560,13 +631,13 @@
         return 1;
     }
     
-    char *buf = client->transfer_buffer;
-    size_t nbytes = client->transfer_buffer_alloc;
+    char *buf = client->transfer2_buffer;
+    size_t nbytes = client->transfer2_buffer_alloc;
     
     ssize_t r;
     while((r = net_read(&client->stream->st, buf, nbytes)) > 0) {
-        client->transfer_buffer_len = r;
-        client->transfer_buffer_pos = 0;
+        client->transfer2_buffer_len = r;
+        client->transfer2_buffer_pos = 0;
         if(client->response_body_write) {
             if(client_write_response(client)) {
                 return 1;
@@ -613,7 +684,7 @@
         
         // test client_send_request
         
-        int ret = client_send_request(client);
+        int ret = client_send_buf(client);
         // It is very likely that the first client_send_request call doesn't
         // fully write the request buffer to the socket
         // In that case it returns 1 but without the error flag
@@ -630,7 +701,7 @@
             ssize_t r = read(sock, tmpbuf, 1024);
             CX_TEST_ASSERT(r >= 0);
             cxBufferWrite(tmpbuf, 1, r, &buf);
-            ret = client_send_request(client);
+            ret = client_send_buf(client);
             CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error));
             
             writes++;
--- a/src/server/proxy/httpclient.h	Sun Feb 22 10:05:37 2026 +0100
+++ b/src/server/proxy/httpclient.h	Sun Feb 22 11:18:47 2026 +0100
@@ -110,6 +110,26 @@
     void *response_body_write_userdata;
     
     /*
+     * Websocket write callback function
+     * 
+     * ssize_t ws_write(HttpClient *client, void *buf, size_t size, void *userdata)
+     * 
+     * Return: number of processed bytes,
+     *         HTTP_CLIENT_CALLBACK_WOULD_BLOCK or HTTP_CLIENT_CALLBACK_ERROR.
+     */
+    ssize_t (*ws_write)(HttpClient *, void *, size_t, void *);
+    void *ws_write_userdata;
+    
+    /*
+     * Websocket message IO available
+     * 
+     * This function is called, when client->socketfd is ready to accept
+     * new messages (http_client_add_message) 
+     */
+    int (*ws_msg_ready)(HttpClient *, void *);
+    void *ws_msg_ready_userdata;
+    
+    /*
      * Response finished callback
      * 
      * After this callback, the client object is no longer used. The callback
@@ -125,17 +145,25 @@
     HttpParser *parser;
     netbuf buffer;
     
+    // transfer_buffer: buffer for sending data to socketfd
     char *transfer_buffer;
     size_t transfer_buffer_alloc;
     size_t transfer_buffer_len;
     size_t transfer_buffer_pos;
     
+    // transfer2_buffer: buffer for response_body_write or ws_write
+    char *transfer2_buffer;
+    size_t transfer2_buffer_alloc;
+    size_t transfer2_buffer_len;
+    size_t transfer2_buffer_pos;
+    
     size_t req_contentlength_pos;
     
-    int stage; // 0: request, 1: response
+    int stage; // 0: request, 1: response, 2: websocket
     int request_body_complete;
     int request_body_terminated;
     int response_header_complete;
+    int keep_alive;
     
     Event event;
 };
@@ -180,8 +208,24 @@
  */
 int http_client_enable_chunked_transfer_encoding(HttpClient *client);
 
+/*
+ * Start request processing
+ */
 int http_client_start(HttpClient *client);
 
+/*
+ * Handle HttpClient IO and process the request/response
+ */
+int http_client_process(HttpClient *client);
+
+/*
+ * Adds message data, that will be sent to client->socketfd. This function
+ * should only be called when processing websockets.
+ */
+int http_client_add_message(HttpClient *client, const void *buf, size_t size);
+
+size_t http_client_message_buf_size_available(HttpClient *client);
+
 
 void http_client_add_tests(CxTestSuite *suite);
 

mercurial