src/server/util/io.c

changeset 385
a1f4cb076d2f
parent 383
a5698877d14a
child 406
e5d96f6b9306
--- a/src/server/util/io.c	Tue Aug 13 22:14:32 2019 +0200
+++ b/src/server/util/io.c	Sat Sep 24 16:26:10 2022 +0200
@@ -252,12 +252,63 @@
     st->fd = fd;
     st->max_read = 0;
     st->read = 0;
+    st->read_total = 0;
+    st->readbuf = NULL;
+    st->bufsize = 0;
+    st->buflen = NULL;
+    st->bufpos = NULL;
+    st->chunk_buf_pos = 0;
     st->chunked_enc = WS_FALSE;
-    st->buffered = WS_FALSE;
+    st->read_eof = WS_TRUE;
+    st->write_eof = WS_FALSE;
     return (IOStream*)st;
 }
 
+/*
+ * Switches the read side of an HttpStream to chunked transfer decoding.
+ *
+ * The stream must currently use net_http_read as its read function;
+ * otherwise an error is logged and the stream is left unchanged.
+ *
+ * buffer/bufsize describe the read buffer that is refilled from the
+ * underlying stream, cursize points to the number of valid bytes in that
+ * buffer and pos to the current position inside it. Only the pointers are
+ * stored, so the stream keeps working on the caller's variables.
+ *
+ * Note: st->read (IOStream) is the read function pointer, while the read
+ * member of HttpStream is the byte counter of the current chunk.
+ *
+ * returns 0 on success, 1 if st is not an HttpStream
+ */
+int httpstream_enable_chunked_read(IOStream *st, char *buffer, size_t bufsize, int *cursize, int *pos) {
+    if(st->read == (io_read_f)net_http_read) {
+        HttpStream *hs = (HttpStream*)st;
+        
+        // attach the caller's read buffer
+        hs->readbuf = buffer;
+        hs->bufsize = bufsize;
+        hs->buflen = cursize;
+        hs->bufpos = pos;
+        
+        // no chunk is active yet, the first chunk header sets max_read
+        hs->max_read = 0;
+        hs->read = 0;
+        hs->chunk_buf_pos = 0;
+        hs->read_eof = WS_FALSE;
+        
+        // install the chunk decoding read function
+        st->read = (io_read_f)net_http_read_chunked;
+        return 0;
+    }
+    
+    log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_read: IOStream is not an HttpStream");
+    return 1;
+}
+
+/*
+ * Enables chunked transfer encoding for data written to an HttpStream.
+ *
+ * returns 0 on success, 1 (after logging an error) if st does not use
+ * net_http_write as its write function, i.e. is not an HttpStream
+ */
+int httpstream_enable_chunked_write(IOStream *st) {
+    if(st->write == (io_write_f)net_http_write) {
+        ((HttpStream*)st)->chunked_enc = WS_TRUE;
+        return 0;
+    }
+    
+    log_ereport(LOG_FAILURE, "%s", "httpstream_enable_chunked_write: IOStream is not an HttpStream");
+    return 1;
+}
+
+/*
+ * Sets the number of bytes that may be read from an HttpStream
+ * (the max_read limit checked by the read functions).
+ *
+ * The stream type is verified via the write function pointer.
+ * NOTE(review): presumably because the read function may already have
+ * been replaced by net_http_read_chunked - confirm.
+ *
+ * returns 0 on success, 1 (after logging an error) if st is not an
+ * HttpStream
+ */
+int httpstream_set_max_read(IOStream *st, int64_t maxread) {
+    if(st->write == (io_write_f)net_http_write) {
+        ((HttpStream*)st)->max_read = maxread;
+        return 0;
+    }
+    
+    log_ereport(LOG_FAILURE, "%s", "httpstream_set_max_read: IOStream is not an HttpStream");
+    return 1;
+}
+
+/*
+ * Returns the read_eof flag of an HttpStream.
+ *
+ * st is cast to HttpStream without any type check, the caller has to
+ * make sure that it really is one.
+ */
+WSBool httpstream_eof(IOStream *st) {
+    return ((HttpStream*)st)->read_eof;
+}
+
 ssize_t net_http_write(HttpStream *st, void *buf, size_t nbytes) {
+    if(st->write_eof) return 0;
     IOStream *fd = st->fd;
     if(st->chunked_enc) {
         // TODO: on some plattforms iov_len is smaller than size_t
@@ -269,17 +320,23 @@
         io[1].iov_len = nbytes;
         io[2].iov_base = "\r\n";
         io[2].iov_len = 2;
+        // TODO: FIXME: if r < sum of iov_len, everything would explode
+        // we need to store the chunk state and remaining bytes
         ssize_t r = fd->writev(fd, io, 3);
-        return r - io[0].iov_len;
+        return r - io[0].iov_len - io[2].iov_len;
     } else {
         return fd->write(fd, buf, nbytes);
     }
 }
 
 ssize_t net_http_writev(HttpStream *st, struct iovec *iovec, int iovcnt) {
+    if(st->write_eof) return 0;
     IOStream *fd = st->fd;
     if(st->chunked_enc) {
         struct iovec *io = calloc(iovcnt + 1, sizeof(struct iovec));
+        if(!io) {
+            return 0;
+        }
         char chunk_len[16];
         io[0].iov_base = chunk_len;
         size_t len = 0;
@@ -289,22 +346,247 @@
         io[0].iov_len = snprintf(chunk_len, 16, "\r\n%zx\r\n", len);
         memcpy(io + 1, iovec, iovcnt * sizeof(struct iovec));
         ssize_t r = fd->writev(fd, io, iovcnt + 1);
-        return r - io[0].iov_len;
+        
+        ssize_t ret = r - io[0].iov_len;
+        free(io);
+        return ret;
     } else {
         return fd->writev(fd, iovec, iovcnt);
     }
 }
 
 ssize_t net_http_read(HttpStream *st, void *buf, size_t nbytes) {
-    if(st->max_read != 0 && st->read >= st->max_read) {
+    if(st->read >= st->max_read) {
+        st->read_eof = WS_TRUE;
         return 0;
     }
     ssize_t r = st->fd->read(st->fd, buf, nbytes);
+    if(r < 0) {
+        st->st.io_errno = st->fd->io_errno;
+    }
     st->read += r;
     return r;
 }
 
-ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd) {  
+#define BUF_UNNEEDED_DIFF 64
+/*
+ * Copies up to nbytes bytes to buf. Data is taken from st->readbuf first;
+ * if *perform_io is true, st->readbuf is refilled from st->fd at most once.
+ *
+ * read_data:  true if chunk payload is read: the copy is limited to the
+ *             remaining bytes of the current chunk (max_read - read) and
+ *             st->read is advanced
+ *             false if a chunk header is read
+ * perform_io: in/out flag, set to WS_FALSE after the buffer was refilled
+ *
+ * returns the number of bytes copied to buf (0 if nothing is available)
+ * A failed read on st->fd is only reported via st->st.io_errno.
+ */
+static ssize_t net_http_read_buffered(HttpStream *st, char *buf, size_t nbytes, WSBool read_data, WSBool *perform_io) {
+    ssize_t r = 0;
+    
+    // copy available data from st->readbuf to buf
+    int pos = *st->bufpos;
+    // a negative difference must never be converted to a huge size_t
+    int buf_diff = *st->buflen - pos;
+    size_t buf_available = buf_diff > 0 ? (size_t)buf_diff : 0;
+    if(buf_available) {
+        size_t cplen = buf_available > nbytes ? nbytes : buf_available;
+        if(read_data) {
+            // if we read data (and not a chunk header), we limit the
+            // amount of bytes we copy
+            size_t chunk_available = st->max_read - st->read;
+            cplen = cplen > chunk_available ? chunk_available : cplen;
+            st->read += cplen;
+        }
+        memcpy(buf, st->readbuf + pos, cplen);
+        *st->bufpos += cplen;
+        r += cplen;
+        buf += cplen;
+        nbytes -= cplen;
+    }
+    
+    // maybe perform IO and refill the read buffer
+    // if we read data (read_data == true), make sure not to perform IO,
+    // when a chunk is completed
+    //
+    // if we read a chunk header (read_data == false) it is very important
+    // to not perform IO, if we have previously copied data from readbuf
+    // this ensures we never override non-chunk-header data
+    if(*perform_io && ((read_data && nbytes > 0 && st->max_read - st->read) || (!read_data && r == 0))) {
+        if(*st->buflen - *st->bufpos > 0) {
+            printf("todo: fix, should not happen, remove later\n");
+        }
+        // fill buffer again
+        ssize_t rlen = st->fd->read(st->fd, st->readbuf, st->bufsize);
+        // after an error or EOF the buffer is empty; storing a negative
+        // length would make the next call copy data that does not exist
+        *st->buflen = rlen > 0 ? (int)rlen : 0;
+        *st->bufpos = 0;
+        *perform_io = WS_FALSE;
+        if(rlen < 0) {
+            st->st.io_errno = st->fd->io_errno;
+        }
+        
+        if(rlen > 0) {
+            // call func again to get data from buffer (no IO will be performed)
+            r += net_http_read_buffered(st, buf, nbytes, read_data, perform_io);
+        }
+    }
+    
+    return r;
+}
+
+
+/*
+ * parses a chunk header
+ *
+ * str/len:  the received header bytes (str is modified temporarily while
+ *           parsing, but restored before the function returns)
+ * first:    true for the first chunk header of a body; all other chunk
+ *           headers must start with the line break ("\r\n" or "\n") that
+ *           terminates the data of the previous chunk
+ * chunklen: receives the chunk length (only set on success)
+ *
+ * The length is parsed as hex number, anything that follows the number
+ * in the same line (e.g. chunk extensions) is ignored. A chunk length of 0
+ * must be followed by an additional line break.
+ *
+ * return:  0 if the data is incomplete
+ *         -1 if an error occurred (malformed line break, no hex digits,
+ *            negative or out of range length)
+ *         >0 chunk header length
+ */
+static int parse_chunk_header(char *str, int len, WSBool first, int64_t *chunklen) {
+    char *hdr_start = NULL;
+    char *hdr_end = NULL;
+    int i = 0;
+    if(first) {
+        hdr_start = str;
+    } else {
+        if(len < 3) {
+            return 0;
+        }
+        if(str[0] == '\r' && str[1] == '\n') {
+            hdr_start = str+2;
+            i = 2;
+        } else if(str[0] == '\n') {
+            hdr_start = str+1;
+            i = 1;
+        } else {
+            return -1;
+        }
+    }
+    
+    // find the end of the header line
+    for(;i<len;i++) {
+        char c = str[i];
+        if(c == '\r' || c == '\n') {
+            hdr_end = str+i;
+            break;
+        }
+    }
+    if(!hdr_end) {
+        return 0; // incomplete
+    }
+    
+    if(*hdr_end == '\r') {
+        // we also need '\n', but it may not have been received yet
+        // never look at bytes behind the received data
+        if(i + 1 >= len) {
+            return 0; // incomplete
+        }
+        if(hdr_end[1] != '\n') {
+            return -1;
+        }
+        i++; // '\n' found
+    }
+    
+    // parse
+    // the line is terminated temporarily, otherwise strtoll could skip
+    // the line break as leading whitespace and parse the following data
+    char save_c = *hdr_end;
+    *hdr_end = '\0';
+    char *end;
+    int64_t clen;
+    errno = 0;
+    clen = strtoll(hdr_start, &end, 16);
+    *hdr_end = save_c;
+    if(errno) {
+        return -1;
+    }
+    if(end == hdr_start) {
+        // no hex digits at all: without this check an empty line would
+        // be parsed as length 0 and end the body
+        return -1;
+    }
+    if(clen < 0) {
+        // strtoll accepts a sign, but a chunk length is never negative
+        return -1;
+    }
+    i++;
+    
+    if(clen == 0) {
+        // chunk length of 0 indicates the end
+        // an additional \r\n is required (we also accept \n)
+        if(i >= len) {
+            return 0;
+        }
+        if(str[i] == '\n') {
+            i++;
+        } else if(str[i] == '\r') {
+            if(++i >= len) {
+                return 0;
+            }
+            if(str[i] == '\n') {
+                i++;
+            } else {
+                return -1;
+            }
+        } else {
+            return -1;
+        }
+    }
+    
+    *chunklen = clen;
+    return i;
+}
+
+/*
+ * Read function for streams with chunked transfer encoding: decodes the
+ * chunk headers and copies only chunk payload to buf.
+ *
+ * State kept in the stream:
+ *   max_read / read  length of the current chunk / payload bytes of it
+ *                    that were already delivered
+ *   read_total       payload bytes delivered by this function so far, used
+ *                    to detect the first chunk header
+ *   chunk_buf        collects chunk header bytes, chunk_buf_pos is the
+ *                    number of bytes of an incomplete header stored so far
+ *
+ * returns the number of payload bytes copied to buf,
+ *         0 if read_eof is already set or no payload could be read,
+ *         -1 if a chunk header is malformed or does not fit into
+ *            chunk_buf (read_eof is set in both cases)
+ */
+ssize_t net_http_read_chunked(HttpStream *st, void *buf, size_t nbytes) {
+    if(st->read_eof) {
+        return 0;
+    }
+    
+    char *rbuf = buf; // buffer pos
+    size_t rd = 0; // number of bytes read
+    size_t rbuflen = nbytes; // number of bytes until end of buf
+    // net_http_read_buffered clears this flag after it has refilled the
+    // read buffer; it is set again below as long as rd is still 0
+    WSBool perform_io = WS_TRUE; // we do only 1 read before we abort
+    while(rd < nbytes && (perform_io || (st->max_read - st->read) > 0)) {
+        // how many bytes are available in the current chunk
+        size_t chunk_available = st->max_read - st->read;
+        if(chunk_available > 0) {
+            // inside a chunk: copy payload
+            ssize_t r = net_http_read_buffered(st, rbuf, rbuflen, TRUE, &perform_io);
+            if(r == 0) {
+                break;
+            }
+            rd += r;
+            st->read_total += r;
+            rbuf += r;
+            rbuflen -= r;
+        } else {
+            // between two chunks: read and parse the next chunk header
+            int chunkbuf_avail = HTTP_STREAM_CBUF_SIZE - st->chunk_buf_pos;
+            if(chunkbuf_avail == 0) {
+                // for some reason HTTP_STREAM_CBUF_SIZE is not enough
+                // to store the chunk header
+                // this indicates that something has gone wrong (or this is an attack)
+                st->read_eof = WS_TRUE;
+                return -1;
+            }
+            // fill st->chunk_buf
+            ssize_t r = net_http_read_buffered(st, &st->chunk_buf[st->chunk_buf_pos], chunkbuf_avail, FALSE, &perform_io);
+            if(r == 0) {
+                break;
+            }
+            int chunkbuf_len = st->chunk_buf_pos + r;
+            int64_t chunklen;
+            // only the very first header (no payload delivered yet) has no
+            // leading line break
+            int ret = parse_chunk_header(st->chunk_buf, chunkbuf_len, st->read_total > 0 ? FALSE : TRUE, &chunklen);
+            if(ret == 0) {
+                // incomplete chunk header
+                // keep the bytes and append to them in the next iteration
+                st->chunk_buf_pos = chunkbuf_len;
+            } else if(ret < 0) {
+                // error
+                st->read_eof = WS_TRUE;
+                return -1;
+            } else if(ret > 0) {
+                // header complete: start a new chunk
+                st->max_read = chunklen;
+                st->read = 0;
+                int remaining_len = chunkbuf_len - ret;
+                if(remaining_len > 0) {
+                    // we have read more into chunk_buf than the chunk_header
+                    // it is safe to just move bufpos back
+                    *st->bufpos -= remaining_len;
+                }
+                //st->remaining_len = chunkbuf_len - ret;
+                st->chunk_buf_pos = 0;
+                
+                if(chunklen == 0) {
+                    // the last chunk has a length of 0
+                    st->read_eof = WS_TRUE;
+                    break;
+                }
+            }
+        }
+        
+        // nothing was delivered yet (only header bytes were processed):
+        // allow another read from the underlying stream
+        if(!perform_io && rd == 0) {
+            perform_io = WS_TRUE;
+        }
+    }
+    
+    return rd;
+}
+
+ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd) {
+    if(st->write_eof) return 0;
     ssize_t ret = 0;
     // TODO: support chunked transfer encoding
     if(st->fd->sendfile) {
@@ -321,9 +603,10 @@
 }
 
+/*
+ * Finishes the write side of the stream.
+ *
+ * If chunked encoding is enabled and the stream was not finished before,
+ * the terminating zero-length chunk ("0\r\n\r\n") is written to the
+ * underlying stream; the result of that write is not checked.
+ * In any case write_eof is set, the write functions return 0 without
+ * writing anything once this flag is set.
+ */
 void net_http_finish(HttpStream *st) {
-    if(st->chunked_enc) {
+    if(st->chunked_enc && !st->write_eof) {
         st->fd->write(st->fd, "0\r\n\r\n", 5);
     }
+    st->write_eof = WS_TRUE;
 }
 
 void net_http_setmode(HttpStream *st, int mode) {
@@ -452,7 +735,7 @@
     va_list arg;
     va_start(arg, format);
     sstr_t buf = ucx_vasprintf(ucx_default_allocator(), format, arg);
-    ssize_t r = net_write(fd, buf.ptr, buf.length);
+    ssize_t r = buf.length > 0 ? net_write(fd, buf.ptr, buf.length) : 0;
     free(buf.ptr);
     va_end(arg);
     if(r < 0) {

mercurial