# HG changeset patch # User Olaf Wintermann # Date 1771418470 -3600 # Node ID 48da20bde908c9707e7d67f88362109ce4038578 # Parent db37761a84943b600caf1bf273e94cb4e0f0e7ba refactor http client io to use an HttpStream for reading the response diff -r db37761a8494 -r 48da20bde908 src/server/proxy/httpclient.c --- a/src/server/proxy/httpclient.c Wed Feb 18 12:31:19 2026 +0100 +++ b/src/server/proxy/httpclient.c Wed Feb 18 13:41:10 2026 +0100 @@ -43,6 +43,7 @@ static int client_send_request(HttpClient *client); static int client_send_request_body(HttpClient *client); static int client_read_response_header(HttpClient *client); +static int client_read_response_body(HttpClient *client); HttpClient* http_client_new(EventHandler *ev) { CxMempool *mp = cxMempoolCreate(32, CX_MEMPOOL_TYPE_PURE); @@ -83,6 +84,9 @@ cxMempoolFree(client->mp); header_array_free(client->request_headers); http_parser_free(client->parser); + if(client->stream) { + client->stream->st.free(&client->stream->st); + } free(client->buffer.inbuf); free(client->addr); free(client->method); @@ -236,9 +240,9 @@ hdr = hdr->next; } cxBufferPutString(&buf, "\r\n"); - client->req_buffer = buf.space; - client->req_buffer_alloc = buf.capacity; - client->req_buffer_len = buf.size; + client->transfer_buffer = buf.space; + client->transfer_buffer_alloc = buf.capacity; + client->transfer_buffer_len = buf.size; return 0; } @@ -256,7 +260,7 @@ static int client_io(EventHandler *ev, Event *event) { HttpClient *client = event->cookie; - if(client->req_buffer_pos < client->req_buffer_len) { + if(client->transfer_buffer_pos < client->transfer_buffer_len) { if(client_send_request(client)) { return client->error == 0; } @@ -272,72 +276,11 @@ // writing complete, switch to read events event->events = EVENT_POLLIN; - - char *buffer; - size_t nbytes; - if(client->response_header_complete) { - buffer = client->buffer.inbuf; - nbytes = client->buffer.maxsize; - } else { - buffer = client->buffer.inbuf + client->buffer.pos; - nbytes = client->buffer.maxsize - client->buffer.cursize; + if(client_read_response_header(client)) { + return client->error == 0; } - - - ssize_t r; - while((r = read(client->socketfd, buffer, nbytes)) > 0) { - client->buffer.cursize += r; - if(!client->response_header_complete) { - switch(http_parser_process(client->parser)) { - case 0: { // finish - if(!http_parser_validate(client->parser)) { - client->error = 1; - return 0; - } - client->statuscode = client->parser->status; - - client->response_header_complete = 1; - if(client->response_start) { - cxmutstr msg = client->parser->msg; - char t = msg.ptr[msg.length]; - msg.ptr[msg.length] = 0; - int ret = client->response_start(client, client->statuscode, msg.ptr, client->response_start_userdata); - msg.ptr[msg.length] = t; - - // TODO: check ret - } - break; - } - case 1: { // need more data - continue; - } - case 2: { // error - client->error = 1; - return 0; - } - } - } - - // header complete - - char *out = client->buffer.inbuf + client->buffer.pos; - size_t len = client->buffer.cursize - client->buffer.pos; - - if(client->response_body_write) { - int ret = client->response_body_write(client, out, len, client->response_body_write_userdata); - // TODO: check ret - } - - client->buffer.pos = 0; - client->buffer.cursize = 0; - } - - if(r < 0) { - if(errno == EAGAIN) { - return 1; - } else { - log_ereport(LOG_FAILURE, "http-client: IO error: %s", strerror(errno)); - } + if(client_read_response_body(client)) { + return client->error == 0; } // request finished @@ -363,11 +306,11 @@ } static int client_send_request(HttpClient *client) { - size_t nbytes = client->req_buffer_len - client->req_buffer_pos; + size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; ssize_t w; - while((w = write(client->socketfd, client->req_buffer + client->req_buffer_pos, nbytes)) > 0) { - client->req_buffer_pos += w; - nbytes = client->req_buffer_len - client->req_buffer_pos; + while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) { + client->transfer_buffer_pos += w; + nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; if(nbytes == 0) { break; } @@ -382,11 +325,11 @@ return 1; } - return client->req_buffer_pos < client->req_buffer_len; + return client->transfer_buffer_pos < client->transfer_buffer_len; } static int client_send_request_body(HttpClient *client) { - size_t rbody_readsize = client->req_buffer_alloc; + size_t rbody_readsize = client->transfer_buffer_alloc; size_t rbody_buf_offset = 0; if(client->req_content_length == -1) { // chunked transfer encoding: @@ -396,7 +339,7 @@ rbody_buf_offset = 16; } while(!client->request_body_complete) { - ssize_t r = client->request_body_read(client, client->req_buffer + rbody_buf_offset, rbody_readsize, client->request_body_read_userdata); + ssize_t r = client->request_body_read(client, client->transfer_buffer + rbody_buf_offset, rbody_readsize, client->request_body_read_userdata); if(r <= 0) { if(r == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { return 1; @@ -413,7 +356,7 @@ // is it time to terminate the request body? // try read some additional bytes, if it returns 0, we know // the request body is complete and we can add the termination chunk - char *r2buf = client->req_buffer + rbody_buf_offset + r; + char *r2buf = client->transfer_buffer + rbody_buf_offset + r; ssize_t r2 = client->request_body_read(client, r2buf, 32, client->request_body_read_userdata); if(r > 0) { r += r2; @@ -435,12 +378,12 @@ char chunkheader[16]; int chunkheaderlen = snprintf(chunkheader, 16, "%zx\r\n", (size_t)r); startpos = 16 - chunkheaderlen; - memcpy(client->req_buffer + startpos, chunkheader, chunkheaderlen); + memcpy(client->transfer_buffer + startpos, chunkheader, chunkheaderlen); } client->req_contentlength_pos += r; - client->req_buffer_pos = startpos; - client->req_buffer_len = rbody_buf_offset + r; + client->transfer_buffer_pos = startpos; + client->transfer_buffer_len = rbody_buf_offset + r; if(client_send_request(client)) { return 1; } @@ -448,9 +391,9 @@ // chunked transfer encoding: terminate if(client->req_content_length == -1 && !client->request_body_terminated) { - memcpy(client->req_buffer, "0\r\n\r\n", 5); - client->req_buffer_pos = 0; - client->req_buffer_len = 5; + memcpy(client->transfer_buffer, "0\r\n\r\n", 5); + client->transfer_buffer_pos = 0; + client->transfer_buffer_len = 5; client->request_body_terminated = 1; if(client_send_request(client)) { return 1; @@ -465,7 +408,6 @@ return 0; } -/* static int client_read_response_header(HttpClient *client) { if(client->response_header_complete) { return 0; @@ -482,7 +424,7 @@ case 0: { // finish if(!http_parser_validate(client->parser)) { client->error = 1; - return 0; + return 1; } client->statuscode = client->parser->status; @@ -503,16 +445,94 @@ } case 2: { // error client->error = 1; - return 0; + return 1; } } } // header complete - + break; + } + + if(r <= 0) { + if(r == 0) { + // unexpected EOF + client->error = 1; + } else if(errno != EAGAIN) { + log_ereport(LOG_FAILURE, "http-client: IO error: %s", strerror(errno)); + client->error = 1; + } + return 1; + } + + // initialize httpstream + HeaderArray *headers = client->parser->headers; + long long contentlength = 0; + int chunkedtransferenc = 0; + while(headers) { + for(int i=0;ilen;i++) { + if(!cx_strcasecmp(headers->headers[i].name, "content-length")) { + if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) { + headers = NULL; + break; + } + } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) { + if(!cx_strcmp(headers->headers[i].value, "chunked")) { + chunkedtransferenc = 1; + headers = NULL; + break; + } + } + } } + + if(contentlength > 0 || chunkedtransferenc) { + IOStream *fd = Sysstream_new(NULL, client->socketfd); + if(!fd) { + client->error = 1; + return 1; + } + HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); + if(!http) { + fd->free(fd); + } + if(contentlength > 0) { + http->max_read = contentlength; + httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); + } else if(chunkedtransferenc) { + httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); + } + client->stream = http; + } + + return 0; } -*/ + +static int client_read_response_body(HttpClient *client) { + if(!client->stream) { + return 0; // no input stream -> no response body + } + + char *buf = client->transfer_buffer; + size_t nbytes = client->transfer_buffer_alloc; + + ssize_t r; + while((r = net_read(&client->stream->st, buf, nbytes)) > 0) { + if(client->response_body_write) { + int ret = client->response_body_write(client, buf, r, client->response_body_write_userdata); + // TODO: check ret + } + } + + if(r < 0) { + if(r != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { + client->error; + } + return 1; + } + + return 0; +} /* --------------------------------- Tests --------------------------------- */ @@ -535,11 +555,11 @@ // init the buffer with random data for(size_t i=0;ireq_buffer = str; - client->req_buffer_len = len; + client->transfer_buffer = str; + client->transfer_buffer_len = len; // test client_send_request @@ -548,15 +568,15 @@ // fully write the request buffer to the socket // In that case it returns 1 but without the error flag CX_TEST_ASSERT(ret == 1 && !client->error); - CX_TEST_ASSERT(client->req_buffer_pos > 0); - CX_TEST_ASSERT(client->req_buffer_pos < len); + CX_TEST_ASSERT(client->transfer_buffer_pos > 0); + CX_TEST_ASSERT(client->transfer_buffer_pos < len); // read the request buffer from sock and continue with client_send_request CxBuffer buf; cxBufferInit(&buf, cxDefaultAllocator, NULL, len, CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS); char tmpbuf[1024]; int writes = 1; - while(client->req_buffer_pos < client->req_buffer_len && writes < 2000000) { + while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) { ssize_t r = read(sock, tmpbuf, 1024); CX_TEST_ASSERT(r >= 0); cxBufferWrite(tmpbuf, 1, r, &buf); @@ -565,7 +585,7 @@ writes++; } - CX_TEST_ASSERT(client->req_buffer_pos == client->req_buffer_len); + CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len); // finish reading the request buffer from sock ssize_t r; @@ -624,7 +644,7 @@ http_client_add_request_header(client, cx_mutstr("Test2"), cx_mutstr("value2")); create_req_buffer(client); - size_t req_header_len = client->req_buffer_len; + size_t req_header_len = client->transfer_buffer_len; // response buffer CxBuffer buf; @@ -685,7 +705,6 @@ ret = client_io(&dummy, &event); CX_TEST_ASSERT(!client->error); - CX_TEST_ASSERT(ret == 1); } CX_TEST_ASSERT(response_str_pos == response_str_len); CX_TEST_ASSERT(testr.status == 200); @@ -769,9 +788,9 @@ client->request_body_read = test_request_body_read; client->request_body_read_userdata = &req; - memset(client->req_buffer, '_', client->req_buffer_alloc); - client->req_buffer_pos = 0; - client->req_buffer_len = 0; + memset(client->transfer_buffer, '_', client->transfer_buffer_alloc); + client->transfer_buffer_pos = 0; + client->transfer_buffer_len = 0; // send the first 128 bytes while(req.cur_reads <= req.max_reads) { @@ -840,8 +859,59 @@ } } +static CX_TEST(test_http_client_read_response_head) { + CX_TEST_DO { + EventHandler dummy; + HttpClient *client = http_client_new(&dummy); + create_req_buffer(client); + client->req_content_length = -1; + + int fds[2]; + util_socketpair(fds); + util_socket_setnonblock(fds[0], 1); + util_socket_setnonblock(fds[1], 1); + client->socketfd = fds[0]; + int sock = fds[1]; + + // test + char *response_str = + "HTTP/1.1 204 OK\r\n" + "Host: localhost\r\n" + "Content-length: 0\r\n" + "\r\n"; + + size_t response_len = strlen(response_str); + size_t response_pos = 0; + while(response_pos < response_len) { + size_t nbytes = response_len - response_pos; + ssize_t w = write(sock, response_str + response_pos, nbytes); + if(w > 0) { + response_pos += w; + } + + if(!client->response_header_complete) { + int ret = client_read_response_header(client); + CX_TEST_ASSERT(client->error == 0); + if(ret == 1) { + continue; + } + + CX_TEST_ASSERT(client->stream == NULL); + } + + break; + } + + // cleanup + close(fds[0]); + close(fds[1]); + http_client_free(client); + } +} + void http_client_add_tests(CxTestSuite *suite) { cx_test_register(suite, test_http_client_send_request); cx_test_register(suite, test_http_client_send_request_body_chunked); + cx_test_register(suite, test_http_client_read_response_head); cx_test_register(suite, test_http_client_io_simple); } diff -r db37761a8494 -r 48da20bde908 src/server/proxy/httpclient.h --- a/src/server/proxy/httpclient.h Wed Feb 18 12:31:19 2026 +0100 +++ b/src/server/proxy/httpclient.h Wed Feb 18 13:41:10 2026 +0100 @@ -122,10 +122,11 @@ HttpParser *parser; netbuf buffer; - char *req_buffer; - size_t req_buffer_alloc; - size_t req_buffer_len; - size_t req_buffer_pos; + char *transfer_buffer; + size_t transfer_buffer_alloc; + size_t transfer_buffer_len; + size_t transfer_buffer_pos; + size_t req_contentlength_pos; int request_body_complete; diff -r db37761a8494 -r 48da20bde908 src/server/util/io.c --- a/src/server/util/io.c Wed Feb 18 12:31:19 2026 +0100 +++ b/src/server/util/io.c Wed Feb 18 13:41:10 2026 +0100 @@ -77,6 +77,7 @@ NULL, (io_setmode_f)net_sys_setmode, (io_poll_f)net_sys_poll, + (io_free_f)sysstream_free, 0, 0 }; @@ -90,6 +91,7 @@ (io_finish_f)net_http_finish, (io_setmode_f)net_http_setmode, (io_poll_f)net_http_poll, + (io_free_f)httpstream_free, 0, IO_STREAM_TYPE_HTTP }; @@ -103,6 +105,7 @@ (io_finish_f)net_ssl_finish, (io_setmode_f)net_ssl_setmode, (io_poll_f)net_ssl_poll, + (io_free_f)sslstream_free, 0, IO_STREAM_TYPE_SSL }; @@ -120,10 +123,16 @@ IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd) { Sysstream *st = pool_malloc(pool, sizeof(Sysstream)); st->st = native_io_funcs; + st->pool = pool; st->fd = fd; return (IOStream*)st; } +void sysstream_free(IOStream *st) { + Sysstream *sys = (Sysstream*)st; + pool_free(sys->pool, st); +} + #ifdef XP_UNIX ssize_t net_sys_write(Sysstream *st, const void *buf, size_t nbytes) { ssize_t r = write(st->fd, buf, nbytes); @@ -288,6 +297,12 @@ return (IOStream*)st; } +void httpstream_free(IOStream *st) { + HttpStream *http = (HttpStream*)st; + http->fd->free(http->fd); + pool_free(http->pool, st); +} + int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos) { if(st->read != (io_read_f)net_http_read) { log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_read: IOStream is not an HttpStream"); @@ -812,6 +827,12 @@ return (IOStream*)st; } +void sslstream_free(IOStream *st) { + SSLStream *ssl = (SSLStream*)st; + SSL_free(ssl->ssl); + pool_free(ssl->pool, st); +} + ssize_t net_ssl_write(SSLStream *st, const void *buf, size_t nbytes) { int ret = SSL_write(st->ssl, buf, nbytes); if(ret <= 0) { diff -r db37761a8494 -r 48da20bde908 src/server/util/io.h --- a/src/server/util/io.h Wed Feb 18 12:31:19 2026 +0100 +++ b/src/server/util/io.h Wed Feb 18 13:41:10 2026 +0100 @@ -70,6 +70,7 @@ typedef void(*io_finish_f)(IOStream *); typedef void(*io_setmode_f)(IOStream *, int); typedef int(*io_poll_f)(IOStream *, EventHandler *, int, Event *); +typedef void (*io_free_f)(IOStream *); struct IOStream { io_write_f write; @@ -80,12 +81,14 @@ io_finish_f finish; io_setmode_f setmode; io_poll_f poll; + io_free_f free; int io_errno; unsigned int type; }; struct Sysstream { IOStream st; + pool_handle_t *pool; #ifdef XP_UNIX int fd; #elif defined(XP_WIN32) @@ -97,6 +100,7 @@ struct HttpStream { IOStream st; IOStream *fd; + pool_handle_t *pool; uint64_t written; /* @@ -181,6 +185,7 @@ typedef struct SSLStream { IOStream st; SSL *ssl; + pool_handle_t *pool; int error; } SSLStream; @@ -188,6 +193,7 @@ /* system stream */ IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd); +void sysstream_free(IOStream *st); ssize_t net_sys_write(Sysstream *st, const void *buf, size_t nbytes); ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt); @@ -199,6 +205,7 @@ /* http stream */ IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd); +void httpstream_free(IOStream *st); int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos); int httpstream_enable_buffered_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos); @@ -222,6 +229,7 @@ /* ssl stream */ IOStream* sslstream_new(pool_handle_t *pool, SSL *ssl); +void sslstream_free(IOStream *st); ssize_t net_ssl_write(SSLStream *st, const void *buf, size_t nbytes); ssize_t net_ssl_writev(SSLStream *st, struct iovec *iovec, int iovcnt);