src/server/safs/service.c

branch
aio
changeset 172
5580517faafc
parent 127
84e206063b64
child 184
a2a15ad6e4b9
--- a/src/server/safs/service.c	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/safs/service.c	Sat Feb 18 13:27:25 2017 +0100
@@ -277,6 +277,213 @@
     return 0;
 }
 
+
+static void send_range_cleanup(AsyncSendRange *asr) {
+    WSBool error = asr->error;
+    Session *sn = asr->sn;
+    Request *rq = asr->rq;
+    
+    pool_handle_t *pool = asr->sn->pool;
+    vfs_close(asr->in);
+    pool_free(pool, asr->aio->buf);
+    pool_free(pool, asr->aio);
+    pool_free(pool, asr->readev);
+    pool_free(pool, asr->writeev);
+    pool_free(pool, asr);
+    
+    int ret = REQ_PROCEED;
+    if(error) {
+        rq->rq_attr.keep_alive = 0;
+        ret = REQ_ABORTED;
+    }
+    // return to nsapi loop
+    nsapi_function_return(sn, rq, ret);
+}
+
+static int send_buf(
+        SYS_NETFD out,
+        char *restrict buf,
+        size_t len,
+        size_t *restrict pos)
+{
+    while(*pos < len) {
+        ssize_t w = net_write(out, buf + *pos, len - *pos);
+        if(w <= 0) {
+            return -1;
+        }
+        *pos += w;
+    }
+    return 0;
+}
+
+static int send_bytes(AsyncSendRange *asr, WSBool *completed) {
+    *completed = FALSE;
+    if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) {
+        if(net_errno(asr->out) == EAGAIN) {
+            return 0;
+        } else {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    
+    if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) {
+        if(net_errno(asr->out) == EAGAIN) {
+            return 0;
+        } else {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    
+    if(!asr->read_complete) {
+        // write completed => new asynchronous read
+        asr->aio->offset += asr->aio->result;
+        if(system_aio_read(asr->aio)) {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    *completed = TRUE;
+    return 0;
+}
+
+static int send_range_readevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    asr->read_inprogress = FALSE;
+    asr->wpos = 0;
+    if(asr->error) {
+        return 0;
+    }
+    
+    int ret = 1;
+    if(asr->aio->result == 0) {
+        asr->read_complete = TRUE;
+        ret = 0;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 0;
+    }
+    if(!completed && !asr->write_inprogress) {
+        asr->write_inprogress = TRUE;
+        if(event_pollout(ev, asr->out, asr->writeev)) {
+            asr->error = TRUE;
+            return 0;
+        }
+    }
+    
+    return ret;
+}
+
+static int send_range_writeevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(asr->error) {
+        return 0;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 0;
+    }
+    
+    if(completed && asr->read_complete) {
+        // everything completed
+        return 0;
+    }
+    
+    return 1;
+}
+
+static int send_range_aio_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->write_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->read_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_poll_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->read_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->write_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) {
+    net_setnonblock(sn->csd, TRUE);
+    
+    // try to send the header
+    ssize_t hw = net_write(sn->csd, header, headerlen);
+    if(hw < 0) {
+        if(net_errno(sn->csd) == EAGAIN) {
+            hw = 0;
+        } else {
+            return REQ_ABORTED;
+        }
+    }
+    
+    AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange));
+    asr->sn = sn;
+    asr->rq = rq;
+    asr->in = fd;
+    asr->out = sn->csd;
+    asr->offset = offset;
+    asr->length = length;
+    asr->pos = offset;
+    asr->read_complete = FALSE;
+    asr->read_inprogress = FALSE;
+    asr->write_inprogress = FALSE;
+    asr->error = FALSE;
+    if(hw == headerlen) {
+        asr->header = NULL;
+        asr->headerlen = 0;
+        asr->headerpos = 0;
+    } else {
+        asr->header = header;
+        asr->headerlen = headerlen;
+        asr->headerpos = hw;
+    }
+    
+    Event *readev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(readev, sizeof(Event));
+    readev->cookie = asr;
+    readev->fn = send_range_readevent;
+    readev->finish = send_range_aio_finish;
+    
+    Event *writeev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(writeev, sizeof(Event));
+    writeev->cookie = asr;
+    writeev->fn = send_range_writeevent;
+    writeev->finish = send_range_poll_finish;
+    
+    asr->readev = readev;
+    asr->writeev = writeev;
+    
+    aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s));
+    aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE);
+    aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
+    aio->filedes = fd;
+    aio->offset = offset;
+    aio->evhandler = sn->ev;
+    aio->event = readev;
+    
+    asr->aio = aio;
+    asr->wpos = 0;
+    
+    if(system_aio_read(aio)) {
+        send_range_cleanup(asr);
+        return REQ_ABORTED;
+    }
+    asr->read_inprogress = TRUE;
+    
+    return REQ_PROCESSING;
+}
+
 struct multi_range_elm {
     sstr_t header;
     off_t  offset;
@@ -420,9 +627,15 @@
         // send response header
         http_start_response(sn, rq);
         // send content
+        int ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0);
+        if(ret == REQ_PROCESSING) {
+            return ret;
+        }
+/*
         if(send_range(sn, fd, offset, length, NULL, 0)) {
             // TODO: error
         }
+*/
     } else {
         if(send_multi_range(sn, rq, fd, s.st_size, range)) {
             // TODO: error

mercurial