| 383 } |
383 } |
| 384 |
384 |
| 385 return 0; |
385 return 0; |
| 386 } |
386 } |
| 387 |
387 |
| |
388 static int client_ws_write(HttpClient *client) { |
| |
389 if(!client->ws_write) { |
| |
390 client->transfer2_buffer_pos = client->transfer2_buffer_len; |
| |
391 return 0; |
| |
392 } |
| |
393 |
| |
394 int ret = 0; |
| |
395 |
| |
396 // pass all data from the transfer2 buffer to the ws_write callback |
| |
397 char *out = client->transfer2_buffer + client->transfer2_buffer_pos; |
| |
398 size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; |
| |
399 while(nbytes > 0) { |
| |
400 ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata); |
| |
401 if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { |
| |
402 ret = 1; |
| |
403 break; |
| |
404 } else if(w <= 0) { |
| |
405 client->error = 1; |
| |
406 return 0; |
| |
407 } |
| |
408 client->transfer2_buffer_pos += w; |
| |
409 |
| |
410 // adjust buffer |
| |
411 out = client->transfer2_buffer + client->transfer2_buffer_pos; |
| |
412 nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; |
| |
413 } |
| |
414 |
| |
415 // clear buffer |
| |
416 if(client->transfer2_buffer_pos == client->transfer2_buffer_len) { |
| |
417 client->transfer2_buffer_pos = 0; |
| |
418 client->transfer2_buffer_len = 0; |
| |
419 } |
| |
420 |
| |
421 return ret; |
| |
422 } |
| |
423 |
| 388 static int client_ws_process(HttpClient *client, Event *event) { |
424 static int client_ws_process(HttpClient *client, Event *event) { |
| 389 // send available data from the transfer buffer |
425 // send available data from the transfer buffer |
| 390 int ret = client_send_buf(client); |
426 int ret = client_send_buf(client); |
| 391 if(client->error) { |
427 if(client->error) { |
| 392 return 0; |
428 return 0; |
| 398 } |
434 } |
| 399 client->transfer_buffer_pos = 0; |
435 client->transfer_buffer_pos = 0; |
| 400 client->transfer_buffer_len = 0; |
436 client->transfer_buffer_len = 0; |
| 401 } |
437 } |
| 402 |
438 |
| |
439 // flush transfer2 buffer |
| |
440 if(client_ws_write(client)) { |
| |
441 return 1; |
| |
442 } |
| |
443 |
| 403 // read message |
444 // read message |
| 404 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
445 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
| 405 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
446 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
| |
447 |
| 406 ssize_t r = -1; |
448 ssize_t r = -1; |
| 407 while(available > 0) { |
449 while(available > 0) { |
| 408 ssize_t r = net_read(client->stream, buf, available); |
450 r = net_read(client->stream, buf, available); |
| 409 if(r <= 0) { |
451 if(r <= 0) { |
| 410 break; |
452 break; |
| 411 } |
453 } |
| 412 client->transfer2_buffer_len += r; |
454 client->transfer2_buffer_len += r; |
| 413 if(client->ws_write) { |
455 if(client_ws_write(client)) { |
| 414 char *out = client->transfer2_buffer + client->transfer2_buffer_pos; |
456 break; |
| 415 size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; |
|
| 416 while(nbytes > 0) { |
|
| 417 ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata); |
|
| 418 if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { |
|
| 419 break; |
|
| 420 } else if(w <= 0) { |
|
| 421 client->error = 1; |
|
| 422 return 0; |
|
| 423 } |
|
| 424 client->transfer2_buffer_pos += w; |
|
| 425 |
|
| 426 // adjust buffer |
|
| 427 out = client->transfer2_buffer + client->transfer2_buffer_pos; |
|
| 428 nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos; |
|
| 429 } |
|
| 430 } else { |
|
| 431 // noop |
|
| 432 client->transfer2_buffer_pos = client->transfer2_buffer_len; |
|
| 433 } |
457 } |
| 434 |
458 |
| 435 // adjust buffer |
459 // adjust buffer |
| 436 buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
460 buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
| 437 available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
461 available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; |
| 686 } |
710 } |
| 687 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
711 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
| 688 if(!http) { |
712 if(!http) { |
| 689 fd->free(fd); |
713 fd->free(fd); |
| 690 } |
714 } |
| 691 http->fd->setmode((IOStream*)http, IO_MODE_RAW); |
715 http->raw = WS_TRUE; |
| 692 if(client->buffer.pos < client->buffer.cursize) { |
716 if(client->buffer.pos < client->buffer.cursize) { |
| 693 // bytes remaining in the buffer -> enable buffered reading |
717 // bytes remaining in the buffer -> enable buffered reading |
| 694 httpstream_enable_buffered_read( |
718 httpstream_enable_buffered_read( |
| 695 &http->st, |
719 &http->st, |
| 696 (char*)client->buffer.inbuf, |
720 (char*)client->buffer.inbuf, |
| 1568 CX_TEST_ASSERT(add_message_would_block); |
1592 CX_TEST_ASSERT(add_message_would_block); |
| 1569 CX_TEST_ASSERT(inbuf->pos == out_nbytes); |
1593 CX_TEST_ASSERT(inbuf->pos == out_nbytes); |
| 1570 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes)); |
1594 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes)); |
| 1571 |
1595 |
| 1572 cxBufferFree(inbuf); |
1596 cxBufferFree(inbuf); |
| |
1597 http_client_free(client); |
| |
1598 close(sock); |
| |
1599 } |
| |
1600 } |
| |
1601 |
| |
1602 static ssize_t test_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) { |
| |
1603 CxBuffer *out = (CxBuffer*)userdata; |
| |
1604 return cxBufferWrite(buf, 1, nbytes, out); |
| |
1605 } |
| |
1606 |
| |
1607 static CX_TEST(test_http_client_ws_msg_in) { |
| |
1608 CX_TEST_DO { |
| |
1609 EventHandler dummy; |
| |
1610 HttpClient *client = http_client_new(&dummy); |
| |
1611 |
| |
1612 int fds[2]; |
| |
1613 util_socketpair(fds); |
| |
1614 util_socket_setnonblock(fds[0], 1); |
| |
1615 util_socket_setnonblock(fds[1], 1); |
| |
1616 client->socketfd = fds[0]; |
| |
1617 client->event.cookie = client; |
| |
1618 int sock = fds[1]; |
| |
1619 |
| |
1620 create_req_buffer(client); |
| |
1621 client->transfer_buffer_len = 0; |
| |
1622 client->transfer_buffer_pos = 0; |
| |
1623 client->transfer2_buffer_alloc = 1024*128; |
| |
1624 client->transfer2_buffer = malloc(client->transfer2_buffer_alloc); |
| |
1625 |
| |
1626 IOStream *fd = Sysstream_new(NULL, client->socketfd); |
| |
1627 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
| |
1628 http->raw = WS_TRUE; |
| |
1629 client->stream = http; |
| |
1630 |
| |
1631 size_t str_nbytes = 1024*1024*64; |
| |
1632 char *str = malloc(str_nbytes); |
| |
1633 // init the buffer with random data |
| |
1634 for(size_t i=0;i<str_nbytes;i+=sizeof(int)) { |
| |
1635 int *p = (int*)(str+i); |
| |
1636 *p = rand(); |
| |
1637 } |
| |
1638 size_t str_pos = 0; |
| |
1639 |
| |
1640 CxBuffer *out = cxBufferCreate(NULL, NULL, str_nbytes, CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS); |
| |
1641 client->ws_write = test_ws_write; |
| |
1642 client->ws_write_userdata = out; |
| |
1643 |
| |
1644 while(out->size < str_nbytes) { |
| |
1645 size_t slen = str_nbytes - str_pos; |
| |
1646 if(slen > 64*1024) { |
| |
1647 slen = 64*1024; |
| |
1648 } |
| |
1649 |
| |
1650 if(slen > 0) { |
| |
1651 ssize_t w = write(sock, str + str_pos, slen); |
| |
1652 if(w > 0) { |
| |
1653 str_pos += w; |
| |
1654 } |
| |
1655 } |
| |
1656 |
| |
1657 // webosocket IO: this should read from the other socket and |
| |
1658 // call the ws_write callback |
| |
1659 int ret = client_ws_io(&dummy, &client->event); |
| |
1660 CX_TEST_ASSERT(client->error == 0); |
| |
1661 |
| |
1662 // run client_ws_io again, it should do nothing |
| |
1663 ret = client_ws_io(&dummy, &client->event); |
| |
1664 CX_TEST_ASSERT(ret == 1); // would block |
| |
1665 } |
| |
1666 |
| |
1667 // http_client_add_message should block at least once |
| |
1668 CX_TEST_ASSERT(out->pos == str_nbytes); |
| |
1669 CX_TEST_ASSERT(!memcmp(out->space, str, str_nbytes)); |
| |
1670 |
| |
1671 cxBufferFree(out); |
| 1573 http_client_free(client); |
1672 http_client_free(client); |
| 1574 close(sock); |
1673 close(sock); |
| 1575 } |
1674 } |
| 1576 } |
1675 } |
| 1577 |
1676 |
| 1598 cx_test_register(suite, test_http_client_io_write_error3); |
1697 cx_test_register(suite, test_http_client_io_write_error3); |
| 1599 cx_test_register(suite, test_http_client_io_write_blsz8_error1); |
1698 cx_test_register(suite, test_http_client_io_write_blsz8_error1); |
| 1600 cx_test_register(suite, test_http_client_io_write_blsz8_error2); |
1699 cx_test_register(suite, test_http_client_io_write_blsz8_error2); |
| 1601 cx_test_register(suite, test_http_client_io_write_blsz8_error3); |
1700 cx_test_register(suite, test_http_client_io_write_blsz8_error3); |
| 1602 cx_test_register(suite, test_http_client_ws_msg_out); |
1701 cx_test_register(suite, test_http_client_ws_msg_out); |
| 1603 } |
1702 cx_test_register(suite, test_http_client_ws_msg_in); |
| |
1703 } |