refactor http client io to use an HttpStream for reading the response default tip

Wed, 18 Feb 2026 13:41:10 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Wed, 18 Feb 2026 13:41:10 +0100
changeset 684
48da20bde908
parent 683
db37761a8494

refactor http client io to use an HttpStream for reading the response

src/server/proxy/httpclient.c file | annotate | diff | comparison | revisions
src/server/proxy/httpclient.h file | annotate | diff | comparison | revisions
src/server/util/io.c file | annotate | diff | comparison | revisions
src/server/util/io.h file | annotate | diff | comparison | revisions
--- a/src/server/proxy/httpclient.c	Wed Feb 18 12:31:19 2026 +0100
+++ b/src/server/proxy/httpclient.c	Wed Feb 18 13:41:10 2026 +0100
@@ -43,6 +43,7 @@
 static int client_send_request(HttpClient *client);
 static int client_send_request_body(HttpClient *client);
 static int client_read_response_header(HttpClient *client);
+static int client_read_response_body(HttpClient *client);
 
 HttpClient* http_client_new(EventHandler *ev) {
     CxMempool *mp = cxMempoolCreate(32, CX_MEMPOOL_TYPE_PURE);
@@ -83,6 +84,9 @@
     cxMempoolFree(client->mp);
     header_array_free(client->request_headers);
     http_parser_free(client->parser);
+    if(client->stream) {
+        client->stream->st.free(&client->stream->st);
+    }
     free(client->buffer.inbuf);
     free(client->addr);
     free(client->method);
@@ -236,9 +240,9 @@
         hdr = hdr->next;
     }
     cxBufferPutString(&buf, "\r\n");
-    client->req_buffer = buf.space;
-    client->req_buffer_alloc = buf.capacity;
-    client->req_buffer_len = buf.size;
+    client->transfer_buffer = buf.space;
+    client->transfer_buffer_alloc = buf.capacity;
+    client->transfer_buffer_len = buf.size;
     
     return 0;
 }
@@ -256,7 +260,7 @@
 
 static int client_io(EventHandler *ev, Event *event) {
     HttpClient *client = event->cookie;
-    if(client->req_buffer_pos < client->req_buffer_len) {
+    if(client->transfer_buffer_pos < client->transfer_buffer_len) {
         if(client_send_request(client)) {
             return client->error == 0;
         }
@@ -272,72 +276,11 @@
     // writing complete, switch to read events
     event->events = EVENT_POLLIN;
     
-    
-    char *buffer;
-    size_t nbytes;
-    if(client->response_header_complete) {
-        buffer = client->buffer.inbuf;
-        nbytes = client->buffer.maxsize;
-    } else {
-        buffer = client->buffer.inbuf + client->buffer.pos;
-        nbytes = client->buffer.maxsize - client->buffer.cursize;
+    if(client_read_response_header(client)) {
+        return client->error == 0;
     }
-    
-    
-    ssize_t r;
-    while((r = read(client->socketfd, buffer, nbytes)) > 0) {
-        client->buffer.cursize += r;
-        if(!client->response_header_complete) {
-            switch(http_parser_process(client->parser)) {
-                case 0: { // finish
-                    if(!http_parser_validate(client->parser)) {
-                        client->error = 1;
-                        return 0;
-                    }
-                    client->statuscode = client->parser->status;
-                    
-                    client->response_header_complete = 1;
-                    if(client->response_start) {
-                        cxmutstr msg = client->parser->msg;
-                        char t = msg.ptr[msg.length];
-                        msg.ptr[msg.length] = 0;
-                        int ret = client->response_start(client, client->statuscode, msg.ptr, client->response_start_userdata);
-                        msg.ptr[msg.length] = t;
-                        
-                        // TODO: check ret
-                    }
-                    break;
-                }
-                case 1: { // need more data
-                    continue;
-                }
-                case 2: { // error
-                    client->error = 1;
-                    return 0;
-                }
-            }
-        }
-        
-        // header complete
-        
-        char *out = client->buffer.inbuf + client->buffer.pos;
-        size_t len = client->buffer.cursize - client->buffer.pos;
-        
-        if(client->response_body_write) {
-            int ret = client->response_body_write(client, out, len, client->response_body_write_userdata);
-            // TODO: check ret
-        }
-        
-        client->buffer.pos = 0;
-        client->buffer.cursize = 0;
-    }
-    
-    if(r < 0) {
-        if(errno == EAGAIN) {
-            return 1;
-        } else {
-            log_ereport(LOG_FAILURE, "http-client: IO error: %s", strerror(errno));
-        }
+    if(client_read_response_body(client)) {
+        return client->error == 0;
     }
     
     // request finished
@@ -363,11 +306,11 @@
 }
 
 static int client_send_request(HttpClient *client) {
-    size_t nbytes = client->req_buffer_len - client->req_buffer_pos;
+    size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
     ssize_t w;
-    while((w = write(client->socketfd, client->req_buffer + client->req_buffer_pos, nbytes)) > 0) {
-        client->req_buffer_pos += w;
-        nbytes = client->req_buffer_len - client->req_buffer_pos;
+    while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) {
+        client->transfer_buffer_pos += w;
+        nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
         if(nbytes == 0) {
             break;
         }
@@ -382,11 +325,11 @@
         return 1;
     }
     
-    return client->req_buffer_pos < client->req_buffer_len;
+    return client->transfer_buffer_pos < client->transfer_buffer_len;
 }
 
 static int client_send_request_body(HttpClient *client) {
-    size_t rbody_readsize = client->req_buffer_alloc;
+    size_t rbody_readsize = client->transfer_buffer_alloc;
     size_t rbody_buf_offset = 0;
     if(client->req_content_length == -1) {
         // chunked transfer encoding:
@@ -396,7 +339,7 @@
         rbody_buf_offset = 16;
     }
     while(!client->request_body_complete) {
-        ssize_t r = client->request_body_read(client, client->req_buffer + rbody_buf_offset, rbody_readsize, client->request_body_read_userdata);
+        ssize_t r = client->request_body_read(client, client->transfer_buffer + rbody_buf_offset, rbody_readsize, client->request_body_read_userdata);
         if(r <= 0) {
             if(r == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
                 return 1;
@@ -413,7 +356,7 @@
             // is it time to terminate the request body?
             // try read some additional bytes, if it returns 0, we know
             // the request body is complete and we can add the termination chunk
-            char *r2buf = client->req_buffer + rbody_buf_offset + r;
+            char *r2buf = client->transfer_buffer + rbody_buf_offset + r;
             ssize_t r2 = client->request_body_read(client, r2buf, 32, client->request_body_read_userdata);
             if(r > 0) {
                 r += r2;
@@ -435,12 +378,12 @@
             char chunkheader[16];
             int chunkheaderlen = snprintf(chunkheader, 16, "%zx\r\n", (size_t)r);
             startpos = 16 - chunkheaderlen;
-            memcpy(client->req_buffer + startpos, chunkheader, chunkheaderlen);
+            memcpy(client->transfer_buffer + startpos, chunkheader, chunkheaderlen);
         }
 
         client->req_contentlength_pos += r;
-        client->req_buffer_pos = startpos;
-        client->req_buffer_len = rbody_buf_offset + r;
+        client->transfer_buffer_pos = startpos;
+        client->transfer_buffer_len = rbody_buf_offset + r;
         if(client_send_request(client)) {
             return 1;
         }
@@ -448,9 +391,9 @@
     
     // chunked transfer encoding: terminate
     if(client->req_content_length == -1 && !client->request_body_terminated) {
-        memcpy(client->req_buffer, "0\r\n\r\n", 5);
-        client->req_buffer_pos = 0;
-        client->req_buffer_len = 5;
+        memcpy(client->transfer_buffer, "0\r\n\r\n", 5);
+        client->transfer_buffer_pos = 0;
+        client->transfer_buffer_len = 5;
         client->request_body_terminated = 1;
         if(client_send_request(client)) {
             return 1;
@@ -465,7 +408,6 @@
     return 0;
 }
 
-/*
 static int client_read_response_header(HttpClient *client) {
     if(client->response_header_complete) {
         return 0;
@@ -482,7 +424,7 @@
                 case 0: { // finish
                     if(!http_parser_validate(client->parser)) {
                         client->error = 1;
-                        return 0;
+                        return 1;
                     }
                     client->statuscode = client->parser->status;
                     
@@ -503,16 +445,94 @@
                 }
                 case 2: { // error
                     client->error = 1;
-                    return 0;
+                    return 1;
                 }
             }
         }
         
         // header complete
-
+        break;
+    }
+    
+    if(r <= 0) {
+        if(r == 0) {
+            // unexpected EOF
+            client->error = 1;
+        } else if(errno != EAGAIN) {
+            log_ereport(LOG_FAILURE, "http-client: IO error: %s", strerror(errno));
+            client->error = 1;
+        }
+        return 1;
+    }
+    
+    // initialize httpstream
+    HeaderArray *headers = client->parser->headers;
+    long long contentlength = 0;
+    int chunkedtransferenc = 0;
+    while(headers) {
+        for(int i=0;i<headers->len;i++) {
+            if(!cx_strcasecmp(headers->headers[i].name, "content-length")) {
+                if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) {
+                    headers = NULL;
+                    break;
+                }
+            } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) {
+                if(!cx_strcmp(headers->headers[i].value, "chunked")) {
+                    chunkedtransferenc = 1;
+                    headers = NULL;
+                    break;
+                }
+            }
+        }
     }
+    
+    if(contentlength > 0 || chunkedtransferenc) {
+        IOStream *fd = Sysstream_new(NULL, client->socketfd);
+        if(!fd) {
+            client->error = 1;
+            return 1;
+        }
+        HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
+        if(!http) {
+            fd->free(fd);
+        }
+        if(contentlength > 0) {
+            http->max_read = contentlength;
+            httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos);
+        } else if(chunkedtransferenc) {
+            httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos);
+        }
+        client->stream = http;
+    }
+    
+    return 0;
 }
-*/
+
+static int client_read_response_body(HttpClient *client) {
+    if(!client->stream) {
+        return 0; // no input stream -> no response body
+    }
+    
+    char *buf = client->transfer_buffer;
+    size_t nbytes = client->transfer_buffer_alloc;
+    
+    ssize_t r;
+    while((r = net_read(&client->stream->st, buf, nbytes)) > 0) {
+        if(client->response_body_write) {
+            int ret = client->response_body_write(client, buf, r, client->response_body_write_userdata);
+            // TODO: check ret
+        }
+    }
+    
+    if(r < 0) {
+        if(r != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
+            client->error;
+        }
+        return 1;
+    }
+    
+    return 0;
+}
 
 /* --------------------------------- Tests --------------------------------- */
 
@@ -535,11 +555,11 @@
         // init the buffer with random data
         for(size_t i=0;i<len;i+=sizeof(int)) {
             int *p = (int*)(str+i);
-            *p = rand();;
+            *p = rand();
         }
         
-        client->req_buffer = str;
-        client->req_buffer_len = len;
+        client->transfer_buffer = str;
+        client->transfer_buffer_len = len;
         
         // test client_send_request
         
@@ -548,15 +568,15 @@
         // fully write the request buffer to the socket
         // In that case it returns 1 but without the error flag
         CX_TEST_ASSERT(ret == 1 && !client->error);
-        CX_TEST_ASSERT(client->req_buffer_pos > 0);
-        CX_TEST_ASSERT(client->req_buffer_pos < len);
+        CX_TEST_ASSERT(client->transfer_buffer_pos > 0);
+        CX_TEST_ASSERT(client->transfer_buffer_pos < len);
         
         // read the request buffer from sock and continue with client_send_request
         CxBuffer buf;
         cxBufferInit(&buf, cxDefaultAllocator, NULL, len, CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS);
         char tmpbuf[1024];
         int writes = 1;
-        while(client->req_buffer_pos < client->req_buffer_len && writes < 2000000) {
+        while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) {
             ssize_t r = read(sock, tmpbuf, 1024);
             CX_TEST_ASSERT(r >= 0);
             cxBufferWrite(tmpbuf, 1, r, &buf);
@@ -565,7 +585,7 @@
             
             writes++;
         }
-        CX_TEST_ASSERT(client->req_buffer_pos == client->req_buffer_len);
+        CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len);
         
         // finish reading the request buffer from sock
         ssize_t r;
@@ -624,7 +644,7 @@
         http_client_add_request_header(client, cx_mutstr("Test2"), cx_mutstr("value2"));
         create_req_buffer(client);
         
-        size_t req_header_len = client->req_buffer_len;
+        size_t req_header_len = client->transfer_buffer_len;
         
         // response buffer
         CxBuffer buf;
@@ -685,7 +705,6 @@
             ret = client_io(&dummy, &event);
             
             CX_TEST_ASSERT(!client->error);
-            CX_TEST_ASSERT(ret == 1);
         }
         CX_TEST_ASSERT(response_str_pos == response_str_len);
         CX_TEST_ASSERT(testr.status == 200);
@@ -769,9 +788,9 @@
         client->request_body_read = test_request_body_read;
         client->request_body_read_userdata = &req;
         
-        memset(client->req_buffer, '_', client->req_buffer_alloc);
-        client->req_buffer_pos = 0;
-        client->req_buffer_len = 0;
+        memset(client->transfer_buffer, '_', client->transfer_buffer_alloc);
+        client->transfer_buffer_pos = 0;
+        client->transfer_buffer_len = 0;
         
         // send the first 128 bytes
         while(req.cur_reads <= req.max_reads) {
@@ -840,8 +859,59 @@
     }
 }
 
+static CX_TEST(test_http_client_read_response_head) {
+    CX_TEST_DO {
+        EventHandler dummy;
+        HttpClient *client = http_client_new(&dummy);
+        create_req_buffer(client);
+        client->req_content_length = -1;
+        
+        int fds[2];
+        util_socketpair(fds);
+        util_socket_setnonblock(fds[0], 1);
+        util_socket_setnonblock(fds[1], 1);
+        client->socketfd = fds[0];
+        int sock = fds[1];
+        
+        // test
+        char *response_str = 
+                "HTTP/1.1 204 OK\r\n"
+                "Host: localhost\r\n"
+                "Content-length: 0\r\n"
+                "\r\n";
+        
+        size_t response_len = strlen(response_str);
+        size_t response_pos = 0;
+        while(response_pos < response_len) {
+            size_t nbytes = response_len - response_pos;
+            ssize_t w = write(sock, response_str + response_pos, nbytes);
+            if(w > 0) {
+                response_pos += w;
+            }
+            
+            if(!client->response_header_complete) {
+                int ret = client_read_response_header(client);
+                CX_TEST_ASSERT(client->error == 0);
+                if(ret == 1) {
+                    continue;
+                }
+                
+                CX_TEST_ASSERT(client->stream == NULL);
+            }
+            
+            break;
+        }
+        
+        // cleanup
+        close(fds[0]);
+        close(fds[1]);
+        http_client_free(client);
+    }
+}
+
 void http_client_add_tests(CxTestSuite *suite) {
     cx_test_register(suite, test_http_client_send_request);
     cx_test_register(suite, test_http_client_send_request_body_chunked);
+    cx_test_register(suite, test_http_client_read_response_head);
     cx_test_register(suite, test_http_client_io_simple);
 }
--- a/src/server/proxy/httpclient.h	Wed Feb 18 12:31:19 2026 +0100
+++ b/src/server/proxy/httpclient.h	Wed Feb 18 13:41:10 2026 +0100
@@ -122,10 +122,11 @@
     HttpParser *parser;
     netbuf buffer;
     
-    char *req_buffer;
-    size_t req_buffer_alloc;
-    size_t req_buffer_len;
-    size_t req_buffer_pos;
+    char *transfer_buffer;
+    size_t transfer_buffer_alloc;
+    size_t transfer_buffer_len;
+    size_t transfer_buffer_pos;
+    
     size_t req_contentlength_pos;
     
     int request_body_complete;
--- a/src/server/util/io.c	Wed Feb 18 12:31:19 2026 +0100
+++ b/src/server/util/io.c	Wed Feb 18 13:41:10 2026 +0100
@@ -77,6 +77,7 @@
     NULL,
     (io_setmode_f)net_sys_setmode,
     (io_poll_f)net_sys_poll,
+    (io_free_f)sysstream_free,
     0,
     0
 };
@@ -90,6 +91,7 @@
     (io_finish_f)net_http_finish,
     (io_setmode_f)net_http_setmode,
     (io_poll_f)net_http_poll,
+    (io_free_f)httpstream_free,
     0,
     IO_STREAM_TYPE_HTTP
 };
@@ -103,6 +105,7 @@
     (io_finish_f)net_ssl_finish,
     (io_setmode_f)net_ssl_setmode,
     (io_poll_f)net_ssl_poll,
+    (io_free_f)sslstream_free,
     0,
     IO_STREAM_TYPE_SSL
 };
@@ -120,10 +123,16 @@
 IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd) {
     Sysstream *st = pool_malloc(pool, sizeof(Sysstream));
     st->st = native_io_funcs;
+    st->pool = pool;
     st->fd = fd;
     return (IOStream*)st;
 }
 
+void sysstream_free(IOStream *st) {
+    Sysstream *sys = (Sysstream*)st;
+    pool_free(sys->pool, st);
+}
+
 #ifdef XP_UNIX
 ssize_t net_sys_write(Sysstream *st, const void *buf, size_t nbytes) {
     ssize_t r = write(st->fd, buf, nbytes);
@@ -288,6 +297,12 @@
     return (IOStream*)st;
 }
 
+void httpstream_free(IOStream *st) {
+    HttpStream *http = (HttpStream*)st;
+    http->fd->free(http->fd);
+    pool_free(http->pool, st);
+}
+
 int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos) {
     if(st->read != (io_read_f)net_http_read) {
         log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_read: IOStream is not an HttpStream");
@@ -812,6 +827,12 @@
     return (IOStream*)st;
 }
 
+void sslstream_free(IOStream *st) {
+    SSLStream *ssl = (SSLStream*)st;
+    SSL_free(ssl->ssl);
+    pool_free(ssl->pool, st);
+}
+
 ssize_t net_ssl_write(SSLStream *st, const void *buf, size_t nbytes) {
     int ret = SSL_write(st->ssl, buf, nbytes);
     if(ret <= 0) {
--- a/src/server/util/io.h	Wed Feb 18 12:31:19 2026 +0100
+++ b/src/server/util/io.h	Wed Feb 18 13:41:10 2026 +0100
@@ -70,6 +70,7 @@
 typedef void(*io_finish_f)(IOStream *);
 typedef void(*io_setmode_f)(IOStream *, int);
 typedef int(*io_poll_f)(IOStream *, EventHandler *, int, Event *);
+typedef void (*io_free_f)(IOStream *);
 
 struct IOStream {
     io_write_f    write;
@@ -80,12 +81,14 @@
     io_finish_f   finish;
     io_setmode_f  setmode;
     io_poll_f     poll;
+    io_free_f     free;
     int           io_errno;
     unsigned int  type;
 };
 
 struct Sysstream {
     IOStream st;
+    pool_handle_t *pool;
 #ifdef XP_UNIX
     int      fd;
 #elif defined(XP_WIN32)
@@ -97,6 +100,7 @@
 struct HttpStream {
     IOStream st;
     IOStream *fd;
+    pool_handle_t *pool;
     
     uint64_t written; 
     /*
@@ -181,6 +185,7 @@
 typedef struct SSLStream {
     IOStream st;
     SSL      *ssl;
+    pool_handle_t *pool;
     int      error;
 } SSLStream;
 
@@ -188,6 +193,7 @@
 
 /* system stream */
 IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd);
+void sysstream_free(IOStream *st);
 
 ssize_t net_sys_write(Sysstream *st, const void *buf, size_t nbytes);
 ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt);
@@ -199,6 +205,7 @@
 
 /* http stream */
 IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd);
+void httpstream_free(IOStream *st);
 
 int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos);
 int httpstream_enable_buffered_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos);
@@ -222,6 +229,7 @@
 
 /* ssl stream */
 IOStream* sslstream_new(pool_handle_t *pool, SSL *ssl);
+void sslstream_free(IOStream *st);
 
 ssize_t net_ssl_write(SSLStream *st, const void *buf, size_t nbytes);
 ssize_t net_ssl_writev(SSLStream *st, struct iovec *iovec, int iovcnt);

mercurial