src/server/proxy/httpclient.c

changeset 698
fea7c3d74cc6
parent 694
a5aa94800b59
child 700
658f4c02b4c5
equal deleted inserted replaced
697:3ddfd45d4e47 698:fea7c3d74cc6
39 39
40 static int client_connected(EventHandler *ev, Event *event); 40 static int client_connected(EventHandler *ev, Event *event);
41 static int client_io(EventHandler *ev, Event *event); 41 static int client_io(EventHandler *ev, Event *event);
42 static int client_finished(EventHandler *ev, Event *event); 42 static int client_finished(EventHandler *ev, Event *event);
43 43
44 static int client_send_request(HttpClient *client); 44 static int client_send_buf(HttpClient *client);
45 static int client_send_request_body(HttpClient *client); 45 static int client_send_request_body(HttpClient *client);
46 static int client_read_response_header(HttpClient *client); 46 static int client_read_response_header(HttpClient *client);
47 static int client_read_response_body(HttpClient *client); 47 static int client_read_response_body(HttpClient *client);
48 48
49 HttpClient* http_client_new(EventHandler *ev) { 49 HttpClient* http_client_new(EventHandler *ev) {
87 http_parser_free(client->parser); 87 http_parser_free(client->parser);
88 if(client->stream) { 88 if(client->stream) {
89 client->stream->st.free(&client->stream->st); 89 client->stream->st.free(&client->stream->st);
90 } 90 }
91 free(client->buffer.inbuf); 91 free(client->buffer.inbuf);
92 free(client->transfer_buffer);
93 free(client->transfer2_buffer);
92 free(client->addr); 94 free(client->addr);
93 free(client->method); 95 free(client->method);
94 free(client->uri); 96 free(client->uri);
95 free(client); 97 free(client);
96 } 98 }
215 close(socketfd); 217 close(socketfd);
216 } 218 }
217 return ret; 219 return ret;
218 } 220 }
219 221
222 int http_client_process(HttpClient *client) {
223 return client_io(client->ev, &client->event);
224 }
225
226 size_t http_client_message_buf_size_available(HttpClient *client) {
227 return client->transfer_buffer_alloc - client->transfer_buffer_len;
228 }
229
230 int http_client_add_message(HttpClient *client, const void *buf, size_t size) {
231 size_t available = http_client_message_buf_size_available(client);
232 if(available == 0) {
233 return HTTP_CLIENT_CALLBACK_WOULD_BLOCK;
234 }
235 if(size > available) {
236 size = available;
237 }
238 memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size);
239 return size;
240 }
241
220 static int create_req_buffer(HttpClient *client) { 242 static int create_req_buffer(HttpClient *client) {
221 CxBuffer buf; 243 CxBuffer buf;
222 if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) { 244 if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) {
223 return 1; 245 return 1;
224 } 246 }
262 284
263 static int client_io(EventHandler *ev, Event *event) { 285 static int client_io(EventHandler *ev, Event *event) {
264 HttpClient *client = event->cookie; 286 HttpClient *client = event->cookie;
265 if(client->stage == 0) { 287 if(client->stage == 0) {
266 if(client->transfer_buffer_pos < client->transfer_buffer_len) { 288 if(client->transfer_buffer_pos < client->transfer_buffer_len) {
267 if(client_send_request(client)) { 289 if(client_send_buf(client)) {
268 return client->error == 0; 290 return client->error == 0;
269 } 291 }
270 } 292 }
271 293
272 // do we need to send a request body? 294 // do we need to send a request body?
273 if(client->req_content_length != 0) { 295 if(client->req_content_length != 0) {
274 if(client_send_request_body(client)) { 296 if(client_send_request_body(client)) {
275 return client->error == 0; 297 return client->error == 0;
276 } 298 }
277 } 299 }
278
279 client->transfer_buffer_pos = 0;
280 client->transfer_buffer_len = 0;
281 } 300 }
282 301
283 // writing complete, switch to read events 302 // writing complete, switch to read events
284 event->events = EVENT_POLLIN; 303 event->events = EVENT_POLLIN;
285 client->stage = 1; 304 client->stage = 1;
286 305
287 if(client_read_response_header(client)) { 306 if(client_read_response_header(client)) {
288 return client->error == 0; 307 return client->error == 0;
289 } 308 }
309 int ret = 0;
310 if(client->stage == 2) {
311 // websocket: write message buffer
312 ret = client_send_buf(client);
313 }
290 if(client_read_response_body(client)) { 314 if(client_read_response_body(client)) {
315 ret = 1;
316 }
317
318 if(ret) {
291 return client->error == 0; 319 return client->error == 0;
292 } 320 }
293 321
294 return 0; 322 return 0;
295 } 323 }
306 } 334 }
307 335
308 return 0; 336 return 0;
309 } 337 }
310 338
311 static int client_send_request(HttpClient *client) { 339 // sends the content of the transfer buffer to client->socketfd
340 static int client_send_buf(HttpClient *client) {
312 size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; 341 size_t nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
313 ssize_t w; 342 ssize_t w;
314 while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) { 343 while((w = write(client->socketfd, client->transfer_buffer + client->transfer_buffer_pos, nbytes)) > 0) {
315 client->transfer_buffer_pos += w; 344 client->transfer_buffer_pos += w;
316 nbytes = client->transfer_buffer_len - client->transfer_buffer_pos; 345 nbytes = client->transfer_buffer_len - client->transfer_buffer_pos;
385 } 414 }
386 415
387 client->req_contentlength_pos += r; 416 client->req_contentlength_pos += r;
388 client->transfer_buffer_pos = startpos; 417 client->transfer_buffer_pos = startpos;
389 client->transfer_buffer_len = rbody_buf_offset + r; 418 client->transfer_buffer_len = rbody_buf_offset + r;
390 if(client_send_request(client)) { 419 if(client_send_buf(client)) {
391 return 1; 420 return 1;
392 } 421 }
393 } 422 }
394 423
395 // chunked transfer encoding: terminate 424 // chunked transfer encoding: terminate
396 if(client->req_content_length == -1 && !client->request_body_terminated) { 425 if(client->req_content_length == -1 && !client->request_body_terminated) {
397 memcpy(client->transfer_buffer, "0\r\n\r\n", 5); 426 memcpy(client->transfer_buffer, "0\r\n\r\n", 5);
398 client->transfer_buffer_pos = 0; 427 client->transfer_buffer_pos = 0;
399 client->transfer_buffer_len = 5; 428 client->transfer_buffer_len = 5;
400 client->request_body_terminated = 1; 429 client->request_body_terminated = 1;
401 if(client_send_request(client)) { 430 if(client_send_buf(client)) {
402 return 1; 431 return 1;
403 } 432 }
404 433
405 } else if(client->req_content_length != client->req_contentlength_pos) { 434 } else if(client->req_content_length != client->req_contentlength_pos) {
406 // incomplete request body 435 // incomplete request body
477 506
478 // initialize httpstream 507 // initialize httpstream
479 HeaderArray *headers = client->parser->headers; 508 HeaderArray *headers = client->parser->headers;
480 long long contentlength = 0; 509 long long contentlength = 0;
481 int chunkedtransferenc = 0; 510 int chunkedtransferenc = 0;
511 cxmutstr hdr_connection = CX_NULLSTR;
512 cxmutstr hdr_upgrade = CX_NULLSTR;
482 while(headers) { 513 while(headers) {
483 for(int i=0;i<headers->len;i++) { 514 for(int i=0;i<headers->len;i++) {
484 if(!cx_strcasecmp(headers->headers[i].name, "content-length")) { 515 cxmutstr header = headers->headers[i].name;
485 if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) { 516 cxmutstr hvalue = headers->headers[i].value;
517 if(!cx_strcasecmp(header, "content-length")) {
518 if(!cx_strtoll(hvalue, &contentlength, 10)) {
486 headers = NULL; 519 headers = NULL;
487 break; 520 break;
488 } 521 }
489 } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) { 522 } else if(!cx_strcasecmp(header, "transfer-encoding")) {
490 if(!cx_strcmp(headers->headers[i].value, "chunked")) { 523 if(!cx_strcmp(hvalue, "chunked")) {
491 chunkedtransferenc = 1; 524 chunkedtransferenc = 1;
492 headers = NULL; 525 headers = NULL;
493 break; 526 break;
494 } 527 }
528 } else if(!cx_strcasecmp(header, "connection")) {
529 hdr_connection = hvalue;
530 }
531
532 if(client->statuscode == 101 && !cx_strcasecmp(header, "upgrade")) {
533 hdr_upgrade = hvalue;
495 } 534 }
496 } 535 }
497 536
498 if(headers) { 537 if(headers) {
499 headers = headers->next; 538 headers = headers->next;
500 } 539 }
501 } 540 }
502 541
503 if(contentlength > 0 || chunkedtransferenc) { 542 if(client->statuscode == 101) {
543 if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) {
544 client->stage = 2;
545 client->event.events = EVENT_POLLIN|EVENT_POLLOUT;
546
547 // prepare IO buffers for websockets
548 client->transfer_buffer_len = 0;
549 client->transfer_buffer_pos = 0;
550
551 client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE);
552 if(!client->transfer2_buffer) {
553 client->error = 1;
554 return 1;
555 }
556 client->transfer2_buffer_alloc = HTTP_CLIENT_BUFFER_SIZE;
557 client->transfer2_buffer_len = 0;
558 client->transfer2_buffer_pos = 0;
559 } else {
560 // error: unknown protocol
561 log_ereport(LOG_FAILURE, "http-client: unknown protocol upgrade: %.*s", (int)hdr_upgrade.length, hdr_upgrade.ptr);
562 client->error = 1;
563 return 1;
564 }
565 } else if(contentlength > 0 || chunkedtransferenc) {
504 IOStream *fd = Sysstream_new(NULL, client->socketfd); 566 IOStream *fd = Sysstream_new(NULL, client->socketfd);
505 if(!fd) { 567 if(!fd) {
506 client->error = 1; 568 client->error = 1;
507 return 1; 569 return 1;
508 } 570 }
509 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); 571 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd);
510 if(!http) { 572 if(!http) {
511 fd->free(fd); 573 fd->free(fd);
512 } 574 }
575
576 // we can reuse the already allocated transfer_buffer for transfer2
577 client->transfer2_buffer = client->transfer_buffer;
578 client->transfer2_buffer_alloc = client->transfer_buffer_alloc;
579 client->transfer2_buffer_len = 0;
580 client->transfer_buffer_pos = 0;
581 client->transfer_buffer = NULL;
582 client->transfer_buffer_alloc = 0;
583
513 if(contentlength > 0) { 584 if(contentlength > 0) {
514 http->max_read = contentlength; 585 http->max_read = contentlength;
515 httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); 586 httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos);
516 } else if(chunkedtransferenc) { 587 } else if(chunkedtransferenc) {
517 httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); 588 httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos);
525 // uses the response_body_write callback to write the content of the 596 // uses the response_body_write callback to write the content of the
526 // transfer buffer 597 // transfer buffer
527 // returns 0 success 598 // returns 0 success
528 // 1 would block or error 599 // 1 would block or error
529 static int client_write_response(HttpClient *client) { 600 static int client_write_response(HttpClient *client) {
530 while(client->transfer_buffer_pos < client->transfer_buffer_len) { 601 while(client->transfer2_buffer_pos < client->transfer2_buffer_len) {
531 char *buf = client->transfer_buffer + client->transfer_buffer_pos; 602 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos;
532 size_t len = client->transfer_buffer_len - client->transfer_buffer_pos; 603 size_t len = client->transfer2_buffer_len - client->transfer2_buffer_pos;
533 int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata); 604 int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata);
534 if(ret > 0) { 605 if(ret > 0) {
535 client->transfer_buffer_pos += ret; 606 client->transfer2_buffer_pos += ret;
536 } else if(ret == 0) { 607 } else if(ret == 0) {
537 // EOF? 608 // EOF?
538 // check if the write is incomplete, which would be an error 609 // check if the write is incomplete, which would be an error
539 client->error == client->transfer_buffer_pos < client->transfer_buffer_len; 610 client->error == client->transfer2_buffer_pos < client->transfer2_buffer_len;
540 return client->error; 611 return client->error;
541 } else { 612 } else {
542 if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { 613 if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) {
543 client->error = 1; 614 client->error = 1;
544 } 615 }
558 // does the transfer buffer still contains bytes, that should be written? 629 // does the transfer buffer still contains bytes, that should be written?
559 if(client_write_response(client)) { 630 if(client_write_response(client)) {
560 return 1; 631 return 1;
561 } 632 }
562 633
563 char *buf = client->transfer_buffer; 634 char *buf = client->transfer2_buffer;
564 size_t nbytes = client->transfer_buffer_alloc; 635 size_t nbytes = client->transfer2_buffer_alloc;
565 636
566 ssize_t r; 637 ssize_t r;
567 while((r = net_read(&client->stream->st, buf, nbytes)) > 0) { 638 while((r = net_read(&client->stream->st, buf, nbytes)) > 0) {
568 client->transfer_buffer_len = r; 639 client->transfer2_buffer_len = r;
569 client->transfer_buffer_pos = 0; 640 client->transfer2_buffer_pos = 0;
570 if(client->response_body_write) { 641 if(client->response_body_write) {
571 if(client_write_response(client)) { 642 if(client_write_response(client)) {
572 return 1; 643 return 1;
573 } 644 }
574 } 645 }
611 client->transfer_buffer = str; 682 client->transfer_buffer = str;
612 client->transfer_buffer_len = len; 683 client->transfer_buffer_len = len;
613 684
614 // test client_send_request 685 // test client_send_request
615 686
616 int ret = client_send_request(client); 687 int ret = client_send_buf(client);
617 // It is very likely that the first client_send_request call doesn't 688 // It is very likely that the first client_send_request call doesn't
618 // fully write the request buffer to the socket 689 // fully write the request buffer to the socket
619 // In that case it returns 1 but without the error flag 690 // In that case it returns 1 but without the error flag
620 CX_TEST_ASSERT(ret == 1 && !client->error); 691 CX_TEST_ASSERT(ret == 1 && !client->error);
621 CX_TEST_ASSERT(client->transfer_buffer_pos > 0); 692 CX_TEST_ASSERT(client->transfer_buffer_pos > 0);
628 int writes = 1; 699 int writes = 1;
629 while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) { 700 while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) {
630 ssize_t r = read(sock, tmpbuf, 1024); 701 ssize_t r = read(sock, tmpbuf, 1024);
631 CX_TEST_ASSERT(r >= 0); 702 CX_TEST_ASSERT(r >= 0);
632 cxBufferWrite(tmpbuf, 1, r, &buf); 703 cxBufferWrite(tmpbuf, 1, r, &buf);
633 ret = client_send_request(client); 704 ret = client_send_buf(client);
634 CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error)); 705 CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error));
635 706
636 writes++; 707 writes++;
637 } 708 }
638 CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len); 709 CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len);

mercurial