src/server/safs/service.c

changeset 193
aa8393527b1e
parent 192
6a145e13d933
child 199
d62f2016cfe5
--- a/src/server/safs/service.c	Thu Aug 31 16:29:49 2017 +0200
+++ b/src/server/safs/service.c	Sat Jan 13 19:01:00 2018 +0100
@@ -43,7 +43,6 @@
 
 #include <errno.h>
 
-
 /*
  * prepares servicing a file
  *
@@ -277,6 +276,223 @@
     return 0;
 }
 
+
+static void send_range_cleanup(AsyncSendRange *asr) {
+    WSBool error = asr->error;
+    Session *sn = asr->sn;
+    Request *rq = asr->rq;
+    
+    pool_handle_t *pool = asr->sn->pool;
+    vfs_close(asr->in);
+    pool_free(pool, asr->aio->buf);
+    pool_free(pool, asr->aio);
+    pool_free(pool, asr->readev);
+    pool_free(pool, asr->writeev);
+    pool_free(pool, asr);
+    
+    int ret = REQ_PROCEED;
+    if(error) {
+        rq->rq_attr.keep_alive = 0;
+        ret = REQ_ABORTED;
+    }
+    // return to nsapi loop
+    nsapi_function_return(sn, rq, ret);
+}
+
+static int send_buf(
+        SYS_NETFD out,
+        char *restrict buf,
+        size_t len,
+        size_t *restrict pos)
+{
+    while(*pos < len) {
+        ssize_t w = net_write(out, buf + *pos, len - *pos);
+        if(w <= 0) {
+            return -1;
+        }
+        *pos += w;
+    }
+    return 0;
+}
+
+static int send_bytes(AsyncSendRange *asr, WSBool *completed) {
+    *completed = FALSE;
+    if(asr->header) {
+        if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) {
+            if(net_errno(asr->out) == EAGAIN) {
+                return 0;
+            } else {
+                asr->error = TRUE;
+                return 1;
+            }
+        }
+        if(asr->headerpos >= asr->headerlen) {
+            asr->header = NULL;
+        }
+    }
+    
+    if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) {
+        if(net_errno(asr->out) == EAGAIN) {
+            return 0;
+        } else {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    
+    if(!asr->read_complete) {
+        // write completed => new asynchronous read
+        asr->aio->offset += asr->aio->result;
+        size_t length = asr->end - asr->offset;
+        asr->aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
+        asr->read_inprogress = TRUE;
+        if(system_aio_read(asr->aio)) {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    *completed = TRUE;
+    return 0;
+}
+
+static int send_range_readevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    asr->read_inprogress = FALSE;
+    asr->wpos = 0;
+    asr->offset += asr->aio->result;
+    if(asr->error || asr->aio->result < 0) {
+        return 0;
+    }
+    
+    int ret = 1;
+    if(asr->aio->result == 0 || asr->offset >= asr->end) {
+        asr->read_complete = TRUE;
+        ret = 0;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 0;
+    }
+    if(!completed && !asr->write_inprogress) {
+        asr->write_inprogress = TRUE;
+        if(event_pollout(ev, asr->out, asr->writeev)) {
+            asr->error = TRUE;
+            return 0;
+        }
+    }
+    
+    return ret;
+}
+
+static int send_range_writeevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(asr->error) {
+        return 1;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 1;
+    }
+    
+    if(completed) {
+        return 0;
+    }
+    
+    return 1;
+}
+
+static int send_range_aio_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->write_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->read_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_poll_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->read_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->write_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) {
+    net_setnonblock(sn->csd, TRUE);
+    
+    // try to send the header
+    ssize_t hw = net_write(sn->csd, header, headerlen);
+    if(hw < 0) {
+        if(net_errno(sn->csd) == EAGAIN) {
+            hw = 0;
+        } else {
+            return REQ_ABORTED;
+        }
+    }
+    
+    AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange));
+    asr->sn = sn;
+    asr->rq = rq;
+    asr->in = fd;
+    asr->out = sn->csd;
+    asr->offset = offset;
+    asr->end = offset + length;
+    //asr->length = length;
+    asr->pos = offset;
+    asr->read_complete = FALSE;
+    asr->read_inprogress = FALSE;
+    asr->write_inprogress = FALSE;
+    asr->error = FALSE;
+    if(hw == headerlen) {
+        asr->header = NULL;
+        asr->headerlen = 0;
+        asr->headerpos = 0;
+    } else {
+        asr->header = header;
+        asr->headerlen = headerlen;
+        asr->headerpos = hw;
+    }
+    
+    Event *readev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(readev, sizeof(Event));
+    readev->cookie = asr;
+    readev->fn = send_range_readevent;
+    readev->finish = send_range_aio_finish;
+    
+    Event *writeev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(writeev, sizeof(Event));
+    writeev->cookie = asr;
+    writeev->fn = send_range_writeevent;
+    writeev->finish = send_range_poll_finish;
+    
+    asr->readev = readev;
+    asr->writeev = writeev;
+    
+    aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s));
+    aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE);
+    aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
+    aio->filedes = fd;
+    aio->offset = offset;
+    aio->evhandler = sn->ev;
+    aio->event = readev;
+    
+    asr->aio = aio;
+    asr->wpos = 0;
+    
+    asr->read_inprogress = TRUE;
+    if(system_aio_read(aio)) {
+        send_range_cleanup(asr);
+        return REQ_ABORTED;
+    }
+    asr->read_inprogress = TRUE;
+    
+    return REQ_PROCESSING;
+}
+
 struct multi_range_elm {
     sstr_t header;
     off_t  offset;
@@ -416,24 +632,30 @@
         length = s.st_size;
     }
     
+    int ret = REQ_NOACTION;
     if(single_range) {
         // send response header
         http_start_response(sn, rq);
         // send content
+        ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0);
+        if(ret == REQ_PROCESSING) {
+            return ret;
+        }
+/*
         if(send_range(sn, fd, offset, length, NULL, 0)) {
             // TODO: error
         }
+//*/
     } else {
-        if(send_multi_range(sn, rq, fd, s.st_size, range)) {
-            // TODO: error
-        }
+        ret = send_multi_range(sn, rq, fd, s.st_size, range);
+        // TODO: error
     }
     
     // cleanup
     vfs_close(fd);
     free_range(sn, range);
 
-    return REQ_PROCEED;
+    return ret;
 }
 
 

mercurial