| 39 |
39 |
| 40 static int client_connected(EventHandler *ev, Event *event); |
40 static int client_connected(EventHandler *ev, Event *event); |
| 41 static int client_io(EventHandler *ev, Event *event); |
41 static int client_io(EventHandler *ev, Event *event); |
| 42 static int client_finished(EventHandler *ev, Event *event); |
42 static int client_finished(EventHandler *ev, Event *event); |
| 43 |
43 |
| 44 static int client_send_request(HttpClient *client); |
44 static int client_send_buf(HttpClient *client); |
| 45 static int client_send_request_body(HttpClient *client); |
45 static int client_send_request_body(HttpClient *client); |
| 46 static int client_read_response_header(HttpClient *client); |
46 static int client_read_response_header(HttpClient *client); |
| 47 static int client_read_response_body(HttpClient *client); |
47 static int client_read_response_body(HttpClient *client); |
| 48 |
48 |
| 49 HttpClient* http_client_new(EventHandler *ev) { |
49 HttpClient* http_client_new(EventHandler *ev) { |
| 215 close(socketfd); |
217 close(socketfd); |
| 216 } |
218 } |
| 217 return ret; |
219 return ret; |
| 218 } |
220 } |
| 219 |
221 |
| |
222 int http_client_process(HttpClient *client) { |
| |
223 return client_io(client->ev, &client->event); |
| |
224 } |
| |
225 |
| |
226 size_t http_client_message_buf_size_available(HttpClient *client) { |
| |
227 return client->transfer_buffer_alloc - client->transfer_buffer_len; |
| |
228 } |
| |
229 |
| |
230 int http_client_add_message(HttpClient *client, const void *buf, size_t size) { |
| |
231 size_t available = http_client_message_buf_size_available(client); |
| |
232 if(available == 0) { |
| |
233 return HTTP_CLIENT_CALLBACK_WOULD_BLOCK; |
| |
234 } |
| |
235 if(size > available) { |
| |
236 size = available; |
| |
237 } |
| |
238 memcpy(client->transfer_buffer + client->transfer_buffer_len, buf, size); |
| |
239 return size; |
| |
240 } |
| |
241 |
| 220 static int create_req_buffer(HttpClient *client) { |
242 static int create_req_buffer(HttpClient *client) { |
| 221 CxBuffer buf; |
243 CxBuffer buf; |
| 222 if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) { |
244 if(cxBufferInit(&buf, cxDefaultAllocator, NULL, HTTP_CLIENT_BUFFER_SIZE, CX_BUFFER_AUTO_EXTEND)) { |
| 223 return 1; |
245 return 1; |
| 224 } |
246 } |
| 262 |
284 |
| 263 static int client_io(EventHandler *ev, Event *event) { |
285 static int client_io(EventHandler *ev, Event *event) { |
| 264 HttpClient *client = event->cookie; |
286 HttpClient *client = event->cookie; |
| 265 if(client->stage == 0) { |
287 if(client->stage == 0) { |
| 266 if(client->transfer_buffer_pos < client->transfer_buffer_len) { |
288 if(client->transfer_buffer_pos < client->transfer_buffer_len) { |
| 267 if(client_send_request(client)) { |
289 if(client_send_buf(client)) { |
| 268 return client->error == 0; |
290 return client->error == 0; |
| 269 } |
291 } |
| 270 } |
292 } |
| 271 |
293 |
| 272 // do we need to send a request body? |
294 // do we need to send a request body? |
| 273 if(client->req_content_length != 0) { |
295 if(client->req_content_length != 0) { |
| 274 if(client_send_request_body(client)) { |
296 if(client_send_request_body(client)) { |
| 275 return client->error == 0; |
297 return client->error == 0; |
| 276 } |
298 } |
| 277 } |
299 } |
| 278 |
|
| 279 client->transfer_buffer_pos = 0; |
|
| 280 client->transfer_buffer_len = 0; |
|
| 281 } |
300 } |
| 282 |
301 |
| 283 // writing complete, switch to read events |
302 // writing complete, switch to read events |
| 284 event->events = EVENT_POLLIN; |
303 event->events = EVENT_POLLIN; |
| 285 client->stage = 1; |
304 client->stage = 1; |
| 286 |
305 |
| 287 if(client_read_response_header(client)) { |
306 if(client_read_response_header(client)) { |
| 288 return client->error == 0; |
307 return client->error == 0; |
| 289 } |
308 } |
| |
309 int ret = 0; |
| |
310 if(client->stage == 2) { |
| |
311 // websocket: write message buffer |
| |
312 ret = client_send_buf(client); |
| |
313 } |
| 290 if(client_read_response_body(client)) { |
314 if(client_read_response_body(client)) { |
| |
315 ret = 1; |
| |
316 } |
| |
317 |
| |
318 if(ret) { |
| 291 return client->error == 0; |
319 return client->error == 0; |
| 292 } |
320 } |
| 293 |
321 |
| 294 return 0; |
322 return 0; |
| 295 } |
323 } |
| 385 } |
414 } |
| 386 |
415 |
| 387 client->req_contentlength_pos += r; |
416 client->req_contentlength_pos += r; |
| 388 client->transfer_buffer_pos = startpos; |
417 client->transfer_buffer_pos = startpos; |
| 389 client->transfer_buffer_len = rbody_buf_offset + r; |
418 client->transfer_buffer_len = rbody_buf_offset + r; |
| 390 if(client_send_request(client)) { |
419 if(client_send_buf(client)) { |
| 391 return 1; |
420 return 1; |
| 392 } |
421 } |
| 393 } |
422 } |
| 394 |
423 |
| 395 // chunked transfer encoding: terminate |
424 // chunked transfer encoding: terminate |
| 396 if(client->req_content_length == -1 && !client->request_body_terminated) { |
425 if(client->req_content_length == -1 && !client->request_body_terminated) { |
| 397 memcpy(client->transfer_buffer, "0\r\n\r\n", 5); |
426 memcpy(client->transfer_buffer, "0\r\n\r\n", 5); |
| 398 client->transfer_buffer_pos = 0; |
427 client->transfer_buffer_pos = 0; |
| 399 client->transfer_buffer_len = 5; |
428 client->transfer_buffer_len = 5; |
| 400 client->request_body_terminated = 1; |
429 client->request_body_terminated = 1; |
| 401 if(client_send_request(client)) { |
430 if(client_send_buf(client)) { |
| 402 return 1; |
431 return 1; |
| 403 } |
432 } |
| 404 |
433 |
| 405 } else if(client->req_content_length != client->req_contentlength_pos) { |
434 } else if(client->req_content_length != client->req_contentlength_pos) { |
| 406 // incomplete request body |
435 // incomplete request body |
| 477 |
506 |
| 478 // initialize httpstream |
507 // initialize httpstream |
| 479 HeaderArray *headers = client->parser->headers; |
508 HeaderArray *headers = client->parser->headers; |
| 480 long long contentlength = 0; |
509 long long contentlength = 0; |
| 481 int chunkedtransferenc = 0; |
510 int chunkedtransferenc = 0; |
| |
511 cxmutstr hdr_connection = CX_NULLSTR; |
| |
512 cxmutstr hdr_upgrade = CX_NULLSTR; |
| 482 while(headers) { |
513 while(headers) { |
| 483 for(int i=0;i<headers->len;i++) { |
514 for(int i=0;i<headers->len;i++) { |
| 484 if(!cx_strcasecmp(headers->headers[i].name, "content-length")) { |
515 cxmutstr header = headers->headers[i].name; |
| 485 if(!cx_strtoll(headers->headers[i].value, &contentlength, 10)) { |
516 cxmutstr hvalue = headers->headers[i].value; |
| |
517 if(!cx_strcasecmp(header, "content-length")) { |
| |
518 if(!cx_strtoll(hvalue, &contentlength, 10)) { |
| 486 headers = NULL; |
519 headers = NULL; |
| 487 break; |
520 break; |
| 488 } |
521 } |
| 489 } else if(!cx_strcasecmp(headers->headers[i].name, "transfer-encoding")) { |
522 } else if(!cx_strcasecmp(header, "transfer-encoding")) { |
| 490 if(!cx_strcmp(headers->headers[i].value, "chunked")) { |
523 if(!cx_strcmp(hvalue, "chunked")) { |
| 491 chunkedtransferenc = 1; |
524 chunkedtransferenc = 1; |
| 492 headers = NULL; |
525 headers = NULL; |
| 493 break; |
526 break; |
| 494 } |
527 } |
| |
528 } else if(!cx_strcasecmp(header, "connection")) { |
| |
529 hdr_connection = hvalue; |
| |
530 } |
| |
531 |
| |
532 if(client->statuscode == 101 && !cx_strcasecmp(header, "upgrade")) { |
| |
533 hdr_upgrade = hvalue; |
| 495 } |
534 } |
| 496 } |
535 } |
| 497 |
536 |
| 498 if(headers) { |
537 if(headers) { |
| 499 headers = headers->next; |
538 headers = headers->next; |
| 500 } |
539 } |
| 501 } |
540 } |
| 502 |
541 |
| 503 if(contentlength > 0 || chunkedtransferenc) { |
542 if(client->statuscode == 101) { |
| |
543 if(!cx_strcasecmp(hdr_upgrade, "websocket") && !cx_strcasecmp(hdr_connection, "upgrade")) { |
| |
544 client->stage = 2; |
| |
545 client->event.events = EVENT_POLLIN|EVENT_POLLOUT; |
| |
546 |
| |
547 // prepare IO buffers for websockets |
| |
548 client->transfer_buffer_len = 0; |
| |
549 client->transfer_buffer_pos = 0; |
| |
550 |
| |
551 client->transfer2_buffer = malloc(HTTP_CLIENT_BUFFER_SIZE); |
| |
552 if(!client->transfer2_buffer) { |
| |
553 client->error = 1; |
| |
554 return 1; |
| |
555 } |
| |
556 client->transfer2_buffer_alloc = HTTP_CLIENT_BUFFER_SIZE; |
| |
557 client->transfer2_buffer_len = 0; |
| |
558 client->transfer2_buffer_pos = 0; |
| |
559 } else { |
| |
560 // error: unknown protocol |
| |
561 log_ereport(LOG_FAILURE, "http-client: unknown protocol upgrade: %.*s", (int)hdr_upgrade.length, hdr_upgrade.ptr); |
| |
562 client->error = 1; |
| |
563 return 1; |
| |
564 } |
| |
565 } else if(contentlength > 0 || chunkedtransferenc) { |
| 504 IOStream *fd = Sysstream_new(NULL, client->socketfd); |
566 IOStream *fd = Sysstream_new(NULL, client->socketfd); |
| 505 if(!fd) { |
567 if(!fd) { |
| 506 client->error = 1; |
568 client->error = 1; |
| 507 return 1; |
569 return 1; |
| 508 } |
570 } |
| 509 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
571 HttpStream *http = (HttpStream*)httpstream_new(NULL, fd); |
| 510 if(!http) { |
572 if(!http) { |
| 511 fd->free(fd); |
573 fd->free(fd); |
| 512 } |
574 } |
| |
575 |
| |
576 // we can reuse the already allocated transfer_bufer for transfer2 |
| |
577 client->transfer2_buffer = client->transfer_buffer; |
| |
578 client->transfer2_buffer_alloc = client->transfer_buffer_alloc; |
| |
579 client->transfer2_buffer_len = 0; |
| |
580 client->transfer_buffer_pos = 0; |
| |
581 client->transfer_buffer = NULL; |
| |
582 client->transfer_buffer_alloc = 0; |
| |
583 |
| 513 if(contentlength > 0) { |
584 if(contentlength > 0) { |
| 514 http->max_read = contentlength; |
585 http->max_read = contentlength; |
| 515 httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); |
586 httpstream_enable_buffered_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); |
| 516 } else if(chunkedtransferenc) { |
587 } else if(chunkedtransferenc) { |
| 517 httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); |
588 httpstream_enable_chunked_read(&http->st, (char*)client->buffer.inbuf, client->buffer.maxsize, &client->buffer.cursize, &client->buffer.pos); |
| 525 // uses the response_body_write callback to write the content of the |
596 // uses the response_body_write callback to write the content of the |
| 526 // transfer buffer |
597 // transfer buffer |
| 527 // returns 0 success |
598 // returns 0 success |
| 528 // 1 would block or error |
599 // 1 would block or error |
| 529 static int client_write_response(HttpClient *client) { |
600 static int client_write_response(HttpClient *client) { |
| 530 while(client->transfer_buffer_pos < client->transfer_buffer_len) { |
601 while(client->transfer2_buffer_pos < client->transfer2_buffer_len) { |
| 531 char *buf = client->transfer_buffer + client->transfer_buffer_pos; |
602 char *buf = client->transfer2_buffer + client->transfer2_buffer_pos; |
| 532 size_t len = client->transfer_buffer_len - client->transfer_buffer_pos; |
603 size_t len = client->transfer2_buffer_len - client->transfer2_buffer_pos; |
| 533 int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata); |
604 int ret = client->response_body_write(client, buf, len, client->response_body_write_userdata); |
| 534 if(ret > 0) { |
605 if(ret > 0) { |
| 535 client->transfer_buffer_pos += ret; |
606 client->transfer2_buffer_pos += ret; |
| 536 } else if(ret == 0) { |
607 } else if(ret == 0) { |
| 537 // EOF? |
608 // EOF? |
| 538 // check if the write is incomplete, which would be an error |
609 // check if the write is incomplete, which would be an error |
| 539 client->error == client->transfer_buffer_pos < client->transfer_buffer_len; |
610 client->error == client->transfer2_buffer_pos < client->transfer2_buffer_len; |
| 540 return client->error; |
611 return client->error; |
| 541 } else { |
612 } else { |
| 542 if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { |
613 if(ret != HTTP_CLIENT_CALLBACK_WOULD_BLOCK) { |
| 543 client->error = 1; |
614 client->error = 1; |
| 544 } |
615 } |
| 558 // does the transfer buffer still contains bytes, that should be written? |
629 // does the transfer buffer still contains bytes, that should be written? |
| 559 if(client_write_response(client)) { |
630 if(client_write_response(client)) { |
| 560 return 1; |
631 return 1; |
| 561 } |
632 } |
| 562 |
633 |
| 563 char *buf = client->transfer_buffer; |
634 char *buf = client->transfer2_buffer; |
| 564 size_t nbytes = client->transfer_buffer_alloc; |
635 size_t nbytes = client->transfer2_buffer_alloc; |
| 565 |
636 |
| 566 ssize_t r; |
637 ssize_t r; |
| 567 while((r = net_read(&client->stream->st, buf, nbytes)) > 0) { |
638 while((r = net_read(&client->stream->st, buf, nbytes)) > 0) { |
| 568 client->transfer_buffer_len = r; |
639 client->transfer2_buffer_len = r; |
| 569 client->transfer_buffer_pos = 0; |
640 client->transfer2_buffer_pos = 0; |
| 570 if(client->response_body_write) { |
641 if(client->response_body_write) { |
| 571 if(client_write_response(client)) { |
642 if(client_write_response(client)) { |
| 572 return 1; |
643 return 1; |
| 573 } |
644 } |
| 574 } |
645 } |
| 611 client->transfer_buffer = str; |
682 client->transfer_buffer = str; |
| 612 client->transfer_buffer_len = len; |
683 client->transfer_buffer_len = len; |
| 613 |
684 |
| 614 // test client_send_request |
685 // test client_send_request |
| 615 |
686 |
| 616 int ret = client_send_request(client); |
687 int ret = client_send_buf(client); |
| 617 // It is very likely that the first client_send_request call doesn't |
688 // It is very likely that the first client_send_request call doesn't |
| 618 // fully write the request buffer to the socket |
689 // fully write the request buffer to the socket |
| 619 // In that case it returns 1 but without the error flag |
690 // In that case it returns 1 but without the error flag |
| 620 CX_TEST_ASSERT(ret == 1 && !client->error); |
691 CX_TEST_ASSERT(ret == 1 && !client->error); |
| 621 CX_TEST_ASSERT(client->transfer_buffer_pos > 0); |
692 CX_TEST_ASSERT(client->transfer_buffer_pos > 0); |
| 628 int writes = 1; |
699 int writes = 1; |
| 629 while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) { |
700 while(client->transfer_buffer_pos < client->transfer_buffer_len && writes < 2000000) { |
| 630 ssize_t r = read(sock, tmpbuf, 1024); |
701 ssize_t r = read(sock, tmpbuf, 1024); |
| 631 CX_TEST_ASSERT(r >= 0); |
702 CX_TEST_ASSERT(r >= 0); |
| 632 cxBufferWrite(tmpbuf, 1, r, &buf); |
703 cxBufferWrite(tmpbuf, 1, r, &buf); |
| 633 ret = client_send_request(client); |
704 ret = client_send_buf(client); |
| 634 CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error)); |
705 CX_TEST_ASSERT(ret == 0 || (ret == 1 && !client->error)); |
| 635 |
706 |
| 636 writes++; |
707 writes++; |
| 637 } |
708 } |
| 638 CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len); |
709 CX_TEST_ASSERT(client->transfer_buffer_pos == client->transfer_buffer_len); |