adds public aio and poll api and asynchronous send_range function aio

Sat, 18 Feb 2017 13:27:25 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 18 Feb 2017 13:27:25 +0100
branch
aio
changeset 172
5580517faafc
parent 170
711d00eeed25
child 184
a2a15ad6e4b9

adds public aio and poll api and asynchronous send_range function

aio and poll api is only implemented on solaris yet
send_file saf uses send_range_aio for single range requests

src/server/daemon/event.h file | annotate | diff | comparison | revisions
src/server/daemon/event_solaris.c file | annotate | diff | comparison | revisions
src/server/daemon/httprequest.h file | annotate | diff | comparison | revisions
src/server/daemon/vfs.c file | annotate | diff | comparison | revisions
src/server/daemon/vfs.h file | annotate | diff | comparison | revisions
src/server/public/nsapi.h file | annotate | diff | comparison | revisions
src/server/public/vfs.h file | annotate | diff | comparison | revisions
src/server/safs/service.c file | annotate | diff | comparison | revisions
src/server/safs/service.h file | annotate | diff | comparison | revisions
src/server/util/io.c file | annotate | diff | comparison | revisions
src/server/util/io.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/event.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/daemon/event.h	Sat Feb 18 13:27:25 2017 +0100
@@ -62,10 +62,12 @@
 EVHandler* evhandler_create(EventHandlerConfig *cfg);
 
 int ev_pollin(EventHandler *h, int fd, Event *event);
+int ev_pollout(EventHandler *h, int fd, Event *event);
+int ev_remove_poll(EventHandler *h, int fd);
+int ev_send(EventHandler *h, Event *event);
 
-int ev_pollout(EventHandler *h, int fd, Event *event);
-
-int evt_send(EventHandler *h, Event *event);
+int ev_aioread(int fd, aiocb_s *cb);
+int ev_aiowrite(int fd, aiocb_s *cb);
 
 
 #ifdef	__cplusplus
--- a/src/server/daemon/event_solaris.c	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/daemon/event_solaris.c	Sat Feb 18 13:27:25 2017 +0100
@@ -30,6 +30,8 @@
 #include <stdlib.h>
 #include <atomic.h>
 
+#include "../util/io.h"
+
 #include "event_solaris.h"
 
 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
@@ -77,26 +79,41 @@
         
         for(int i=0;i<nev;i++) {
             Event *event = events[i].portev_user;
-            if(event->fn) {
-                if(event->fn(ev, event)) {
-                    /*
-                     * on solaris we have to reassociate the fd after
-                     * each event
-                     * we do this if the event function returns 1
-                     */
-                    if(port_associate(
-                            ev->port,
-                            PORT_SOURCE_FD,
-                            (uintptr_t)event->object,
-                            ev_convert2sys_events(event->events),
-                            event))
-                    {
-                        perror("port_associate");
-                    }                 
-                } else if(event->finish) {
-                    event->finish(ev, event);
+            if(events[i].portev_source == PORT_SOURCE_AIO) {
+                aiocb_t *aiocb = (aiocb_t*)events[i].portev_object;
+                if(event) {
+                    aiocb_s *aio = (aiocb_s*)event->object;
+                    aio->result = aiocb->aio_resultp.aio_return;
+                    aio->result_errno = aiocb->aio_resultp.aio_errno;
+                    if(event->fn) {
+                        if(!event->fn(ev, event) && event->finish) {
+                            event->finish(ev, event);
+                        }
+                    }
                 }
-            }
+                free(aiocb);  
+            } else {
+                if(event->fn) {
+                    if(event->fn(ev, event)) {
+                        /*
+                         * on solaris we have to reassociate the fd after
+                         * each event
+                         * we do this if the event function returns 1
+                         */
+                        if(port_associate(
+                                ev->port,
+                                PORT_SOURCE_FD,
+                                (uintptr_t)event->object,
+                                ev_convert2sys_events(event->events),
+                                event))
+                        {
+                            perror("port_associate");
+                        }                 
+                    } else if(event->finish) {
+                        event->finish(ev, event);
+                    }
+                }
+            }  
         }
     }
 }
@@ -135,8 +152,67 @@
             event);
 }
 
-int evt_send(EventHandler *h, Event *event) {
+int ev_remove_poll(EventHandler *h, int fd) {
+    return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd);
+}
+
+int ev_send(EventHandler *h, Event *event) {
     event->object = 0;
     event->events = 0;
     return port_send(h->port, 0, event);
 }
+
+static int ev_aio(int fd, aiocb_s *cb, WSBool read) {
+    EventHandler *ev = cb->evhandler;
+    if(!ev) {
+        return -1;
+    }
+    
+    aiocb_t *aiocb = malloc(sizeof(aiocb_t));
+    if(!aiocb) {
+        return -1;
+    }
+    ZERO(aiocb, sizeof(aiocb_t));
+    
+    aiocb->aio_fildes = fd;
+    aiocb->aio_buf = cb->buf;
+    aiocb->aio_nbytes = cb->nbytes;
+    aiocb->aio_offset = cb->offset;
+    
+    port_notify_t *portnotify = malloc(sizeof(port_notify_t));
+    if(!portnotify) {
+        free(aiocb);
+        return -1;
+    }
+    portnotify->portnfy_port = ev->port;
+    portnotify->portnfy_user = cb->event;
+    aiocb->aio_sigevent.sigev_notify = SIGEV_PORT;
+    aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify;
+    
+    if(read) {
+        return aio_read(aiocb);
+    } else {
+        return aio_write(aiocb);
+    }
+}
+
+int ev_aioread(int fd, aiocb_s *cb) {
+    return ev_aio(fd, cb, TRUE);
+}
+
+int ev_aiowrite(int fd, aiocb_s *cb) {
+    return ev_aio(fd, cb, FALSE);
+}
+
+
+int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
+}
+
+int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
+}
+
+int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
+}
--- a/src/server/daemon/httprequest.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/daemon/httprequest.h	Sat Feb 18 13:27:25 2017 +0100
@@ -105,7 +105,7 @@
         NSAPIRequest *rq,
         threadpool_t *pool);
 
-void nsapi_function_return(Session *sn, Request *rq, int ret);
+//void nsapi_function_return(Session *sn, Request *rq, int ret);
 
 void nsapi_change_threadpool(
         NSAPISession *sn,
--- a/src/server/daemon/vfs.c	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/daemon/vfs.c	Sat Feb 18 13:27:25 2017 +0100
@@ -32,12 +32,14 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/types.h>
+#include <aio.h>
 
 #include <ucx/map.h>
 
 #include "../util/pool.h"
 #include "acl.h"
 #include "vfs.h"
+#include "event.h"
 
 #define VFS_MALLOC(pool, size) pool ? pool_malloc(pool, size) : malloc(size)
 #define VFS_FREE(pool, ptr) pool ? pool_free(pool, ptr) : free(ptr)
@@ -58,7 +60,9 @@
     sys_file_read,
     sys_file_write,
     sys_file_seek,
-    sys_file_close
+    sys_file_close,
+    sys_file_aioread,
+    sys_file_aiowrite
 };
 
 static VFS_DIRIO sys_dir_io = {
@@ -497,6 +501,19 @@
     close(fd->fd);
 }
 
+int sys_file_aioread(aiocb_s *aiocb) {
+    WS_ASSERT(aiocb->buf);
+    WS_ASSERT(aiocb->nbytes > 0);
+    return ev_aioread(aiocb->filedes->fd, aiocb);
+}
+
+int sys_file_aiowrite(aiocb_s *aiocb) {
+    WS_ASSERT(aiocb->buf);
+    WS_ASSERT(aiocb->nbytes > 0);
+    return ev_aiowrite(aiocb->filedes->fd, aiocb);
+}
+
+
 int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat) {
     SysVFSDir *dirdata = dir->data;
     struct dirent *result = NULL;
@@ -578,3 +595,35 @@
     vfs_close(fd);
     return 0;
 }
+
+// AIO API
+
+NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb) {
+    if(!aiocb->event || !aiocb->evhandler) {
+        return -1;
+    }
+   
+    SYS_FILE file = aiocb->filedes;
+    aiocb->event->object = (intptr_t)aiocb;
+    if(file->io->opt_aioread) {
+        return file->io->opt_aioread(aiocb);
+    } else {
+        // TODO: implement
+        return -1;
+    }
+}
+
+NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb) {
+    if(!aiocb->event || !aiocb->evhandler) {
+        return -1;
+    }
+    
+    SYS_FILE file = aiocb->filedes;
+    aiocb->event->object = (intptr_t)aiocb;
+    if(file->io->opt_aiowrite) {
+        return file->io->opt_aiowrite(aiocb);
+    } else {
+        // TODO: implement
+        return -1;
+    }
+}
--- a/src/server/daemon/vfs.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/daemon/vfs.h	Sat Feb 18 13:27:25 2017 +0100
@@ -62,6 +62,8 @@
 ssize_t sys_file_write(SYS_FILE fd, const void *buf, size_t nbyte);
 off_t sys_file_seek(SYS_FILE fd, off_t offset, int whence);
 void sys_file_close(SYS_FILE fd);
+int sys_file_aioread(aiocb_s *aiocb);
+int sys_file_aiowrite(aiocb_s *aiocb);
 
 int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat);
 void sys_dir_close(VFS_DIR dir);
--- a/src/server/public/nsapi.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/public/nsapi.h	Sat Feb 18 13:27:25 2017 +0100
@@ -1138,7 +1138,19 @@
 #define ISMOPTIONS(r)   ((r)->method_num == METHOD_OPTIONS)
 
 
-// new type
+// new types
+typedef struct aiocb_s {
+    SYS_FILE filedes;
+    void         *buf;
+    size_t       nbytes;
+    off_t        offset;
+    ssize_t      result;
+    int          result_errno;
+    Event        *event;
+    EventHandler *evhandler;
+} aiocb_s;
+
+
 typedef struct _thread_pool       threadpool_t;
 typedef struct  _threadpool_job   threadpool_job;
 typedef void*(*job_callback_f)(void *data);
@@ -1319,6 +1331,8 @@
 // NSAPI extension
 ssize_t net_printf(SYS_NETFD fd, char *format, ...);
 
+int net_setnonblock(SYS_NETFD fd, int nonblock);
+int net_errno(SYS_NETFD fd);
 
 NSAPI_PUBLIC pb_param *INTparam_create(const char *name, const char *value);
 
@@ -1527,6 +1541,9 @@
 NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence);
 NSAPI_PUBLIC int system_fclose(SYS_FILE fd);
 
+NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb);
+NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb);
+
 
 int log_ereport(int degree, const char *format, ...);
 int log_ereport_v(int degree, const char *format, va_list args);
@@ -1542,6 +1559,7 @@
 NSAPI_PUBLIC pblock* util_parse_param(pool_handle_t *pool, char *query);
 #define util_parse_param util_parse_param
 
+void nsapi_function_return(Session *sn, Request *rq, int ret);
 
 // threadpool
 threadpool_t* threadpool_new(int min, int max);
@@ -1549,12 +1567,9 @@
 threadpool_job* threadpool_get_job(threadpool_t *pool);
 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data);
 
-int ev_pollin(EventHandler *h, int fd, Event *event);
-
-int ev_pollout(EventHandler *h, int fd, Event *event);
-
-int evt_send(EventHandler *h, Event *event);
-
+int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event);
+int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event);
+int event_removepoll(EventHandler *ev, SYS_NETFD fd);
 
 // assert
 void ws_log_assert(const char *file, const char *func, int line);
--- a/src/server/public/vfs.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/public/vfs.h	Sat Feb 18 13:27:25 2017 +0100
@@ -94,6 +94,8 @@
     ssize_t (*write)(SYS_FILE fd, const void *buf, size_t nbyte);
     off_t (*seek)(SYS_FILE fd, off_t offset, int whence);
     void (*close)(SYS_FILE fd);
+    int (*opt_aioread)(aiocb_s *aiocb);
+    int (*opt_aiowrite)(aiocb_s *aiocb);
 };
 
 struct VFS_DIRIO {
--- a/src/server/safs/service.c	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/safs/service.c	Sat Feb 18 13:27:25 2017 +0100
@@ -277,6 +277,213 @@
     return 0;
 }
 
+
+static void send_range_cleanup(AsyncSendRange *asr) {
+    WSBool error = asr->error;
+    Session *sn = asr->sn;
+    Request *rq = asr->rq;
+    
+    pool_handle_t *pool = asr->sn->pool;
+    vfs_close(asr->in);
+    pool_free(pool, asr->aio->buf);
+    pool_free(pool, asr->aio);
+    pool_free(pool, asr->readev);
+    pool_free(pool, asr->writeev);
+    pool_free(pool, asr);
+    
+    int ret = REQ_PROCEED;
+    if(error) {
+        rq->rq_attr.keep_alive = 0;
+        ret = REQ_ABORTED;
+    }
+    // return to nsapi loop
+    nsapi_function_return(sn, rq, ret);
+}
+
+static int send_buf(
+        SYS_NETFD out,
+        char *restrict buf,
+        size_t len,
+        size_t *restrict pos)
+{
+    while(*pos < len) {
+        ssize_t w = net_write(out, buf + *pos, len - *pos);
+        if(w <= 0) {
+            return -1;
+        }
+        *pos += w;
+    }
+    return 0;
+}
+
+static int send_bytes(AsyncSendRange *asr, WSBool *completed) {
+    *completed = FALSE;
+    if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) {
+        if(net_errno(asr->out) == EAGAIN) {
+            return 0;
+        } else {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    
+    if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) {
+        if(net_errno(asr->out) == EAGAIN) {
+            return 0;
+        } else {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    
+    if(!asr->read_complete) {
+        // write completed => new asynchronous read
+        asr->aio->offset += asr->aio->result;
+        if(system_aio_read(asr->aio)) {
+            asr->error = TRUE;
+            return 1;
+        }
+    }
+    *completed = TRUE;
+    return 0;
+}
+
+static int send_range_readevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    asr->read_inprogress = FALSE;
+    asr->wpos = 0;
+    if(asr->error) {
+        return 0;
+    }
+    
+    int ret = 1;
+    if(asr->aio->result == 0) {
+        asr->read_complete = TRUE;
+        ret = 0;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 0;
+    }
+    if(!completed && !asr->write_inprogress) {
+        asr->write_inprogress = TRUE;
+        if(event_pollout(ev, asr->out, asr->writeev)) {
+            asr->error = TRUE;
+            return 0;
+        }
+    }
+    
+    return ret;
+}
+
+static int send_range_writeevent(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(asr->error) {
+        return 0;
+    }
+    
+    WSBool completed;
+    if(send_bytes(asr, &completed)) {
+        return 0;
+    }
+    
+    if(completed && asr->read_complete) {
+        // everything completed
+        return 0;
+    }
+    
+    return 1;
+}
+
+static int send_range_aio_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->write_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->read_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_poll_finish(EventHandler *ev, Event *event) {
+    AsyncSendRange *asr = event->cookie;
+    if(!asr->read_inprogress) {
+        send_range_cleanup(asr);
+    }
+    asr->write_inprogress = FALSE;
+    return 0;
+}
+
+static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) {
+    net_setnonblock(sn->csd, TRUE);
+    
+    // try to send the header
+    ssize_t hw = net_write(sn->csd, header, headerlen);
+    if(hw < 0) {
+        if(net_errno(sn->csd) == EAGAIN) {
+            hw = 0;
+        } else {
+            return REQ_ABORTED;
+        }
+    }
+    
+    AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange));
+    asr->sn = sn;
+    asr->rq = rq;
+    asr->in = fd;
+    asr->out = sn->csd;
+    asr->offset = offset;
+    asr->length = length;
+    asr->pos = offset;
+    asr->read_complete = FALSE;
+    asr->read_inprogress = FALSE;
+    asr->write_inprogress = FALSE;
+    asr->error = FALSE;
+    if(hw == headerlen) {
+        asr->header = NULL;
+        asr->headerlen = 0;
+        asr->headerpos = 0;
+    } else {
+        asr->header = header;
+        asr->headerlen = headerlen;
+        asr->headerpos = hw;
+    }
+    
+    Event *readev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(readev, sizeof(Event));
+    readev->cookie = asr;
+    readev->fn = send_range_readevent;
+    readev->finish = send_range_aio_finish;
+    
+    Event *writeev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(writeev, sizeof(Event));
+    writeev->cookie = asr;
+    writeev->fn = send_range_writeevent;
+    writeev->finish = send_range_poll_finish;
+    
+    asr->readev = readev;
+    asr->writeev = writeev;
+    
+    aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s));
+    aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE);
+    aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length;
+    aio->filedes = fd;
+    aio->offset = offset;
+    aio->evhandler = sn->ev;
+    aio->event = readev;
+    
+    asr->aio = aio;
+    asr->wpos = 0;
+    
+    if(system_aio_read(aio)) {
+        send_range_cleanup(asr);
+        return REQ_ABORTED;
+    }
+    asr->read_inprogress = TRUE;
+    
+    return REQ_PROCESSING;
+}
+
 struct multi_range_elm {
     sstr_t header;
     off_t  offset;
@@ -420,9 +627,15 @@
         // send response header
         http_start_response(sn, rq);
         // send content
+        int ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0);
+        if(ret == REQ_PROCESSING) {
+            return ret;
+        }
+/*
         if(send_range(sn, fd, offset, length, NULL, 0)) {
             // TODO: error
         }
+*/
     } else {
         if(send_multi_range(sn, rq, fd, s.st_size, range)) {
             // TODO: error
--- a/src/server/safs/service.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/safs/service.h	Sat Feb 18 13:27:25 2017 +0100
@@ -30,11 +30,14 @@
 #define	SERVICE_H
 
 #include "../public/nsapi.h"
+#include "../util/systems.h"
 
 #ifdef	__cplusplus
 extern "C" {
 #endif
 
+#define AIO_BUF_SIZE 16384
+    
 typedef struct HttpRange HttpRange; 
    
 struct HttpRange {
@@ -42,7 +45,27 @@
     off_t end;
     HttpRange *next;
 };
-    
+
+typedef struct AsyncSendRange {
+    Session   *sn;
+    Request   *rq;
+    SYS_FILE  in;
+    SYS_NETFD out;
+    off_t     offset;
+    off_t     length;
+    off_t     pos;
+    char      *header;
+    int       headerlen;
+    size_t    headerpos;
+    Event     *readev;
+    Event     *writeev;
+    aiocb_s   *aio;
+    size_t    wpos;
+    WSBool    read_complete;
+    WSBool    read_inprogress;
+    WSBool    write_inprogress;
+    WSBool    error;
+} AsyncSendRange;
     
 int send_file(pblock *pb, Session *sn, Request *rq);
 
--- a/src/server/util/io.c	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/util/io.c	Sat Feb 18 13:27:25 2017 +0100
@@ -64,6 +64,7 @@
 #include "../daemon/vfs.h"
 #include "io.h"
 #include "pool.h"
+#include "../daemon/event.h"
 #include "ucx/utils.h"
 
 IOStream native_io_funcs = {
@@ -72,7 +73,10 @@
     (io_read_f)net_sys_read,
     (io_sendfile_f)NET_SYS_SENDFILE,
     (io_close_f)net_sys_close,
-    NULL
+    NULL,
+    (io_setmode_f)net_sys_setmode,
+    (io_poll_f)net_sys_poll,
+    0
 };
 
 IOStream http_io_funcs = {
@@ -81,7 +85,10 @@
     (io_read_f)net_http_read,
     (io_sendfile_f)net_http_sendfile,
     (io_close_f)net_http_close,
-    (io_finish_f)net_http_finish
+    (io_finish_f)net_http_finish,
+    (io_setmode_f)net_http_setmode,
+    (io_poll_f)net_http_poll,
+    0
 };
 
 IOStream ssl_io_funcs = {
@@ -90,7 +97,10 @@
     (io_read_f)net_ssl_read,
     NULL,
     (io_close_f)net_ssl_close,
-    (io_finish_f)net_ssl_finish
+    (io_finish_f)net_ssl_finish,
+    (io_setmode_f)net_ssl_setmode,
+    (io_poll_f)net_ssl_poll,
+    0
 };
 
 
@@ -170,6 +180,34 @@
     close(st->fd);
 }
 
+void net_sys_setmode(SysStream *st, int mode) {
+    int flags;
+    if (-1 == (flags = fcntl(st->fd, F_GETFL, 0))) {
+        flags = 0;
+    }
+    if(mode == IO_MODE_BLOCKING) {
+        if (fcntl(st->fd, F_SETFL, flags & ~O_NONBLOCK) != 0) {
+            perror("fcntl");
+            // TODO: error
+        }
+    } else if(mode == IO_MODE_NONBLOCKING) {
+        if (fcntl(st->fd, F_SETFL, flags | O_NONBLOCK) != 0) {
+            perror("fcntl");
+            // TODO: error
+        }
+    }
+}
+
+int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb) {
+    switch(events) {
+        default: return -1;
+        case IO_POLL_NONE: return ev_remove_poll(ev, st->fd);
+        case IO_POLL_IN: return ev_pollin(ev, st->fd, cb);
+        case IO_POLL_OUT: return ev_pollout(ev, st->fd, cb);
+        case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement
+    }
+}
+
 #elif defined(XP_WIN32)
 
 ssize_t net_sys_write(SysStream *st, void *buf, size_t nbytes) {
@@ -287,6 +325,14 @@
     }
 }
 
+void net_http_setmode(HttpStream *st, int mode) {
+    st->fd->setmode(st->fd, mode);
+}
+
+int net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb) {
+    return st->fd->poll(st->fd, ev, events, cb);
+}
+
 
 /*
  * SSLStream implementation
@@ -341,6 +387,34 @@
     
 }
 
+void net_ssl_setmode(SSLStream *st, int mode) {
+    int flags;
+    if (-1 == (flags = fcntl(SSL_get_fd(st->ssl), F_GETFL, 0))) {
+        flags = 0;
+    }
+    if(mode == IO_MODE_BLOCKING) {
+        if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags & ~O_NONBLOCK) != 0) {
+            perror("fcntl");
+            // TODO: error
+        }
+    } else if(mode == IO_MODE_NONBLOCKING) {
+        if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags | O_NONBLOCK) != 0) {
+            perror("fcntl");
+            // TODO: error
+        }
+    }
+}
+
+int net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb) {
+    int fd = SSL_get_fd(st->ssl);
+    switch(events) {
+        default: return -1;
+        case IO_POLL_NONE: return ev_remove_poll(ev, fd);
+        case IO_POLL_IN: return ev_pollin(ev, fd, cb);
+        case IO_POLL_OUT: return ev_pollout(ev, fd, cb);
+        case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement
+    }
+}
 
 /* -------------------- public nsapi network functions -------------------- */
 
@@ -348,6 +422,9 @@
     ssize_t r = ((IOStream*)fd)->read(fd, buf, nbytes);
     if(r == 0) {
         return IO_EOF;
+    } else if(r < 0) {
+        ((IOStream*)fd)->io_errno = errno;
+        return IO_ERROR;
     }
     return r;
 }
@@ -355,6 +432,7 @@
 ssize_t net_write(SYS_NETFD fd, void *buf, size_t nbytes) {
     ssize_t r = ((IOStream*)fd)->write(fd, buf, nbytes);
     if(r < 0) {
+        ((IOStream*)fd)->io_errno = errno;
         return IO_ERROR;
     }  
     return r;
@@ -363,6 +441,7 @@
 ssize_t net_writev(SYS_NETFD fd, struct iovec *iovec, int iovcnt) {
     ssize_t r = ((IOStream*)fd)->writev(fd, iovec, iovcnt);
     if(r < 0) {
+        ((IOStream*)fd)->io_errno = errno;
         return IO_ERROR;
     }
     return r;
@@ -375,6 +454,9 @@
     ssize_t r = net_write(fd, buf.ptr, buf.length);
     free(buf.ptr);
     va_end(arg);
+    if(r < 0) {
+        ((IOStream*)fd)->io_errno = errno;
+    }
     return r;
 }
 
@@ -383,14 +465,15 @@
     if(out->sendfile && sfd->fd && sfd->fd->fd != -1) {
         ssize_t r = out->sendfile(fd, sfd);
         if(r < 0) {
+            out->io_errno = errno;
             return IO_ERROR;
         }
+        return r;
     } else {
         // stream/file does not support sendfile
         // do regular copy
         return net_fallback_sendfile(out, sfd);
     }
-    return IO_ERROR;
 }
 
 // private
@@ -418,12 +501,14 @@
         hlen -= r;
         if(r <= 0) {
             free(buf);
+            fd->io_errno = errno;
             return IO_ERROR;
         }
     }
 
     if(system_lseek(sfd->fd, sfd->offset, SEEK_SET) == -1) {
         free(buf);
+        fd->io_errno = errno;
         return IO_ERROR;
     }
 
@@ -442,6 +527,7 @@
     }
     free(buf);
     if(length > 0) {
+        fd->io_errno = errno;
         return IO_ERROR;
     }
 
@@ -450,6 +536,7 @@
         trailer += r;
         tlen -= r;
         if(r <= 0) {
+            fd->io_errno = errno;
             return IO_ERROR;
         }
     }
@@ -466,6 +553,17 @@
     ((IOStream*)fd)->close(fd);
 }
 
+int net_setnonblock(SYS_NETFD fd, int nonblock) {
+    ((IOStream*)fd)->setmode(
+            fd,
+            nonblock ? IO_MODE_NONBLOCKING : IO_MODE_BLOCKING);
+    return 0;
+}
+
+int net_errno(SYS_NETFD fd) {
+    return ((IOStream*)fd)->io_errno;
+}
+
 // private
 void net_finish(SYS_NETFD fd) {
     ((IOStream*)fd)->finish(fd);
--- a/src/server/util/io.h	Sat Feb 04 16:42:11 2017 +0100
+++ b/src/server/util/io.h	Sat Feb 18 13:27:25 2017 +0100
@@ -48,6 +48,13 @@
 #define SYS_SOCKET SOCKET
 #endif
 
+#define IO_MODE_BLOCKING    0
+#define IO_MODE_NONBLOCKING 1
+    
+#define IO_POLL_NONE        0
+#define IO_POLL_IN          1
+#define IO_POLL_OUT         2
+    
 typedef struct IOStream     IOStream;
 typedef struct SysStream    SysStream;
 typedef struct HttpStream   HttpStream;
@@ -58,6 +65,8 @@
 typedef ssize_t(*io_sendfile_f)(IOStream *, sendfiledata *);
 typedef void(*io_close_f)(IOStream *);
 typedef void(*io_finish_f)(IOStream *);
+typedef void(*io_setmode_f)(IOStream *, int);
+typedef int(*io_poll_f)(IOStream *, EventHandler *, int, Event *);
 
 struct IOStream {
     io_write_f    write;
@@ -66,6 +75,9 @@
     io_sendfile_f sendfile;
     io_close_f    close;
     io_finish_f   finish;
+    io_setmode_f  setmode;
+    io_poll_f     poll;
+    int           io_errno;
 };
 
 struct SysStream {
@@ -102,6 +114,8 @@
 ssize_t net_sys_read(SysStream *st, void *buf, size_t nbytes);
 ssize_t net_sys_sendfile(SysStream *st, sendfiledata *sfd);
 void net_sys_close(SysStream *st);
+void net_sys_setmode(SysStream *st, int mode);
+int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb);
 
 /* http stream */
 IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd);
@@ -112,6 +126,8 @@
 ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd);
 void    net_http_close(HttpStream *st);
 void    net_http_finish(HttpStream *st);
+void    net_http_setmode(HttpStream *st, int mode);
+int     net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb);
 
 /* ssl stream */
 IOStream* sslstream_new(pool_handle_t *pool, SSL *ssl);
@@ -121,6 +137,8 @@
 ssize_t net_ssl_read(SSLStream *st, void *buf, size_t nbytes);
 void    net_ssl_close(SSLStream *st);
 void    net_ssl_finish(SSLStream *st);
+void    net_ssl_setmode(SSLStream *st, int mode);
+int     net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb);
 
 /* net_ functions */
 ssize_t net_fallback_sendfile(IOStream *fd, sendfiledata *sfd);

mercurial