src/server/proxy/httpclient.c

changeset 718
9e98618464ae
parent 717
2edcb361b8be
equal deleted inserted replaced
717:2edcb361b8be 718:9e98618464ae
263 } 263 }
264 if(size > available) { 264 if(size > available) {
265 size = available; 265 size = available;
266 } 266 }
267 memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size); 267 memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size);
268 client->transfer_buffer_len += size;
268 return size; 269 return size;
269 } 270 }
270 271
271 static int create_req_buffer(HttpClient *client) { 272 static int create_req_buffer(HttpClient *client) {
272 CxBuffer buf; 273 CxBuffer buf;
389 int ret = client_send_buf(client); 390 int ret = client_send_buf(client);
390 if(client->error) { 391 if(client->error) {
391 return 0; 392 return 0;
392 } 393 }
393 // readiness notification 394 // readiness notification
394 if(ret == 0 && client->ws_msg_ready) { 395 if(ret == 0) {
395 if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { 396 if(client->ws_msg_ready && client->ws_msg_ready(client, client->ws_msg_ready_userdata)) {
396 return 0; 397 return 0;
397 } 398 }
399 client->transfer_buffer_pos = 0;
400 client->transfer_buffer_len = 0;
398 } 401 }
399 402
400 // read message 403 // read message
401 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; 404 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
402 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; 405 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
683 } 686 }
684 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); 687 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
685 if(!http) { 688 if(!http) {
686 fd->free(fd); 689 fd->free(fd);
687 } 690 }
691 http->fd->setmode((IOStream*)http, IO_MODE_RAW);
688 if(client->buffer.pos < client->buffer.cursize) { 692 if(client->buffer.pos < client->buffer.cursize) {
689 // bytes remaining in the buffer -> enable buffered reading 693 // bytes remaining in the buffer -> enable buffered reading
690 httpstream_enable_buffered_read( 694 httpstream_enable_buffered_read(
691 &http->st, 695 &http->st,
692 (char*)client->buffer.inbuf, 696 (char*)client->buffer.inbuf,
1489 } 1493 }
1490 1494
/*
 * Runs the shared test_http_client_io_simple subroutine with the arguments
 * (8, 3). NOTE(review): going by the test name these are presumably a block
 * size of 8 and write-error scenario 3 - confirm against the subroutine.
 */
1491 static CX_TEST(test_http_client_io_write_blsz8_error3) { 1495 static CX_TEST(test_http_client_io_write_blsz8_error3) {
1492 CX_TEST_DO { 1496 CX_TEST_DO {
1493 CX_TEST_CALL_SUBROUTINE(test_http_client_io_simple, 8, 3); 1497 CX_TEST_CALL_SUBROUTINE(test_http_client_io_simple, 8, 3);
1498 }
1499 }
1500
1501 static CX_TEST(test_http_client_ws_msg_out) {
1502 CX_TEST_DO {
1503 EventHandler dummy;
1504 HttpClient *client = http_client_new(&dummy);
1505
1506 int fds[2];
1507 util_socketpair(fds);
1508 util_socket_setnonblock(fds[0], 1);
1509 util_socket_setnonblock(fds[1], 1);
1510 client->socketfd = fds[0];
1511 client->event.cookie = client;
1512 int sock = fds[1];
1513
1514 create_req_buffer(client);
1515 client->transfer_buffer_len = 0;
1516 client->transfer_buffer_pos = 0;
1517
1518 IOStream *fd = Sysstream_new(NULL, client->socketfd);
1519 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
1520 http->fd->setmode((IOStream*)http, IO_MODE_RAW);
1521 client->stream = http;
1522
1523 size_t out_nbytes = 1024*1024*64;
1524 char *outbuf = malloc(out_nbytes);
1525 // init the buffer with random data
1526 for(size_t i=0;i<out_nbytes;i+=sizeof(int)) {
1527 int *p = (int*)(outbuf+i);
1528 *p = rand();
1529 }
1530 size_t out_pos = 0;
1531
1532 CxBuffer *inbuf = cxBufferCreate(NULL, NULL, out_nbytes, CX_BUFFER_FREE_CONTENTS);
1533
1534 WSBool add_message_would_block = FALSE;
1535
1536
1537 while(inbuf->pos < out_nbytes) {
1538 // add outbuf data to message buffer
1539 char *msg = outbuf + out_pos;
1540 size_t msglen = out_nbytes - out_pos;
1541 while(msglen > 0) {
1542 int ret = http_client_add_message(client, msg, msglen);
1543 if(ret == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
1544 add_message_would_block = TRUE;
1545 break;
1546 }
1547 out_pos += ret;
1548 msg = outbuf + out_pos;
1549 msglen = out_nbytes - out_pos;
1550 }
1551
1552 // send message buffer to the socket
1553 int ret = client_ws_io(&dummy, &client->event);
1554 CX_TEST_ASSERT(client->error == 0);
1555
1556 // run client_ws_io again, it should do nothing
1557 ret = client_ws_io(&dummy, &client->event);
1558 CX_TEST_ASSERT(ret == 1); // would block
1559
1560 ssize_t r = read(sock, inbuf->space + inbuf->pos, inbuf->capacity - inbuf->pos);
1561 if(r > 0) {
1562 inbuf->pos += r;
1563 inbuf->size += r;
1564 }
1565 }
1566
1567 // http_client_add_message should block at least once
1568 CX_TEST_ASSERT(add_message_would_block);
1569 CX_TEST_ASSERT(inbuf->pos == out_nbytes);
1570 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes));
1571
1572 cxBufferFree(inbuf);
1573 http_client_free(client);
1574 close(sock);
1494 } 1575 }
1495 } 1576 }
1496 1577
1497 void http_client_add_tests(CxTestSuite *suite) { 1578 void http_client_add_tests(CxTestSuite *suite) {
1498 cx_test_register(suite, test_http_client_send_request); 1579 cx_test_register(suite, test_http_client_send_request);
1516 cx_test_register(suite, test_http_client_io_write_error2); 1597 cx_test_register(suite, test_http_client_io_write_error2);
1517 cx_test_register(suite, test_http_client_io_write_error3); 1598 cx_test_register(suite, test_http_client_io_write_error3);
1518 cx_test_register(suite, test_http_client_io_write_blsz8_error1); 1599 cx_test_register(suite, test_http_client_io_write_blsz8_error1);
1519 cx_test_register(suite, test_http_client_io_write_blsz8_error2); 1600 cx_test_register(suite, test_http_client_io_write_blsz8_error2);
1520 cx_test_register(suite, test_http_client_io_write_blsz8_error3); 1601 cx_test_register(suite, test_http_client_io_write_blsz8_error3);
1521 } 1602 cx_test_register(suite, test_http_client_ws_msg_out);
1603 }

mercurial