src/server/util/io.c

changeset 498
0d80f8a2b29f
parent 493
56cf890dd9ed
child 500
077aa138e8fb
equal deleted inserted replaced
497:8827517054ec 498:0d80f8a2b29f
116 st->fd = fd; 116 st->fd = fd;
117 return (IOStream*)st; 117 return (IOStream*)st;
118 } 118 }
119 119
120 #ifdef XP_UNIX 120 #ifdef XP_UNIX
121 ssize_t net_sys_write(Sysstream *st, void *buf, size_t nbytes) { 121 ssize_t net_sys_write(Sysstream *st, const void *buf, size_t nbytes) {
122 return write(st->fd, buf, nbytes); 122 return write(st->fd, buf, nbytes);
123 } 123 }
124 124
125 ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt) { 125 ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt) {
126 return writev(st->fd, iovec, iovcnt); 126 return writev(st->fd, iovec, iovcnt);
262 st->readbuf = NULL; 262 st->readbuf = NULL;
263 st->bufsize = 0; 263 st->bufsize = 0;
264 st->buflen = NULL; 264 st->buflen = NULL;
265 st->bufpos = NULL; 265 st->bufpos = NULL;
266 st->chunk_buf_pos = 0; 266 st->chunk_buf_pos = 0;
267 st->current_chunk_length = 0;
268 st->current_chunk_pos = 0;
269 st->write_chunk_buf_len = 0;
270 st->write_chunk_buf_pos = 0;
267 st->chunked_enc = WS_FALSE; 271 st->chunked_enc = WS_FALSE;
268 st->read_eof = WS_TRUE; 272 st->read_eof = WS_TRUE;
269 st->write_eof = WS_FALSE; 273 st->write_eof = WS_FALSE;
270 return (IOStream*)st; 274 return (IOStream*)st;
271 } 275 }
316 int64_t httpstream_written(IOStream *st) { 320 int64_t httpstream_written(IOStream *st) {
317 HttpStream *http = (HttpStream*)st; 321 HttpStream *http = (HttpStream*)st;
318 return http->written; 322 return http->written;
319 } 323 }
320 324
321 ssize_t net_http_write(HttpStream *st, void *buf, size_t nbytes) { 325 /*
326 * iovec callback func
327 * returns number of payload bytes written (number of bytes returned back to the net_write caller)
328 */
329 typedef ssize_t(*writeop_finish_func)(HttpStream *st, char *base, size_t len, size_t written, void *udata);
330
331 static ssize_t httpstream_finish_prev_header(HttpStream *st, char *base, size_t len, size_t written, void *udata) {
332 st->write_chunk_buf_pos += written;
333 if(st->write_chunk_buf_pos == st->write_chunk_buf_len) {
334 st->write_chunk_buf_len = 0;
335 st->write_chunk_buf_pos = 0;
336 }
337 return 0;
338 }
339
340 static ssize_t httpstream_finish_data(HttpStream *st, char *base, size_t len, size_t written, void *udata) {
341 st->current_chunk_pos += written;
342 if(st->current_chunk_pos == st->current_chunk_length) {
343 st->current_chunk_length = 0;
344 st->current_chunk_pos = 0;
345 st->current_trailer = 2;
346 }
347 return written;
348 }
349
350 static ssize_t httpstream_finish_new_header(HttpStream *st, char *base, size_t len, size_t written, void *udata) {
351 size_t *chunk_len = udata;
352 st->current_chunk_length = *chunk_len;
353 st->current_chunk_pos = 0; // new chunk started
354 if(written < len) {
355 st->write_chunk_buf_len = len-written;
356 st->write_chunk_buf_pos = 0;
357 memcpy(st->write_chunk_buf + st->write_chunk_buf_pos, base+written, st->write_chunk_buf_len);
358 } else {
359 st->write_chunk_buf_len = 0;
360 st->write_chunk_buf_pos = 0;
361 }
362 return 0;
363 }
364
365 static ssize_t httpstream_finish_trailer(HttpStream *st, char *base, size_t len, size_t written, void *udata) {
366 st->current_trailer -= written;
367 return 0;
368 }
369
370 ssize_t net_http_write(HttpStream *st, const void *buf, size_t nbytes) {
322 if(st->write_eof) return 0; 371 if(st->write_eof) return 0;
323 IOStream *fd = st->fd; 372 IOStream *fd = st->fd;
324 if(st->chunked_enc) { 373 if(!st->chunked_enc) {
325 // TODO: on some plattforms iov_len is smaller than size_t 374 ssize_t w = fd->write(fd, buf, nbytes);
326 struct iovec io[3];
327 char chunk_len[16];
328 io[0].iov_base = chunk_len;
329 io[0].iov_len = snprintf(chunk_len, 16, "%zx\r\n", nbytes);
330 io[1].iov_base = buf;
331 io[1].iov_len = nbytes;
332 io[2].iov_base = "\r\n";
333 io[2].iov_len = 2;
334 // TODO: FIXME: if wv < sum of iov_len, everything would explode
335 // we need to store the chunk state and remaining bytes
336 // TODO: FIX IT NOW, IT IS HORRIBLE BROKEN
337 ssize_t wv = fd->writev(fd, io, 3);
338 ssize_t w = wv - io[0].iov_len - io[2].iov_len;
339 st->written += w > 0 ? w : 0; 375 st->written += w > 0 ? w : 0;
340 return w; 376 return w;
341 } else { 377 } else {
342 ssize_t w = fd->write(fd, buf, nbytes); 378 struct iovec io[8];
343 st->written += w > 0 ? w : 0; 379 writeop_finish_func io_finished[8];
344 return w; 380 void *io_finished_udata[8];
381 int iovec_len = 0;
382
383 char *str_crlf = "\r\n";
384
385 size_t prev_chunk_len = st->current_chunk_length;
386 size_t new_chunk_len = 0;
387
388 // was the previous chunk header completely sent?
389 if(st->write_chunk_buf_len > 0) {
390 io[0].iov_base = &st->write_chunk_buf[st->write_chunk_buf_pos];
391 io[0].iov_len = st->write_chunk_buf_len - st->write_chunk_buf_pos;
392 io_finished[0] = httpstream_finish_prev_header;
393 io_finished_udata[0] = &prev_chunk_len;
394 iovec_len++;
395 }
396
397 // was the previous chunk payload completely sent?
398 if(st->current_chunk_length != 0) {
399 size_t chunk_remaining = st->current_chunk_length - st->current_chunk_pos;
400 size_t prev_nbytes = chunk_remaining > nbytes ? nbytes : chunk_remaining;
401 io[iovec_len].iov_base = (char*)buf;
402 io[iovec_len].iov_len = prev_nbytes;
403 io_finished[iovec_len] = httpstream_finish_data;
404 buf = ((char*)buf) + prev_nbytes;
405 nbytes -= prev_nbytes;
406 iovec_len++;
407
408 io[iovec_len].iov_base = str_crlf;
409 io[iovec_len].iov_len = 2;
410 io_finished[iovec_len] = httpstream_finish_trailer;
411 iovec_len++;
412 } else if(st->current_trailer > 0) {
413 io[iovec_len].iov_base = str_crlf + 2 - st->current_trailer;
414 io[iovec_len].iov_len = st->current_trailer;
415 io_finished[iovec_len] = httpstream_finish_trailer;
416 iovec_len++;
417 }
418
419 // TODO: on some plattforms iov_len is smaller than size_t
420 // if nbytes > INT_MAX, it should be devided into multiple
421 // iovec entries
422 char chunk_len[16];
423 if(nbytes > 0) {
424 new_chunk_len = nbytes;
425 io[iovec_len].iov_base = chunk_len;
426 io[iovec_len].iov_len = snprintf(chunk_len, 16, "%zx\r\n", nbytes);
427 io_finished[iovec_len] = httpstream_finish_new_header;
428 io_finished_udata[iovec_len] = &new_chunk_len;
429 iovec_len++;
430
431 io[iovec_len].iov_base = (char*)buf;
432 io[iovec_len].iov_len = nbytes;
433 io_finished[iovec_len] = httpstream_finish_data;
434 iovec_len++;
435
436 io[iovec_len].iov_base = str_crlf;
437 io[iovec_len].iov_len = 2;
438 io_finished[iovec_len] = httpstream_finish_trailer;
439 iovec_len++;
440 }
441
442 ssize_t wv = fd->writev(fd, io, iovec_len);
443 if(wv <= 0) {
444 return wv;
445 }
446
447 size_t ret_w = 0;
448 int i = 0;
449 while(wv > 0) {
450 char *base = io[i].iov_base;
451 size_t len = io[i].iov_len;
452 size_t wlen = wv > len ? len : wv;
453 ret_w += io_finished[i](st, base, len, wlen, io_finished_udata[i]);
454 wv -= wlen;
455 i++;
456 }
457
458 st->written += ret_w;
459 return ret_w;
345 } 460 }
346 } 461 }
347 462
348 ssize_t net_http_writev(HttpStream *st, struct iovec *iovec, int iovcnt) { 463 ssize_t net_http_writev(HttpStream *st, struct iovec *iovec, int iovcnt) {
349 if(st->write_eof) return 0; 464 if(st->write_eof) return 0;
451 * the chunk length is stored in chunklen 566 * the chunk length is stored in chunklen
452 * return: 0 if the data is incomplete 567 * return: 0 if the data is incomplete
453 * -1 if an error occured 568 * -1 if an error occured
454 * >0 chunk header length 569 * >0 chunk header length
455 */ 570 */
456 static int parse_chunk_header(char *str, int len, WSBool first, int64_t *chunklen) { 571 int http_stream_parse_chunk_header(char *str, int len, WSBool first, int64_t *chunklen) {
457 char *hdr_start = NULL; 572 char *hdr_start = NULL;
458 char *hdr_end = NULL; 573 char *hdr_end = NULL;
459 int i = 0; 574 int i = 0;
460 if(first) { 575 if(first) {
461 hdr_start = str; 576 hdr_start = str;
567 if(r == 0) { 682 if(r == 0) {
568 break; 683 break;
569 } 684 }
570 int chunkbuf_len = st->chunk_buf_pos + r; 685 int chunkbuf_len = st->chunk_buf_pos + r;
571 int64_t chunklen; 686 int64_t chunklen;
572 int ret = parse_chunk_header(st->chunk_buf, chunkbuf_len, st->read_total > 0 ? FALSE : TRUE, &chunklen); 687 int ret = http_stream_parse_chunk_header(st->chunk_buf, chunkbuf_len, st->read_total > 0 ? FALSE : TRUE, &chunklen);
573 if(ret == 0) { 688 if(ret == 0) {
574 // incomplete chunk header 689 // incomplete chunk header
575 st->chunk_buf_pos = chunkbuf_len; 690 st->chunk_buf_pos = chunkbuf_len;
576 } else if(ret < 0) { 691 } else if(ret < 0) {
577 // error 692 // error
649 st->ssl = ssl; 764 st->ssl = ssl;
650 st->error = 0; 765 st->error = 0;
651 return (IOStream*)st; 766 return (IOStream*)st;
652 } 767 }
653 768
654 ssize_t net_ssl_write(SSLStream *st, void *buf, size_t nbytes) { 769 ssize_t net_ssl_write(SSLStream *st, const void *buf, size_t nbytes) {
655 int ret = SSL_write(st->ssl, buf, nbytes); 770 int ret = SSL_write(st->ssl, buf, nbytes);
656 if(ret <= 0) { 771 if(ret <= 0) {
657 st->error = SSL_get_error(st->ssl, ret); 772 st->error = SSL_get_error(st->ssl, ret);
658 } 773 }
659 return ret; 774 return ret;
732 return IO_ERROR; 847 return IO_ERROR;
733 } 848 }
734 return r; 849 return r;
735 } 850 }
736 851
737 ssize_t net_write(SYS_NETFD fd, void *buf, size_t nbytes) { 852 ssize_t net_write(SYS_NETFD fd, const void *buf, size_t nbytes) {
738 ssize_t r = ((IOStream*)fd)->write(fd, buf, nbytes); 853 ssize_t r = ((IOStream*)fd)->write(fd, buf, nbytes);
739 if(r < 0) { 854 if(r < 0) {
740 ((IOStream*)fd)->io_errno = errno; 855 ((IOStream*)fd)->io_errno = errno;
741 return IO_ERROR; 856 return IO_ERROR;
742 } 857 }

mercurial