add test_http_client_ws_msg_in

Fri, 06 Mar 2026 18:50:23 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Fri, 06 Mar 2026 18:50:23 +0100
changeset 719
c4c2b8e8ddc5
parent 718
9e98618464ae
child 720
8c7d08d3be2e

add test_http_client_ws_msg_in

src/server/proxy/httpclient.c file | annotate | diff | comparison | revisions
src/server/util/io.c file | annotate | diff | comparison | revisions
--- a/src/server/proxy/httpclient.c	Thu Mar 05 21:39:39 2026 +0100
+++ b/src/server/proxy/httpclient.c	Fri Mar 06 18:50:23 2026 +0100
@@ -385,6 +385,42 @@
     return 0;
 }
 
+static int client_ws_write(HttpClient *client) {
+    if(!client->ws_write) {
+        client->transfer2_buffer_pos = client->transfer2_buffer_len;
+        return 0;
+    }
+    
+    int ret = 0;
+    
+    // pass all data from the transfer2 buffer to the ws_write callback
+    char *out = client->transfer2_buffer + client->transfer2_buffer_pos;
+    size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
+    while(nbytes > 0) {
+        ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
+        if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
+            ret = 1;
+            break;
+        } else if(w <= 0) {
+            client->error = 1;
+            return 0;
+        }
+        client->transfer2_buffer_pos += w;
+
+        // adjust buffer
+        out = client->transfer2_buffer + client->transfer2_buffer_pos;
+        nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
+    }
+    
+    // clear buffer
+    if(client->transfer2_buffer_pos == client->transfer2_buffer_len) {
+        client->transfer2_buffer_pos = 0;
+        client->transfer2_buffer_len = 0;
+    }
+    
+    return ret;
+}
+
 static int client_ws_process(HttpClient *client, Event *event) {
     // send available data from the transfer buffer
     int ret = client_send_buf(client);
@@ -400,36 +436,24 @@
         client->transfer_buffer_len = 0;
     }
     
+    // flush transfer2 buffer
+    if(client_ws_write(client)) {
+        return 1;
+    }
+    
     // read message
     char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
     size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
+
     ssize_t r = -1;
     while(available > 0) {
-        ssize_t r = net_read(client->stream, buf, available);
+        r = net_read(client->stream, buf, available);
         if(r <= 0) {
             break;
         }
         client->transfer2_buffer_len += r;
-        if(client->ws_write) {
-            char *out = client->transfer2_buffer + client->transfer2_buffer_pos;
-            size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
-            while(nbytes > 0) {
-                ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
-                if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
-                    break;
-                } else if(w <= 0) {
-                    client->error = 1;
-                    return 0;
-                }
-                client->transfer2_buffer_pos += w;
-                
-                // adjust buffer
-                out = client->transfer2_buffer + client->transfer2_buffer_pos;
-                nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
-            }
-        } else {
-            // noop
-            client->transfer2_buffer_pos = client->transfer2_buffer_len;
+        if(client_ws_write(client)) {
+            break;
         }
         
         // adjust buffer
@@ -688,7 +712,7 @@
             if(!http) {
                 fd->free(fd);
             }
-            http->fd->setmode((IOStream*)http, IO_MODE_RAW);
+            http->raw = WS_TRUE;
             if(client->buffer.pos < client->buffer.cursize) {
                 // bytes remaining in the buffer -> enable buffered reading
                 httpstream_enable_buffered_read(
@@ -1575,6 +1599,81 @@
     }
 }
 
+static ssize_t test_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) {
+    CxBuffer *out = (CxBuffer*)userdata;
+    return cxBufferWrite(buf, 1, nbytes, out);
+}
+
+static CX_TEST(test_http_client_ws_msg_in) {
+    CX_TEST_DO {
+        EventHandler dummy;
+        HttpClient *client = http_client_new(&dummy);
+        
+        int fds[2];
+        util_socketpair(fds);
+        util_socket_setnonblock(fds[0], 1);
+        util_socket_setnonblock(fds[1], 1);
+        client->socketfd = fds[0];
+        client->event.cookie = client;
+        int sock = fds[1];
+        
+        create_req_buffer(client);
+        client->transfer_buffer_len = 0;
+        client->transfer_buffer_pos = 0;
+        client->transfer2_buffer_alloc = 1024*128;
+        client->transfer2_buffer = malloc(client->transfer2_buffer_alloc);
+        
+        IOStream *fd = Sysstream_new(NULL, client->socketfd);
+        HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
+        http->raw = WS_TRUE;
+        client->stream = http;
+        
+        size_t str_nbytes = 1024*1024*64;
+        char *str = malloc(str_nbytes);
+        // init the buffer with random data
+        for(size_t i=0;i<str_nbytes;i+=sizeof(int)) {
+            int *p = (int*)(str+i);
+            *p = rand();
+        }
+        size_t str_pos = 0;
+        
+        CxBuffer *out = cxBufferCreate(NULL, NULL, str_nbytes, CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS);
+        client->ws_write = test_ws_write;
+        client->ws_write_userdata = out;
+        
+        while(out->size < str_nbytes) {
+            size_t slen = str_nbytes - str_pos;
+            if(slen > 64*1024) {
+                slen = 64*1024;
+            }
+            
+            if(slen > 0) {
+                ssize_t w = write(sock, str + str_pos, slen);
+                if(w > 0) {
+                    str_pos += w;
+                }
+            } 
+            
+            // webosocket IO: this should read from the other socket and
+            // call the ws_write callback
+            int ret = client_ws_io(&dummy, &client->event);
+            CX_TEST_ASSERT(client->error == 0);
+            
+            // run client_ws_io again, it should do nothing
+            ret = client_ws_io(&dummy, &client->event);
+            CX_TEST_ASSERT(ret == 1); // would block
+        }
+        
+        // http_client_add_message should block at least once
+        CX_TEST_ASSERT(out->pos == str_nbytes);
+        CX_TEST_ASSERT(!memcmp(out->space, str, str_nbytes));
+        
+        cxBufferFree(out);
+        http_client_free(client);
+        close(sock);
+    }
+}
+
 void http_client_add_tests(CxTestSuite *suite) {
     cx_test_register(suite, test_http_client_send_request);
     cx_test_register(suite, test_http_client_send_request_body_chunked);
@@ -1600,4 +1699,5 @@
     cx_test_register(suite, test_http_client_io_write_blsz8_error2);
     cx_test_register(suite, test_http_client_io_write_blsz8_error3);
     cx_test_register(suite, test_http_client_ws_msg_out);
+    cx_test_register(suite, test_http_client_ws_msg_in);
 }
--- a/src/server/util/io.c	Thu Mar 05 21:39:39 2026 +0100
+++ b/src/server/util/io.c	Fri Mar 06 18:50:23 2026 +0100
@@ -543,16 +543,8 @@
     }
 }
 
-ssize_t net_http_write_raw(HttpStream *st, const void *buf, size_t nbytes) {
-    return st->fd->write(st->fd, buf, nbytes);
-}
-
-ssize_t net_http_writev_raw(HttpStream *st, struct iovec *iovec, int iovcnt) {
-    return st->fd->writev(st->fd, iovec, iovcnt);
-}
-
 ssize_t net_http_read(HttpStream *st, void *buf, size_t nbytes) {
-    if(st->read >= st->max_read) {
+    if(st->read >= st->max_read && !st->raw) {
         st->read_eof = WS_TRUE;
         return 0;
     }

mercurial