diff -r f33974f0dce0 -r aa8393527b1e src/server/safs/service.c --- a/src/server/safs/service.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/safs/service.c Sat Jan 13 19:01:00 2018 +0100 @@ -43,7 +43,6 @@ #include - /* * prepares servicing a file * @@ -277,6 +276,223 @@ return 0; } + +static void send_range_cleanup(AsyncSendRange *asr) { + WSBool error = asr->error; + Session *sn = asr->sn; + Request *rq = asr->rq; + + pool_handle_t *pool = asr->sn->pool; + vfs_close(asr->in); + pool_free(pool, asr->aio->buf); + pool_free(pool, asr->aio); + pool_free(pool, asr->readev); + pool_free(pool, asr->writeev); + pool_free(pool, asr); + + int ret = REQ_PROCEED; + if(error) { + rq->rq_attr.keep_alive = 0; + ret = REQ_ABORTED; + } + // return to nsapi loop + nsapi_function_return(sn, rq, ret); +} + +static int send_buf( + SYS_NETFD out, + char *restrict buf, + size_t len, + size_t *restrict pos) +{ + while(*pos < len) { + ssize_t w = net_write(out, buf + *pos, len - *pos); + if(w <= 0) { + return -1; + } + *pos += w; + } + return 0; +} + +static int send_bytes(AsyncSendRange *asr, WSBool *completed) { + *completed = FALSE; + if(asr->header) { + if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + if(asr->headerpos >= asr->headerlen) { + asr->header = NULL; + } + } + + if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + + if(!asr->read_complete) { + // write completed => new asynchronous read + asr->aio->offset += asr->aio->result; + size_t length = asr->end - asr->offset; + asr->aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length; + asr->read_inprogress = TRUE; + if(system_aio_read(asr->aio)) { + asr->error = TRUE; + return 1; + } + } + *completed = TRUE; + return 0; +} + +static int send_range_readevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + asr->read_inprogress = FALSE; + asr->wpos = 0; + asr->offset += asr->aio->result; + if(asr->error || asr->aio->result < 0) { + return 0; + } + + int ret = 1; + if(asr->aio->result == 0 || asr->offset >= asr->end) { + asr->read_complete = TRUE; + ret = 0; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 0; + } + if(!completed && !asr->write_inprogress) { + asr->write_inprogress = TRUE; + if(event_pollout(ev, asr->out, asr->writeev)) { + asr->error = TRUE; + return 0; + } + } + + return ret; +} + +static int send_range_writeevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(asr->error) { + return 1; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 1; + } + + if(completed) { + return 0; + } + + return 1; +} + +static int send_range_aio_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->write_inprogress) { + send_range_cleanup(asr); + } + asr->read_inprogress = FALSE; + return 0; +} + +static int send_range_poll_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->read_inprogress) { + send_range_cleanup(asr); + } + asr->write_inprogress = FALSE; + return 0; +} + +static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) { + net_setnonblock(sn->csd, TRUE); + + // try to send the header + ssize_t hw = net_write(sn->csd, header, headerlen); + if(hw < 0) { + if(net_errno(sn->csd) == EAGAIN) { + hw = 0; + } else { + return REQ_ABORTED; + } + } + + AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange)); + asr->sn = sn; + asr->rq = rq; + asr->in = fd; + asr->out = sn->csd; + asr->offset = offset; + asr->end = offset + length; + //asr->length = length; + asr->pos = offset; + asr->read_complete = FALSE; + asr->read_inprogress = FALSE; + asr->write_inprogress = FALSE; + asr->error = FALSE; + if(hw == headerlen) { + asr->header = NULL; + asr->headerlen = 0; + asr->headerpos = 0; + } else { + asr->header = header; + asr->headerlen = headerlen; + asr->headerpos = hw; + } + + Event *readev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(readev, sizeof(Event)); + readev->cookie = asr; + readev->fn = send_range_readevent; + readev->finish = send_range_aio_finish; + + Event *writeev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(writeev, sizeof(Event)); + writeev->cookie = asr; + writeev->fn = send_range_writeevent; + writeev->finish = send_range_poll_finish; + + asr->readev = readev; + asr->writeev = writeev; + + aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s)); + aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE); + aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length; + aio->filedes = fd; + aio->offset = offset; + aio->evhandler = sn->ev; + aio->event = readev; + + asr->aio = aio; + asr->wpos = 0; + + asr->read_inprogress = TRUE; + if(system_aio_read(aio)) { + send_range_cleanup(asr); + return REQ_ABORTED; + } + asr->read_inprogress = TRUE; + + return REQ_PROCESSING; +} + struct multi_range_elm { sstr_t header; off_t offset; @@ -416,24 +632,30 @@ length = s.st_size; } + int ret = REQ_NOACTION; if(single_range) { // send response header http_start_response(sn, rq); // send content + ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0); + if(ret == REQ_PROCESSING) { + return ret; + } +/* if(send_range(sn, fd, offset, length, NULL, 0)) { // TODO: error } +//*/ } else { - if(send_multi_range(sn, rq, fd, s.st_size, range)) { - // TODO: error - } + ret = send_multi_range(sn, rq, fd, s.st_size, range); + // TODO: error } // cleanup vfs_close(fd); free_range(sn, range); - return REQ_PROCEED; + return ret; }