| 389 int ret = client_send_buf(client); |
390 int ret = client_send_buf(client); |
| 390 if(client->error) { |
391 if(client->error) { |
| 391 return 0; |
392 return 0; |
| 392 } |
393 } |
| 393 // readiness notification |
394 // readiness notification |
| 394 if(ret == 0 && client->ws_msg_ready) { |
395 if(ret == 0) { |
| 395 if(client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { |
396 if(client->ws_msg_ready && client->ws_msg_ready(client, client->ws_msg_ready_userdata)) { |
| 396 return 0; |
397 return 0; |
| 397 } |
398 } |
| |
399 client->transfer_buffer_pos = 0; |
| |
400 client->transfer_buffer_len = 0; |
| 398 } |
401 } |
| 399 |
402 |
| 400 // read message |
403 // read message |
| 401 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
404 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
| 402 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
405 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
| 683 } |
686 } |
| 684 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
687 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
| 685 if(!http) { |
688 if(!http) { |
| 686 fd->free(fd); |
689 fd->free(fd); |
| 687 } |
690 } |
| |
691 http->fd->setmode((IOStream*)http, IO_MODE_RAW); |
| 688 if(client->buffer.pos < client->buffer.cursize) { |
692 if(client->buffer.pos < client->buffer.cursize) { |
| 689 // bytes remaining in the buffer -> enable buffered reading |
693 // bytes remaining in the buffer -> enable buffered reading |
| 690 httpstream_enable_buffered_read( |
694 httpstream_enable_buffered_read( |
| 691 &http->st, |
695 &http->st, |
| 692 (char*)client->buffer.inbuf, |
696 (char*)client->buffer.inbuf, |
| 1489 } |
1493 } |
| 1490 |
1494 |
| 1491 static CX_TEST(test_http_client_io_write_blsz8_error3) { |
1495 static CX_TEST(test_http_client_io_write_blsz8_error3) { |
| 1492 CX_TEST_DO { |
1496 CX_TEST_DO { |
| 1493 CX_TEST_CALL_SUBROUTINE(test_http_client_io_simple, 8, 3); |
1497 CX_TEST_CALL_SUBROUTINE(test_http_client_io_simple, 8, 3); |
| |
1498 } |
| |
1499 } |
| |
1500 |
| |
1501 static CX_TEST(test_http_client_ws_msg_out) { |
| |
1502 CX_TEST_DO { |
| |
1503 EventHandler dummy; |
| |
1504 HttpClient *client = http_client_new(&dummy); |
| |
1505 |
| |
1506 int fds[2]; |
| |
1507 util_socketpair(fds); |
| |
1508 util_socket_setnonblock(fds[0], 1); |
| |
1509 util_socket_setnonblock(fds[1], 1); |
| |
1510 client->socketfd = fds[0]; |
| |
1511 client->event.cookie = client; |
| |
1512 int sock = fds[1]; |
| |
1513 |
| |
1514 create_req_buffer(client); |
| |
1515 client->transfer_buffer_len = 0; |
| |
1516 client->transfer_buffer_pos = 0; |
| |
1517 |
| |
1518 IOStream *fd = Sysstream_new(NULL, client->socketfd); |
| |
1519 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
| |
1520 http->fd->setmode((IOStream*)http, IO_MODE_RAW); |
| |
1521 client->stream = http; |
| |
1522 |
| |
1523 size_t out_nbytes = 1024*1024*64; |
| |
1524 char *outbuf = malloc(out_nbytes); |
| |
1525 // init the buffer with random data |
| |
1526 for(size_t i=0;i<out_nbytes;i+=sizeof(int)) { |
| |
1527 int *p = (int*)(outbuf+i); |
| |
1528 *p = rand(); |
| |
1529 } |
| |
1530 size_t out_pos = 0; |
| |
1531 |
| |
1532 CxBuffer *inbuf = cxBufferCreate(NULL, NULL, out_nbytes, CX_BUFFER_FREE_CONTENTS); |
| |
1533 |
| |
1534 WSBool add_message_would_block = FALSE; |
| |
1535 |
| |
1536 |
| |
1537 while(inbuf->pos < out_nbytes) { |
| |
1538 // add outbuf data to message buffer |
| |
1539 char *msg = outbuf + out_pos; |
| |
1540 size_t msglen = out_nbytes - out_pos; |
| |
1541 while(msglen > 0) { |
| |
1542 int ret = http_client_add_message(client, msg, msglen); |
| |
1543 if(ret == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { |
| |
1544 add_message_would_block = TRUE; |
| |
1545 break; |
| |
1546 } |
| |
1547 out_pos += ret; |
| |
1548 msg = outbuf + out_pos; |
| |
1549 msglen = out_nbytes - out_pos; |
| |
1550 } |
| |
1551 |
| |
1552 // send message buffer to the socket |
| |
1553 int ret = client_ws_io(&dummy, &client->event); |
| |
1554 CX_TEST_ASSERT(client->error == 0); |
| |
1555 |
| |
1556 // run client_ws_io again, it should do nothing |
| |
1557 ret = client_ws_io(&dummy, &client->event); |
| |
1558 CX_TEST_ASSERT(ret == 1); // would block |
| |
1559 |
| |
1560 ssize_t r = read(sock, inbuf->space + inbuf->pos, inbuf->capacity - inbuf->pos); |
| |
1561 if(r > 0) { |
| |
1562 inbuf->pos += r; |
| |
1563 inbuf->size += r; |
| |
1564 } |
| |
1565 } |
| |
1566 |
| |
1567 // http_client_add_message should block at least once |
| |
1568 CX_TEST_ASSERT(add_message_would_block); |
| |
1569 CX_TEST_ASSERT(inbuf->pos == out_nbytes); |
| |
1570 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes)); |
| |
1571 |
| |
1572 cxBufferFree(inbuf); |
| |
1573 http_client_free(client); |
| |
1574 close(sock); |
| 1494 } |
1575 } |
| 1495 } |
1576 } |
| 1496 |
1577 |
| 1497 void http_client_add_tests(CxTestSuite *suite) { |
1578 void http_client_add_tests(CxTestSuite *suite) { |
| 1498 cx_test_register(suite, test_http_client_send_request); |
1579 cx_test_register(suite, test_http_client_send_request); |
| 1516 cx_test_register(suite, test_http_client_io_write_error2); |
1597 cx_test_register(suite, test_http_client_io_write_error2); |
| 1517 cx_test_register(suite, test_http_client_io_write_error3); |
1598 cx_test_register(suite, test_http_client_io_write_error3); |
| 1518 cx_test_register(suite, test_http_client_io_write_blsz8_error1); |
1599 cx_test_register(suite, test_http_client_io_write_blsz8_error1); |
| 1519 cx_test_register(suite, test_http_client_io_write_blsz8_error2); |
1600 cx_test_register(suite, test_http_client_io_write_blsz8_error2); |
| 1520 cx_test_register(suite, test_http_client_io_write_blsz8_error3); |
1601 cx_test_register(suite, test_http_client_io_write_blsz8_error3); |
| 1521 } |
1602 cx_test_register(suite, test_http_client_ws_msg_out); |
| |
1603 } |