Thu, 05 Mar 2026 21:39:39 +0100
add httpclient test for sending websocket messages
| src/server/proxy/httpclient.c | file | annotate | diff | comparison | revisions |
--- a/src/server/proxy/httpclient.c Thu Mar 05 21:06:41 2026 +0100 +++ b/src/server/proxy/httpclient.c Thu Mar 05 21:39:39 2026 +0100 @@ -265,6 +265,7 @@ size = available; } memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size); + client->transfer_buffer_len += size; return size; } @@ -391,10 +392,12 @@ return 0; } // readiness notification - if(ret == 0 && client->ws_msg_ready) { - if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { + if(ret == 0) { + if(client->ws_msg_ready && client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { return 0; } + client->transfer_buffer_pos = 0; + client->transfer_buffer_len = 0; } // read message @@ -685,6 +688,7 @@ if(!http) { fd->free(fd); } + http->fd->setmode((IOStream*)http, IO_MODE_RAW); if(client->buffer.pos < client->buffer.cursize) { // bytes remaining in the buffer -> enable buffered reading httpstream_enable_buffered_read( @@ -1494,6 +1498,83 @@ } } +static CX_TEST(test_http_client_ws_msg_out) { + CX_TEST_DO { + EventHandler dummy; + HttpClient *client = http_client_new(&dummy); + + int fds[2]; + util_socketpair(fds); + util_socket_setnonblock(fds[0], 1); + util_socket_setnonblock(fds[1], 1); + client->socketfd = fds[0]; + client->event.cookie = client; + int sock = fds[1]; + + create_req_buffer(client); + client->transfer_buffer_len = 0; + client->transfer_buffer_pos = 0; + + IOStream *fd = Sysstream_new(NULL, client->socketfd); + HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); + http->fd->setmode((IOStream*)http, IO_MODE_RAW); + client->stream = http; + + size_t out_nbytes = 1024*1024*64; + char *outbuf = malloc(out_nbytes); + // init the buffer with random data + for(size_t i=0;i<out_nbytes;i+=sizeof(int)) { + int *p = (int*)(outbuf+i); + *p = rand(); + } + size_t out_pos = 0; + + CxBuffer *inbuf = cxBufferCreate(NULL, NULL, out_nbytes, CX_BUFFER_FREE_CONTENTS); + + WSBool add_message_would_block = FALSE; + + + while(inbuf->pos < out_nbytes) { + // add outbuf data to message buffer + char *msg = outbuf + out_pos; + size_t msglen = out_nbytes - out_pos; + while(msglen > 0) { + int ret = http_client_add_message(client, msg, msglen); + if(ret == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { + add_message_would_block = TRUE; + break; + } + out_pos += ret; + msg = outbuf + out_pos; + msglen = out_nbytes - out_pos; + } + + // send message buffer to the socket + int ret = client_ws_io(&dummy, &client->event); + CX_TEST_ASSERT(client->error == 0); + + // run client_ws_io again, it should do nothing + ret = client_ws_io(&dummy, &client->event); + CX_TEST_ASSERT(ret == 1); // would block + + ssize_t r = read(sock, inbuf->space + inbuf->pos, inbuf->capacity - inbuf->pos); + if(r > 0) { + inbuf->pos += r; + inbuf->size += r; + } + } + + // http_client_add_message should block at least once + CX_TEST_ASSERT(add_message_would_block); + CX_TEST_ASSERT(inbuf->pos == out_nbytes); + CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes)); + + cxBufferFree(inbuf); + http_client_free(client); + close(sock); + } +} + void http_client_add_tests(CxTestSuite *suite) { cx_test_register(suite, test_http_client_send_request); cx_test_register(suite, test_http_client_send_request_body_chunked); @@ -1518,4 +1599,5 @@ cx_test_register(suite, test_http_client_io_write_blsz8_error1); cx_test_register(suite, test_http_client_io_write_blsz8_error2); cx_test_register(suite, test_http_client_io_write_blsz8_error3); + cx_test_register(suite, test_http_client_ws_msg_out); }