implement httpclient websocket IO

Thu, 05 Mar 2026 21:06:41 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Thu, 05 Mar 2026 21:06:41 +0100
changeset 717
2edcb361b8be
parent 716
0b3d0af5d74f
child 718
9e98618464ae

implement httpclient websocket IO

src/server/proxy/httpclient.c file | annotate | diff | comparison | revisions
src/server/proxy/httpclient.h file | annotate | diff | comparison | revisions
--- a/src/server/proxy/httpclient.c	Wed Mar 04 22:05:07 2026 +0100
+++ b/src/server/proxy/httpclient.c	Thu Mar 05 21:06:41 2026 +0100
@@ -39,7 +39,9 @@
 
 static int client_connected(EventHandler *ev, Event *event);
 static int client_io(EventHandler *ev, Event *event);
+static int client_ws_io(EventHandler *ev, Event *event);
 static int client_process(HttpClient *client, Event *event);
+static int client_ws_process(HttpClient *client, Event *event);
 static int client_finished(EventHandler *ev, Event *event);
 
 static int client_send_buf(HttpClient *client);
@@ -318,6 +320,11 @@
     return client_process(client, event);
 }
 
+static int client_ws_io(EventHandler *ev, Event *event) {
+    HttpClient *client = event->cookie;
+    return client_ws_process(client, event);
+}
+
 static int client_process(HttpClient *client, Event *event) {
     client->last_event = event;
     if(client->stage < 0) {
@@ -364,8 +371,7 @@
     }
     int ret = 0;
     if(client->stage == 2) {
-        // websocket: write message buffer
-        ret = client_send_buf(client);
+        return client_ws_process(client, event);
     }
     if(client_read_response_body(client)) {
         ret = 1;
@@ -378,6 +384,59 @@
     return 0;
 }
 
+static int client_ws_process(HttpClient *client, Event *event) {
+    // send available data from the transfer buffer
+    int ret = client_send_buf(client);
+    if(client->error) {
+        return 0;
+    }
+    // readiness notification
+    if(ret == 0 && client->ws_msg_ready) {
+        if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) {
+            return 0;
+        }
+    }
+    
+    // read message
+    char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
+    size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
+    ssize_t r = -1;
+    while(available > 0) {
+        ssize_t r = net_read(client->stream, buf, available);
+        if(r <= 0) {
+            break;
+        }
+        client->transfer2_buffer_len += r;
+        if(client->ws_write) {
+            char *out = client->transfer2_buffer + client->transfer2_buffer_pos;
+            size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
+            while(nbytes > 0) {
+                ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
+                if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
+                    break;
+                } else if(w <= 0) {
+                    client->error = 1;
+                    return 0;
+                }
+                client->transfer2_buffer_pos += w;
+                
+                // adjust buffer
+                out = client->transfer2_buffer + client->transfer2_buffer_pos;
+                nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
+            }
+        } else {
+            // noop
+            client->transfer2_buffer_pos = client->transfer2_buffer_len;
+        }
+        
+        // adjust buffer
+        buf = client->transfer2_buffer + client->transfer2_buffer_pos;
+        available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
+    }
+    
+    return r == 0 || client->error ? 0 : 1;
+}
+
 static int client_finished(EventHandler *ev, Event *event) {
     HttpClient *client = event->cookie;
     
@@ -600,11 +659,14 @@
         if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) {
             client->stage = 2;
             client->event.events = EVENT_POLLIN|EVENT_POLLOUT;
+            client->event.fn = client_ws_io;
             
             // prepare IO buffers for websockets
+            // transfer_buffer is used for outgoing traffic
             client->transfer_buffer_len = 0;
             client->transfer_buffer_pos = 0;
             
+            // transfer2_buffer is used for reading
             client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE);
             if(!client->transfer2_buffer) {
                 client->error = 1;
--- a/src/server/proxy/httpclient.h	Wed Mar 04 22:05:07 2026 +0100
+++ b/src/server/proxy/httpclient.h	Thu Mar 05 21:06:41 2026 +0100
@@ -126,6 +126,8 @@
      * 
      * This function is called, when client->socketfd is ready to accept
      * new messages (http_client_add_message) 
+     * 
+     * If the callback returns 1, the websocket connection is terminated.
      */
     int (*ws_msg_ready)(HttpClient *, void *);
     void *ws_msg_ready_userdata;

mercurial