# HG changeset patch # User Olaf Wintermann # Date 1772819423 -3600 # Node ID c4c2b8e8ddc57878efac4980092c1043b00bfcbf # Parent 9e98618464ae37d592180d34032af0d0111694c8 add test_http_client_ws_msg_in diff -r 9e98618464ae -r c4c2b8e8ddc5 src/server/proxy/httpclient.c --- a/src/server/proxy/httpclient.c Thu Mar 05 21:39:39 2026 +0100 +++ b/src/server/proxy/httpclient.c Fri Mar 06 18:50:23 2026 +0100 @@ -385,6 +385,42 @@ return 0; } +static int client_ws_write(HttpClient *client) { + if(!client->ws_write) { + client->transfer2_buffer_pos = client->transfer2_buffer_len; + return 0; + } + + int ret = 0; + + // pass all data from the transfer2 buffer to the ws_write callback + char *out = client->transfer2_buffer + client->transfer2_buffer_pos; + size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; + while(nbytes > 0) { + ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata); + if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { + ret = 1; + break; + } else if(w <= 0) { + client->error = 1; + return 0; + } + client->transfer2_buffer_pos += w; + + // adjust buffer + out = client->transfer2_buffer + client->transfer2_buffer_pos; + nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; + } + + // clear buffer + if(client->transfer2_buffer_pos == client->transfer2_buffer_len) { + client->transfer2_buffer_pos = 0; + client->transfer2_buffer_len = 0; + } + + return ret; +} + static int client_ws_process(HttpClient *client, Event *event) { // send available data from the transfer buffer int ret = client_send_buf(client); @@ -400,36 +436,24 @@ client->transfer_buffer_len = 0; } + // flush transfer2 buffer + if(client_ws_write(client)) { + return 1; + } + // read message char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; + ssize_t r = -1; while(available > 0) { - ssize_t r = net_read(client->stream, buf, available); + r = net_read(client->stream, buf, available); if(r <= 0) { break; } client->transfer2_buffer_len += r; - if(client->ws_write) { - char *out = client->transfer2_buffer + client->transfer2_buffer_pos; - size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; - while(nbytes > 0) { - ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata); - if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { - break; - } else if(w <= 0) { - client->error = 1; - return 0; - } - client->transfer2_buffer_pos += w; - - // adjust buffer - out = client->transfer2_buffer + client->transfer2_buffer_pos; - nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; - } - } else { - // noop - client->transfer2_buffer_pos = client->transfer2_buffer_len; + if(client_ws_write(client)) { + break; } // adjust buffer @@ -688,7 +712,7 @@ if(!http) { fd->free(fd); } - http->fd->setmode((IOStream*)http, IO_MODE_RAW); + http->raw = WS_TRUE; if(client->buffer.pos < client->buffer.cursize) { // bytes remaining in the buffer -> enable buffered reading httpstream_enable_buffered_read( @@ -1575,6 +1599,81 @@ } } +static ssize_t test_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) { + CxBuffer *out = (CxBuffer*)userdata; + return cxBufferWrite(buf, 1, nbytes, out); +} + +static CX_TEST(test_http_client_ws_msg_in) { + CX_TEST_DO { + EventHandler dummy; + HttpClient *client = http_client_new(&dummy); + + int fds[2]; + util_socketpair(fds); + util_socket_setnonblock(fds[0], 1); + util_socket_setnonblock(fds[1], 1); + client->socketfd = fds[0]; + client->event.cookie = client; + int sock = fds[1]; + + create_req_buffer(client); + client->transfer_buffer_len = 0; + client->transfer_buffer_pos = 0; + client->transfer2_buffer_alloc = 1024*128; + client->transfer2_buffer = malloc(client->transfer2_buffer_alloc); + + IOStream *fd = Sysstream_new(NULL, client->socketfd); + HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); + http->raw = WS_TRUE; + client->stream = http; + + size_t str_nbytes = 1024*1024*64; + char *str = malloc(str_nbytes); + // init the buffer with random data + for(size_t i=0;iws_write = test_ws_write; + client->ws_write_userdata = out; + + while(out->size < str_nbytes) { + size_t slen = str_nbytes - str_pos; + if(slen > 64*1024) { + slen = 64*1024; + } + + if(slen > 0) { + ssize_t w = write(sock, str + str_pos, slen); + if(w > 0) { + str_pos += w; + } + } + + // webosocket IO: this should read from the other socket and + // call the ws_write callback + int ret = client_ws_io(&dummy, &client->event); + CX_TEST_ASSERT(client->error == 0); + + // run client_ws_io again, it should do nothing + ret = client_ws_io(&dummy, &client->event); + CX_TEST_ASSERT(ret == 1); // would block + } + + // http_client_add_message should block at least once + CX_TEST_ASSERT(out->pos == str_nbytes); + CX_TEST_ASSERT(!memcmp(out->space, str, str_nbytes)); + + cxBufferFree(out); + http_client_free(client); + close(sock); + } +} + void http_client_add_tests(CxTestSuite *suite) { cx_test_register(suite, test_http_client_send_request); cx_test_register(suite, test_http_client_send_request_body_chunked); @@ -1600,4 +1699,5 @@ cx_test_register(suite, test_http_client_io_write_blsz8_error2); cx_test_register(suite, test_http_client_io_write_blsz8_error3); cx_test_register(suite, test_http_client_ws_msg_out); + cx_test_register(suite, test_http_client_ws_msg_in); } diff -r 9e98618464ae -r c4c2b8e8ddc5 src/server/util/io.c --- a/src/server/util/io.c Thu Mar 05 21:39:39 2026 +0100 +++ b/src/server/util/io.c Fri Mar 06 18:50:23 2026 +0100 @@ -543,16 +543,8 @@ } } -ssize_t net_http_write_raw(HttpStream *st, const void *buf, size_t nbytes) { - return st->fd->write(st->fd, buf, nbytes); -} - -ssize_t net_http_writev_raw(HttpStream *st, struct iovec *iovec, int iovcnt) { - return st->fd->writev(st->fd, iovec, iovcnt); -} - ssize_t net_http_read(HttpStream *st, void *buf, size_t nbytes) { - if(st->read >= st->max_read) { + if(st->read >= st->max_read && !st->raw) { st->read_eof = WS_TRUE; return 0; }