src/server/proxy/httpclient.c

changeset 719
c4c2b8e8ddc5
parent 718
9e98618464ae
equal deleted inserted replaced
718:9e98618464ae 719:c4c2b8e8ddc5
383 } 383 }
384 384
385 return 0; 385 return 0;
386 } 386 }
387 387
388 static int client_ws_write(HttpClient *client) {
389 if(!client->ws_write) {
390 client->transfer2_buffer_pos = client->transfer2_buffer_len;
391 return 0;
392 }
393
394 int ret = 0;
395
396 // pass all data from the transfer2 buffer to the ws_write callback
397 char *out = client->transfer2_buffer + client->transfer2_buffer_pos;
398 size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
399 while(nbytes > 0) {
400 ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
401 if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
402 ret = 1;
403 break;
404 } else if(w <= 0) {
405 client->error = 1;
406 return 0;
407 }
408 client->transfer2_buffer_pos += w;
409
410 // adjust buffer
411 out = client->transfer2_buffer + client->transfer2_buffer_pos;
412 nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
413 }
414
415 // clear buffer
416 if(client->transfer2_buffer_pos == client->transfer2_buffer_len) {
417 client->transfer2_buffer_pos = 0;
418 client->transfer2_buffer_len = 0;
419 }
420
421 return ret;
422 }
423
388 static int client_ws_process(HttpClient *client, Event *event) { 424 static int client_ws_process(HttpClient *client, Event *event) {
389 // send available data from the transfer buffer 425 // send available data from the transfer buffer
390 int ret = client_send_buf(client); 426 int ret = client_send_buf(client);
391 if(client->error) { 427 if(client->error) {
392 return 0; 428 return 0;
398 } 434 }
399 client->transfer_buffer_pos = 0; 435 client->transfer_buffer_pos = 0;
400 client->transfer_buffer_len = 0; 436 client->transfer_buffer_len = 0;
401 } 437 }
402 438
439 // flush transfer2 buffer
440 if(client_ws_write(client)) {
441 return 1;
442 }
443
403 // read message 444 // read message
404 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; 445 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
405 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; 446 size_t available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
447
406 ssize_t r = -1; 448 ssize_t r = -1;
407 while(available > 0) { 449 while(available > 0) {
408 ssize_t r = net_read(client->stream, buf, available); 450 r = net_read(client->stream, buf, available);
409 if(r <= 0) { 451 if(r <= 0) {
410 break; 452 break;
411 } 453 }
412 client->transfer2_buffer_len += r; 454 client->transfer2_buffer_len += r;
413 if(client->ws_write) { 455 if(client_ws_write(client)) {
414 char *out = client->transfer2_buffer + client->transfer2_buffer_pos; 456 break;
415 size_t nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
416 while(nbytes > 0) {
417 ssize_t w = client->ws_write(client, out, nbytes, client->ws_write_userdata);
418 if(w == HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
419 break;
420 } else if(w <= 0) {
421 client->error = 1;
422 return 0;
423 }
424 client->transfer2_buffer_pos += w;
425
426 // adjust buffer
427 out = client->transfer2_buffer + client->transfer2_buffer_pos;
428 nbytes = client->transfer2_buffer_len - client->transfer2_buffer_pos;
429 }
430 } else {
431 // noop
432 client->transfer2_buffer_pos = client->transfer2_buffer_len;
433 } 457 }
434 458
435 // adjust buffer 459 // adjust buffer
436 buf = client->transfer2_buffer + client->transfer2_buffer_pos; 460 buf = client->transfer2_buffer + client->transfer2_buffer_pos;
437 available = client->transfer2_buffer_alloc - client->transfer2_buffer_len; 461 available = client->transfer2_buffer_alloc - client->transfer2_buffer_len;
686 } 710 }
687 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); 711 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
688 if(!http) { 712 if(!http) {
689 fd->free(fd); 713 fd->free(fd);
690 } 714 }
691 http->fd->setmode((IOStream*)http, IO_MODE_RAW); 715 http->raw = WS_TRUE;
692 if(client->buffer.pos < client->buffer.cursize) { 716 if(client->buffer.pos < client->buffer.cursize) {
693 // bytes remaining in the buffer -> enable buffered reading 717 // bytes remaining in the buffer -> enable buffered reading
694 httpstream_enable_buffered_read( 718 httpstream_enable_buffered_read(
695 &http->st, 719 &http->st,
696 (char*)client->buffer.inbuf, 720 (char*)client->buffer.inbuf,
1568 CX_TEST_ASSERT(add_message_would_block); 1592 CX_TEST_ASSERT(add_message_would_block);
1569 CX_TEST_ASSERT(inbuf->pos == out_nbytes); 1593 CX_TEST_ASSERT(inbuf->pos == out_nbytes);
1570 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes)); 1594 CX_TEST_ASSERT(!memcmp(inbuf->space, outbuf, out_nbytes));
1571 1595
1572 cxBufferFree(inbuf); 1596 cxBufferFree(inbuf);
1597 http_client_free(client);
1598 close(sock);
1599 }
1600 }
1601
1602 static ssize_t test_ws_write(HttpClient *client, void *buf, size_t nbytes, void *userdata) {
1603 CxBuffer *out = (CxBuffer*)userdata;
1604 return cxBufferWrite(buf, 1, nbytes, out);
1605 }
1606
1607 static CX_TEST(test_http_client_ws_msg_in) {
1608 CX_TEST_DO {
1609 EventHandler dummy;
1610 HttpClient *client = http_client_new(&dummy);
1611
1612 int fds[2];
1613 util_socketpair(fds);
1614 util_socket_setnonblock(fds[0], 1);
1615 util_socket_setnonblock(fds[1], 1);
1616 client->socketfd = fds[0];
1617 client->event.cookie = client;
1618 int sock = fds[1];
1619
1620 create_req_buffer(client);
1621 client->transfer_buffer_len = 0;
1622 client->transfer_buffer_pos = 0;
1623 client->transfer2_buffer_alloc = 1024*128;
1624 client->transfer2_buffer = malloc(client->transfer2_buffer_alloc);
1625
1626 IOStream *fd = Sysstream_new(NULL, client->socketfd);
1627 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
1628 http->raw = WS_TRUE;
1629 client->stream = http;
1630
1631 size_t str_nbytes = 1024*1024*64;
1632 char *str = malloc(str_nbytes);
1633 // init the buffer with random data
1634 for(size_t i=0;i<str_nbytes;i+=sizeof(int)) {
1635 int *p = (int*)(str+i);
1636 *p = rand();
1637 }
1638 size_t str_pos = 0;
1639
1640 CxBuffer *out = cxBufferCreate(NULL, NULL, str_nbytes, CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS);
1641 client->ws_write = test_ws_write;
1642 client->ws_write_userdata = out;
1643
1644 while(out->size < str_nbytes) {
1645 size_t slen = str_nbytes - str_pos;
1646 if(slen > 64*1024) {
1647 slen = 64*1024;
1648 }
1649
1650 if(slen > 0) {
1651 ssize_t w = write(sock, str + str_pos, slen);
1652 if(w > 0) {
1653 str_pos += w;
1654 }
1655 }
1656
1657 // webosocket IO: this should read from the other socket and
1658 // call the ws_write callback
1659 int ret = client_ws_io(&dummy, &client->event);
1660 CX_TEST_ASSERT(client->error == 0);
1661
1662 // run client_ws_io again, it should do nothing
1663 ret = client_ws_io(&dummy, &client->event);
1664 CX_TEST_ASSERT(ret == 1); // would block
1665 }
1666
1667 // http_client_add_message should block at least once
1668 CX_TEST_ASSERT(out->pos == str_nbytes);
1669 CX_TEST_ASSERT(!memcmp(out->space, str, str_nbytes));
1670
1671 cxBufferFree(out);
1573 http_client_free(client); 1672 http_client_free(client);
1574 close(sock); 1673 close(sock);
1575 } 1674 }
1576 } 1675 }
1577 1676
1598 cx_test_register(suite, test_http_client_io_write_error3); 1697 cx_test_register(suite, test_http_client_io_write_error3);
1599 cx_test_register(suite, test_http_client_io_write_blsz8_error1); 1698 cx_test_register(suite, test_http_client_io_write_blsz8_error1);
1600 cx_test_register(suite, test_http_client_io_write_blsz8_error2); 1699 cx_test_register(suite, test_http_client_io_write_blsz8_error2);
1601 cx_test_register(suite, test_http_client_io_write_blsz8_error3); 1700 cx_test_register(suite, test_http_client_io_write_blsz8_error3);
1602 cx_test_register(suite, test_http_client_ws_msg_out); 1701 cx_test_register(suite, test_http_client_ws_msg_out);
1603 } 1702 cx_test_register(suite, test_http_client_ws_msg_in);
1703 }

mercurial