src/server/util/io.c

changeset 385
a1f4cb076d2f
parent 383
a5698877d14a
child 406
e5d96f6b9306
equal deleted inserted replaced
210:21274e5950af 385:a1f4cb076d2f
250 HttpStream *st = pool_malloc(pool, sizeof(HttpStream)); 250 HttpStream *st = pool_malloc(pool, sizeof(HttpStream));
251 st->st = http_io_funcs; 251 st->st = http_io_funcs;
252 st->fd = fd; 252 st->fd = fd;
253 st->max_read = 0; 253 st->max_read = 0;
254 st->read = 0; 254 st->read = 0;
255 st->read_total = 0;
256 st->readbuf = NULL;
257 st->bufsize = 0;
258 st->buflen = NULL;
259 st->bufpos = NULL;
260 st->chunk_buf_pos = 0;
255 st->chunked_enc = WS_FALSE; 261 st->chunked_enc = WS_FALSE;
256 st->buffered = WS_FALSE; 262 st->read_eof = WS_TRUE;
263 st->write_eof = WS_FALSE;
257 return (IOStream*)st; 264 return (IOStream*)st;
258 } 265 }
259 266
267 int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos) {
268 if(st->read != (io_read_f)net_http_read) {
269 log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_read: IOStream is not an HttpStream");
270 return 1;
271 }
272 st->read = (io_read_f)net_http_read_chunked;
273 HttpStream *http = (HttpStream*)st;
274 http->max_read = 0;
275 http->read = 0;
276 http->readbuf = buffer;
277 http->bufsize = bufsize;
278 http->buflen = cursize;
279 http->bufpos = pos;
280 http->chunk_buf_pos = 0;
281 http->read_eof = WS_FALSE;
282 return 0;
283 }
284
285 int httpstream_enable_chunked_write(IOStream *st) {
286 if(st->write != (io_write_f)net_http_write) {
287 log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_write: IOStream is not an HttpStream");
288 return 1;
289 }
290 HttpStream *http = (HttpStream*)st;
291 http->chunked_enc = WS_TRUE;
292 return 0;
293 }
294
295 int httpstream_set_max_read(IOStream *st, int64_t maxread) {
296 if(st->write != (io_write_f)net_http_write) {
297 log_ereport(LOG_FAILURE, "%s", "httpstream_set_max_read: IOStream is not an HttpStream");
298 return 1;
299 }
300 HttpStream *http = (HttpStream*)st;
301 http->max_read = maxread;
302 return 0;
303 }
304
305 WSBool httpstream_eof(IOStream *st) {
306 HttpStream *http = (HttpStream*)st;
307 return http->read_eof;
308 }
309
260 ssize_t net_http_write(HttpStream *st, void *buf, size_t nbytes) { 310 ssize_t net_http_write(HttpStream *st, void *buf, size_t nbytes) {
311 if(st->write_eof) return 0;
261 IOStream *fd = st->fd; 312 IOStream *fd = st->fd;
262 if(st->chunked_enc) { 313 if(st->chunked_enc) {
263 // TODO: on some plattforms iov_len is smaller than size_t 314 // TODO: on some plattforms iov_len is smaller than size_t
264 struct iovec io[3]; 315 struct iovec io[3];
265 char chunk_len[16]; 316 char chunk_len[16];
267 io[0].iov_len = snprintf(chunk_len, 16, "%zx\r\n", nbytes); 318 io[0].iov_len = snprintf(chunk_len, 16, "%zx\r\n", nbytes);
268 io[1].iov_base = buf; 319 io[1].iov_base = buf;
269 io[1].iov_len = nbytes; 320 io[1].iov_len = nbytes;
270 io[2].iov_base = "\r\n"; 321 io[2].iov_base = "\r\n";
271 io[2].iov_len = 2; 322 io[2].iov_len = 2;
323 // TODO: FIXME: if r < sum of iov_len, everything would explode
324 // we need to store the chunk state and remaining bytes
272 ssize_t r = fd->writev(fd, io, 3); 325 ssize_t r = fd->writev(fd, io, 3);
273 return r - io[0].iov_len; 326 return r - io[0].iov_len - io[2].iov_len;
274 } else { 327 } else {
275 return fd->write(fd, buf, nbytes); 328 return fd->write(fd, buf, nbytes);
276 } 329 }
277 } 330 }
278 331
279 ssize_t net_http_writev(HttpStream *st, struct iovec *iovec, int iovcnt) { 332 ssize_t net_http_writev(HttpStream *st, struct iovec *iovec, int iovcnt) {
333 if(st->write_eof) return 0;
280 IOStream *fd = st->fd; 334 IOStream *fd = st->fd;
281 if(st->chunked_enc) { 335 if(st->chunked_enc) {
282 struct iovec *io = calloc(iovcnt + 1, sizeof(struct iovec)); 336 struct iovec *io = calloc(iovcnt + 1, sizeof(struct iovec));
337 if(!io) {
338 return 0;
339 }
283 char chunk_len[16]; 340 char chunk_len[16];
284 io[0].iov_base = chunk_len; 341 io[0].iov_base = chunk_len;
285 size_t len = 0; 342 size_t len = 0;
286 for(int i=0;i<iovcnt;i++) { 343 for(int i=0;i<iovcnt;i++) {
287 len += iovec[i].iov_len; 344 len += iovec[i].iov_len;
288 } 345 }
289 io[0].iov_len = snprintf(chunk_len, 16, "\r\n%zx\r\n", len); 346 io[0].iov_len = snprintf(chunk_len, 16, "\r\n%zx\r\n", len);
290 memcpy(io + 1, iovec, iovcnt * sizeof(struct iovec)); 347 memcpy(io + 1, iovec, iovcnt * sizeof(struct iovec));
291 ssize_t r = fd->writev(fd, io, iovcnt + 1); 348 ssize_t r = fd->writev(fd, io, iovcnt + 1);
292 return r - io[0].iov_len; 349
350 ssize_t ret = r - io[0].iov_len;
351 free(io);
352 return ret;
293 } else { 353 } else {
294 return fd->writev(fd, iovec, iovcnt); 354 return fd->writev(fd, iovec, iovcnt);
295 } 355 }
296 } 356 }
297 357
298 ssize_t net_http_read(HttpStream *st, void *buf, size_t nbytes) { 358 ssize_t net_http_read(HttpStream *st, void *buf, size_t nbytes) {
299 if(st->max_read != 0 && st->read >= st->max_read) { 359 if(st->read >= st->max_read) {
360 st->read_eof = WS_TRUE;
300 return 0; 361 return 0;
301 } 362 }
302 ssize_t r = st->fd->read(st->fd, buf, nbytes); 363 ssize_t r = st->fd->read(st->fd, buf, nbytes);
364 if(r < 0) {
365 st->st.io_errno = st->fd->io_errno;
366 }
303 st->read += r; 367 st->read += r;
304 return r; 368 return r;
305 } 369 }
306 370
307 ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd) { 371 #define BUF_UNNEEDED_DIFF 64
372 /*
373 * read from st->chunk_buf first, read from st->fd if perform_io is true
374 */
375 static ssize_t net_http_read_buffered(HttpStream *st, char *buf, size_t nbytes, WSBool read_data, WSBool *perform_io) {
376 ssize_t r = 0;
377
378 //memset(buf, 'x', nbytes);
379 //char *orig_buf = buf;
380
381 // copy available data from st->readbuf to buf
382 int pos = *st->bufpos;
383 size_t buf_available = *st->buflen - pos;
384 if(buf_available) {
385 size_t cplen = buf_available > nbytes ? nbytes : buf_available;
386 if(read_data) {
387 // if we read data (and not a chunk header), we limit the
388 // amount of bytes we copy
389 size_t chunk_available = st->max_read - st->read;
390 cplen = cplen > chunk_available ? chunk_available : cplen;
391 st->read += cplen;
392 }
393 memcpy(buf, st->readbuf + pos, cplen);
394 *st->bufpos += cplen;
395 r += cplen;
396 buf += cplen;
397 nbytes -= cplen;
398 }
399
400 // maybe perform IO and refill the read buffer
401 // if we read data (read_data == true), make sure not to perform IO,
402 // when a chunk is completed
403 //
404 // if we read a chunk header (read_data == false) it is very important
405 // to not perform IO, if we have previously copied data from readbuf
406 // this ensures we never override non-chunk-header data
407 if(*perform_io && ((read_data && nbytes > 0 && st->max_read - st->read) || (!read_data && r == 0))) {
408 if(*st->buflen - *st->bufpos > 0) {
409 printf("todo: fix, should not happen, remove later\n");
410 }
411 // fill buffer again
412 ssize_t rlen = st->fd->read(st->fd, st->readbuf, st->bufsize);
413 *st->buflen = rlen;
414 *st->bufpos = 0;
415 *perform_io = WS_FALSE;
416 if(rlen < 0) {
417 st->st.io_errno = st->fd->io_errno;
418 }
419
420 if(rlen > 0) {
421 // call func again to get data from buffer (no IO will be performed)
422 r += net_http_read_buffered(st, buf, nbytes, read_data, perform_io);
423 }
424 }
425
426 return r;
427 }
428
429
430 /*
431 * parses a chunk header
432 * the chunk length is stored in chunklen
433 * return: 0 if the data is incomplete
434 * -1 if an error occured
435 * >0 chunk header length
436 */
437 static int parse_chunk_header(char *str, int len, WSBool first, int64_t *chunklen) {
438 char *hdr_start = NULL;
439 char *hdr_end = NULL;
440 int i = 0;
441 if(first) {
442 hdr_start = str;
443 } else {
444 if(len < 3) {
445 return 0;
446 }
447 if(str[0] == '\r' && str[1] == '\n') {
448 hdr_start = str+2;
449 i = 2;
450 } else if(str[0] == '\n') {
451 hdr_start = str+1;
452 i = 1;
453 } else {
454 return -1;
455 }
456 }
457
458 for(;i<len;i++) {
459 char c = str[i];
460 if(c == '\r' || c == '\n') {
461 hdr_end = str+i;
462 break;
463 }
464 }
465 if(!hdr_end || i == len) {
466 return 0; // incomplete
467 }
468
469 if(*hdr_end == '\r') {
470 // we also need '\n'
471 if(hdr_end[1] != '\n') {
472 return -1;
473 }
474 i++; // '\n' found
475 }
476
477 // parse
478 char save_c = *hdr_end;
479 *hdr_end = '\0';
480 char *end;
481 int64_t clen;
482 errno = 0;
483 clen = strtoll(hdr_start, &end, 16);
484 *hdr_end = save_c;
485 if(errno) {
486 return -1;
487 }
488 i++;
489
490 if(clen == 0) {
491 // chunk length of 0 indicates the end
492 // an additional \r\n is required (we also accept \n)
493 if(i >= len) {
494 return 0;
495 }
496 if(str[i] == '\n') {
497 i++;
498 } else if(str[i] == '\r') {
499 if(++i >= len) {
500 return 0;
501 }
502 if(str[i] == '\n') {
503 i++;
504 } else {
505 return -1;
506 }
507 } else {
508 return -1;
509 }
510 }
511
512 *chunklen = clen;
513 return i;
514 }
515
516 ssize_t net_http_read_chunked(HttpStream *st, void *buf, size_t nbytes) {
517 if(st->read_eof) {
518 return 0;
519 }
520
521 char *rbuf = buf; // buffer pos
522 size_t rd = 0; // number of bytes read
523 size_t rbuflen = nbytes; // number of bytes until end of buf
524 WSBool perform_io = WS_TRUE; // we do only 1 read before we abort
525 while(rd < nbytes && (perform_io || (st->max_read - st->read) > 0)) {
526 // how many bytes are available in the current chunk
527 size_t chunk_available = st->max_read - st->read;
528 if(chunk_available > 0) {
529 ssize_t r = net_http_read_buffered(st, rbuf, rbuflen, TRUE, &perform_io);
530 if(r == 0) {
531 break;
532 }
533 rd += r;
534 st->read_total += r;
535 rbuf += r;
536 rbuflen -= r;
537 } else {
538 int chunkbuf_avail = HTTP_STREAM_CBUF_SIZE - st->chunk_buf_pos;
539 if(chunkbuf_avail == 0) {
540 // for some reason HTTP_STREAM_CBUF_SIZE is not enough
541 // to store the chunk header
542 // this indicates that something has gone wrong (or this is an attack)
543 st->read_eof = WS_TRUE;
544 return -1;
545 }
546 // fill st->chunk_buf
547 ssize_t r = net_http_read_buffered(st, &st->chunk_buf[st->chunk_buf_pos], chunkbuf_avail, FALSE, &perform_io);
548 if(r == 0) {
549 break;
550 }
551 int chunkbuf_len = st->chunk_buf_pos + r;
552 int64_t chunklen;
553 int ret = parse_chunk_header(st->chunk_buf, chunkbuf_len, st->read_total > 0 ? FALSE : TRUE, &chunklen);
554 if(ret == 0) {
555 // incomplete chunk header
556 st->chunk_buf_pos = chunkbuf_len;
557 } else if(ret < 0) {
558 // error
559 st->read_eof = WS_TRUE;
560 return -1;
561 } else if(ret > 0) {
562 st->max_read = chunklen;
563 st->read = 0;
564 int remaining_len = chunkbuf_len - ret;
565 if(remaining_len > 0) {
566 // we have read more into chunk_buf than the chunk_header
567 // it is save to just move bufpos back
568 *st->bufpos -= remaining_len;
569 }
570 //st->remaining_len = chunkbuf_len - ret;
571 st->chunk_buf_pos = 0;
572
573 if(chunklen == 0) {
574 st->read_eof = WS_TRUE;
575 break;
576 }
577 }
578 }
579
580 if(!perform_io && rd == 0) {
581 perform_io = WS_TRUE;
582 }
583 }
584
585 return rd;
586 }
587
588 ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd) {
589 if(st->write_eof) return 0;
308 ssize_t ret = 0; 590 ssize_t ret = 0;
309 // TODO: support chunked transfer encoding 591 // TODO: support chunked transfer encoding
310 if(st->fd->sendfile) { 592 if(st->fd->sendfile) {
311 ret = st->fd->sendfile(st->fd, sfd); 593 ret = st->fd->sendfile(st->fd, sfd);
312 } else { 594 } else {
319 void net_http_close(HttpStream *st) { 601 void net_http_close(HttpStream *st) {
320 st->fd->close(st->fd); 602 st->fd->close(st->fd);
321 } 603 }
322 604
323 void net_http_finish(HttpStream *st) { 605 void net_http_finish(HttpStream *st) {
324 if(st->chunked_enc) { 606 if(st->chunked_enc && !st->write_eof) {
325 st->fd->write(st->fd, "0\r\n\r\n", 5); 607 st->fd->write(st->fd, "0\r\n\r\n", 5);
326 } 608 }
609 st->write_eof = WS_TRUE;
327 } 610 }
328 611
329 void net_http_setmode(HttpStream *st, int mode) { 612 void net_http_setmode(HttpStream *st, int mode) {
330 st->fd->setmode(st->fd, mode); 613 st->fd->setmode(st->fd, mode);
331 } 614 }
450 733
451 ssize_t net_printf(SYS_NETFD fd, char *format, ...) { 734 ssize_t net_printf(SYS_NETFD fd, char *format, ...) {
452 va_list arg; 735 va_list arg;
453 va_start(arg, format); 736 va_start(arg, format);
454 sstr_t buf = ucx_vasprintf(ucx_default_allocator(), format, arg); 737 sstr_t buf = ucx_vasprintf(ucx_default_allocator(), format, arg);
455 ssize_t r = net_write(fd, buf.ptr, buf.length); 738 ssize_t r = buf.length > 0 ? net_write(fd, buf.ptr, buf.length) : 0;
456 free(buf.ptr); 739 free(buf.ptr);
457 va_end(arg); 740 va_end(arg);
458 if(r < 0) { 741 if(r < 0) {
459 ((IOStream*)fd)->io_errno = errno; 742 ((IOStream*)fd)->io_errno = errno;
460 } 743 }

mercurial