Sat, 18 Feb 2017 13:27:25 +0100
adds public aio and poll api and asynchronous send_range function
aio and poll api is only implemented on solaris yet
send_file saf uses send_range_aio for single range requests
--- a/src/server/daemon/event.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/daemon/event.h Sat Feb 18 13:27:25 2017 +0100 @@ -62,10 +62,12 @@ EVHandler* evhandler_create(EventHandlerConfig *cfg); int ev_pollin(EventHandler *h, int fd, Event *event); +int ev_pollout(EventHandler *h, int fd, Event *event); +int ev_remove_poll(EventHandler *h, int fd); +int ev_send(EventHandler *h, Event *event); -int ev_pollout(EventHandler *h, int fd, Event *event); - -int evt_send(EventHandler *h, Event *event); +int ev_aioread(int fd, aiocb_s *cb); +int ev_aiowrite(int fd, aiocb_s *cb); #ifdef __cplusplus
--- a/src/server/daemon/event_solaris.c Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/daemon/event_solaris.c Sat Feb 18 13:27:25 2017 +0100 @@ -30,6 +30,8 @@ #include <stdlib.h> #include <atomic.h> +#include "../util/io.h" + #include "event_solaris.h" EVHandler* evhandler_create(EventHandlerConfig *cfg) { @@ -77,26 +79,41 @@ for(int i=0;i<nev;i++) { Event *event = events[i].portev_user; - if(event->fn) { - if(event->fn(ev, event)) { - /* - * on solaris we have to reassociate the fd after - * each event - * we do this if the event function returns 1 - */ - if(port_associate( - ev->port, - PORT_SOURCE_FD, - (uintptr_t)event->object, - ev_convert2sys_events(event->events), - event)) - { - perror("port_associate"); - } - } else if(event->finish) { - event->finish(ev, event); + if(events[i].portev_source == PORT_SOURCE_AIO) { + aiocb_t *aiocb = (aiocb_t*)events[i].portev_object; + if(event) { + aiocb_s *aio = (aiocb_s*)event->object; + aio->result = aiocb->aio_resultp.aio_return; + aio->result_errno = aiocb->aio_resultp.aio_errno; + if(event->fn) { + if(!event->fn(ev, event) && event->finish) { + event->finish(ev, event); + } + } } - } + free(aiocb); + } else { + if(event->fn) { + if(event->fn(ev, event)) { + /* + * on solaris we have to reassociate the fd after + * each event + * we do this if the event function returns 1 + */ + if(port_associate( + ev->port, + PORT_SOURCE_FD, + (uintptr_t)event->object, + ev_convert2sys_events(event->events), + event)) + { + perror("port_associate"); + } + } else if(event->finish) { + event->finish(ev, event); + } + } + } } } } @@ -135,8 +152,67 @@ event); } -int evt_send(EventHandler *h, Event *event) { +int ev_remove_poll(EventHandler *h, int fd) { + return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd); +} + +int ev_send(EventHandler *h, Event *event) { event->object = 0; event->events = 0; return port_send(h->port, 0, event); } + +static int ev_aio(int fd, aiocb_s *cb, WSBool read) { + EventHandler *ev = cb->evhandler; + if(!ev) { + return -1; + } + + aiocb_t *aiocb = malloc(sizeof(aiocb_t)); + if(!aiocb) { + return -1; + } + ZERO(aiocb, sizeof(aiocb_t)); + + aiocb->aio_fildes = fd; + aiocb->aio_buf = cb->buf; + aiocb->aio_nbytes = cb->nbytes; + aiocb->aio_offset = cb->offset; + + port_notify_t *portnotify = malloc(sizeof(port_notify_t)); + if(!portnotify) { + free(aiocb); + return -1; + } + portnotify->portnfy_port = ev->port; + portnotify->portnfy_user = cb->event; + aiocb->aio_sigevent.sigev_notify = SIGEV_PORT; + aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify; + + if(read) { + return aio_read(aiocb); + } else { + return aio_write(aiocb); + } +} + +int ev_aioread(int fd, aiocb_s *cb) { + return ev_aio(fd, cb, TRUE); +} + +int ev_aiowrite(int fd, aiocb_s *cb) { + return ev_aio(fd, cb, FALSE); +} + + +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); +} + +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); +} + +int event_removepoll(EventHandler *ev, SYS_NETFD fd) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); +}
--- a/src/server/daemon/httprequest.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/daemon/httprequest.h Sat Feb 18 13:27:25 2017 +0100 @@ -105,7 +105,7 @@ NSAPIRequest *rq, threadpool_t *pool); -void nsapi_function_return(Session *sn, Request *rq, int ret); +//void nsapi_function_return(Session *sn, Request *rq, int ret); void nsapi_change_threadpool( NSAPISession *sn,
--- a/src/server/daemon/vfs.c Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/daemon/vfs.c Sat Feb 18 13:27:25 2017 +0100 @@ -32,12 +32,14 @@ #include <stdlib.h> #include <unistd.h> #include <sys/types.h> +#include <aio.h> #include <ucx/map.h> #include "../util/pool.h" #include "acl.h" #include "vfs.h" +#include "event.h" #define VFS_MALLOC(pool, size) pool ? pool_malloc(pool, size) : malloc(size) #define VFS_FREE(pool, ptr) pool ? pool_free(pool, ptr) : free(ptr) @@ -58,7 +60,9 @@ sys_file_read, sys_file_write, sys_file_seek, - sys_file_close + sys_file_close, + sys_file_aioread, + sys_file_aiowrite }; static VFS_DIRIO sys_dir_io = { @@ -497,6 +501,19 @@ close(fd->fd); } +int sys_file_aioread(aiocb_s *aiocb) { + WS_ASSERT(aiocb->buf); + WS_ASSERT(aiocb->nbytes > 0); + return ev_aioread(aiocb->filedes->fd, aiocb); +} + +int sys_file_aiowrite(aiocb_s *aiocb) { + WS_ASSERT(aiocb->buf); + WS_ASSERT(aiocb->nbytes > 0); + return ev_aiowrite(aiocb->filedes->fd, aiocb); +} + + int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat) { SysVFSDir *dirdata = dir->data; struct dirent *result = NULL; @@ -578,3 +595,35 @@ vfs_close(fd); return 0; } + +// AIO API + +NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb) { + if(!aiocb->event || !aiocb->evhandler) { + return -1; + } + + SYS_FILE file = aiocb->filedes; + aiocb->event->object = (intptr_t)aiocb; + if(file->io->opt_aioread) { + return file->io->opt_aioread(aiocb); + } else { + // TODO: implement + return -1; + } +} + +NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb) { + if(!aiocb->event || !aiocb->evhandler) { + return -1; + } + + SYS_FILE file = aiocb->filedes; + aiocb->event->object = (intptr_t)aiocb; + if(file->io->opt_aiowrite) { + return file->io->opt_aiowrite(aiocb); + } else { + // TODO: implement + return -1; + } +}
--- a/src/server/daemon/vfs.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/daemon/vfs.h Sat Feb 18 13:27:25 2017 +0100 @@ -62,6 +62,8 @@ ssize_t sys_file_write(SYS_FILE fd, const void *buf, size_t nbyte); off_t sys_file_seek(SYS_FILE fd, off_t offset, int whence); void sys_file_close(SYS_FILE fd); +int sys_file_aioread(aiocb_s *aiocb); +int sys_file_aiowrite(aiocb_s *aiocb); int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat); void sys_dir_close(VFS_DIR dir);
--- a/src/server/public/nsapi.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/public/nsapi.h Sat Feb 18 13:27:25 2017 +0100 @@ -1138,7 +1138,19 @@ #define ISMOPTIONS(r) ((r)->method_num == METHOD_OPTIONS) -// new type +// new types +typedef struct aiocb_s { + SYS_FILE filedes; + void *buf; + size_t nbytes; + off_t offset; + ssize_t result; + int result_errno; + Event *event; + EventHandler *evhandler; +} aiocb_s; + + typedef struct _thread_pool threadpool_t; typedef struct _threadpool_job threadpool_job; typedef void*(*job_callback_f)(void *data); @@ -1319,6 +1331,8 @@ // NSAPI extension ssize_t net_printf(SYS_NETFD fd, char *format, ...); +int net_setnonblock(SYS_NETFD fd, int nonblock); +int net_errno(SYS_NETFD fd); NSAPI_PUBLIC pb_param *INTparam_create(const char *name, const char *value); @@ -1527,6 +1541,9 @@ NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence); NSAPI_PUBLIC int system_fclose(SYS_FILE fd); +NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb); +NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb); + int log_ereport(int degree, const char *format, ...); int log_ereport_v(int degree, const char *format, va_list args); @@ -1542,6 +1559,7 @@ NSAPI_PUBLIC pblock* util_parse_param(pool_handle_t *pool, char *query); #define util_parse_param util_parse_param +void nsapi_function_return(Session *sn, Request *rq, int ret); // threadpool threadpool_t* threadpool_new(int min, int max); @@ -1549,12 +1567,9 @@ threadpool_job* threadpool_get_job(threadpool_t *pool); void threadpool_run(threadpool_t *pool, job_callback_f func, void *data); -int ev_pollin(EventHandler *h, int fd, Event *event); - -int ev_pollout(EventHandler *h, int fd, Event *event); - -int evt_send(EventHandler *h, Event *event); - +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event); +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event); +int event_removepoll(EventHandler *ev, SYS_NETFD fd); // assert void ws_log_assert(const char *file, const char *func, int line);
--- a/src/server/public/vfs.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/public/vfs.h Sat Feb 18 13:27:25 2017 +0100 @@ -94,6 +94,8 @@ ssize_t (*write)(SYS_FILE fd, const void *buf, size_t nbyte); off_t (*seek)(SYS_FILE fd, off_t offset, int whence); void (*close)(SYS_FILE fd); + int (*opt_aioread)(aiocb_s *aiocb); + int (*opt_aiowrite)(aiocb_s *aiocb); }; struct VFS_DIRIO {
--- a/src/server/safs/service.c Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/safs/service.c Sat Feb 18 13:27:25 2017 +0100 @@ -277,6 +277,213 @@ return 0; } + +static void send_range_cleanup(AsyncSendRange *asr) { + WSBool error = asr->error; + Session *sn = asr->sn; + Request *rq = asr->rq; + + pool_handle_t *pool = asr->sn->pool; + vfs_close(asr->in); + pool_free(pool, asr->aio->buf); + pool_free(pool, asr->aio); + pool_free(pool, asr->readev); + pool_free(pool, asr->writeev); + pool_free(pool, asr); + + int ret = REQ_PROCEED; + if(error) { + rq->rq_attr.keep_alive = 0; + ret = REQ_ABORTED; + } + // return to nsapi loop + nsapi_function_return(sn, rq, ret); +} + +static int send_buf( + SYS_NETFD out, + char *restrict buf, + size_t len, + size_t *restrict pos) +{ + while(*pos < len) { + ssize_t w = net_write(out, buf + *pos, len - *pos); + if(w <= 0) { + return -1; + } + *pos += w; + } + return 0; +} + +static int send_bytes(AsyncSendRange *asr, WSBool *completed) { + *completed = FALSE; + if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + + if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + + if(!asr->read_complete) { + // write completed => new asynchronous read + asr->aio->offset += asr->aio->result; + if(system_aio_read(asr->aio)) { + asr->error = TRUE; + return 1; + } + } + *completed = TRUE; + return 0; +} + +static int send_range_readevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + asr->read_inprogress = FALSE; + asr->wpos = 0; + if(asr->error) { + return 0; + } + + int ret = 1; + if(asr->aio->result == 0) { + asr->read_complete = TRUE; + ret = 0; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 0; + } + if(!completed && !asr->write_inprogress) { + asr->write_inprogress = TRUE; + if(event_pollout(ev, asr->out, asr->writeev)) { + asr->error = TRUE; + return 0; + } + } + + return ret; +} + +static int send_range_writeevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(asr->error) { + return 0; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 0; + } + + if(completed && asr->read_complete) { + // everything completed + return 0; + } + + return 1; +} + +static int send_range_aio_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->write_inprogress) { + send_range_cleanup(asr); + } + asr->read_inprogress = FALSE; + return 0; +} + +static int send_range_poll_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->read_inprogress) { + send_range_cleanup(asr); + } + asr->write_inprogress = FALSE; + return 0; +} + +static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) { + net_setnonblock(sn->csd, TRUE); + + // try to send the header + ssize_t hw = net_write(sn->csd, header, headerlen); + if(hw < 0) { + if(net_errno(sn->csd) == EAGAIN) { + hw = 0; + } else { + return REQ_ABORTED; + } + } + + AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange)); + asr->sn = sn; + asr->rq = rq; + asr->in = fd; + asr->out = sn->csd; + asr->offset = offset; + asr->length = length; + asr->pos = offset; + asr->read_complete = FALSE; + asr->read_inprogress = FALSE; + asr->write_inprogress = FALSE; + asr->error = FALSE; + if(hw == headerlen) { + asr->header = NULL; + asr->headerlen = 0; + asr->headerpos = 0; + } else { + asr->header = header; + asr->headerlen = headerlen; + asr->headerpos = hw; + } + + Event *readev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(readev, sizeof(Event)); + readev->cookie = asr; + readev->fn = send_range_readevent; + readev->finish = send_range_aio_finish; + + Event *writeev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(writeev, sizeof(Event)); + writeev->cookie = asr; + writeev->fn = send_range_writeevent; + writeev->finish = send_range_poll_finish; + + asr->readev = readev; + asr->writeev = writeev; + + aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s)); + aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE); + aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length; + aio->filedes = fd; + aio->offset = offset; + aio->evhandler = sn->ev; + aio->event = readev; + + asr->aio = aio; + asr->wpos = 0; + + if(system_aio_read(aio)) { + send_range_cleanup(asr); + return REQ_ABORTED; + } + asr->read_inprogress = TRUE; + + return REQ_PROCESSING; +} + struct multi_range_elm { sstr_t header; off_t offset; @@ -420,9 +627,15 @@ // send response header http_start_response(sn, rq); // send content + int ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0); + if(ret == REQ_PROCESSING) { + return ret; + } +/* if(send_range(sn, fd, offset, length, NULL, 0)) { // TODO: error } +*/ } else { if(send_multi_range(sn, rq, fd, s.st_size, range)) { // TODO: error
--- a/src/server/safs/service.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/safs/service.h Sat Feb 18 13:27:25 2017 +0100 @@ -30,11 +30,14 @@ #define SERVICE_H #include "../public/nsapi.h" +#include "../util/systems.h" #ifdef __cplusplus extern "C" { #endif +#define AIO_BUF_SIZE 16384 + typedef struct HttpRange HttpRange; struct HttpRange { @@ -42,7 +45,27 @@ off_t end; HttpRange *next; }; - + +typedef struct AsyncSendRange { + Session *sn; + Request *rq; + SYS_FILE in; + SYS_NETFD out; + off_t offset; + off_t length; + off_t pos; + char *header; + int headerlen; + size_t headerpos; + Event *readev; + Event *writeev; + aiocb_s *aio; + size_t wpos; + WSBool read_complete; + WSBool read_inprogress; + WSBool write_inprogress; + WSBool error; +} AsyncSendRange; int send_file(pblock *pb, Session *sn, Request *rq);
--- a/src/server/util/io.c Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/util/io.c Sat Feb 18 13:27:25 2017 +0100 @@ -64,6 +64,7 @@ #include "../daemon/vfs.h" #include "io.h" #include "pool.h" +#include "../daemon/event.h" #include "ucx/utils.h" IOStream native_io_funcs = { @@ -72,7 +73,10 @@ (io_read_f)net_sys_read, (io_sendfile_f)NET_SYS_SENDFILE, (io_close_f)net_sys_close, - NULL + NULL, + (io_setmode_f)net_sys_setmode, + (io_poll_f)net_sys_poll, + 0 }; IOStream http_io_funcs = { @@ -81,7 +85,10 @@ (io_read_f)net_http_read, (io_sendfile_f)net_http_sendfile, (io_close_f)net_http_close, - (io_finish_f)net_http_finish + (io_finish_f)net_http_finish, + (io_setmode_f)net_http_setmode, + (io_poll_f)net_http_poll, + 0 }; IOStream ssl_io_funcs = { @@ -90,7 +97,10 @@ (io_read_f)net_ssl_read, NULL, (io_close_f)net_ssl_close, - (io_finish_f)net_ssl_finish + (io_finish_f)net_ssl_finish, + (io_setmode_f)net_ssl_setmode, + (io_poll_f)net_ssl_poll, + 0 }; @@ -170,6 +180,34 @@ close(st->fd); } +void net_sys_setmode(SysStream *st, int mode) { + int flags; + if (-1 == (flags = fcntl(st->fd, F_GETFL, 0))) { + flags = 0; + } + if(mode == IO_MODE_BLOCKING) { + if (fcntl(st->fd, F_SETFL, flags & ~O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } else if(mode == IO_MODE_NONBLOCKING) { + if (fcntl(st->fd, F_SETFL, flags | O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } +} + +int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb) { + switch(events) { + default: return -1; + case IO_POLL_NONE: return ev_remove_poll(ev, st->fd); + case IO_POLL_IN: return ev_pollin(ev, st->fd, cb); + case IO_POLL_OUT: return ev_pollout(ev, st->fd, cb); + case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement + } +} + #elif defined(XP_WIN32) ssize_t net_sys_write(SysStream *st, void *buf, size_t nbytes) { @@ -287,6 +325,14 @@ } } +void net_http_setmode(HttpStream *st, int mode) { + st->fd->setmode(st->fd, mode); +} + +int net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb) { + return st->fd->poll(st->fd, ev, events, cb); +} + /* * SSLStream implementation @@ -341,6 +387,34 @@ } +void net_ssl_setmode(SSLStream *st, int mode) { + int flags; + if (-1 == (flags = fcntl(SSL_get_fd(st->ssl), F_GETFL, 0))) { + flags = 0; + } + if(mode == IO_MODE_BLOCKING) { + if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags & ~O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } else if(mode == IO_MODE_NONBLOCKING) { + if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags | O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } +} + +int net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb) { + int fd = SSL_get_fd(st->ssl); + switch(events) { + default: return -1; + case IO_POLL_NONE: return ev_remove_poll(ev, fd); + case IO_POLL_IN: return ev_pollin(ev, fd, cb); + case IO_POLL_OUT: return ev_pollout(ev, fd, cb); + case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement + } +} /* -------------------- public nsapi network functions -------------------- */ @@ -348,6 +422,9 @@ ssize_t r = ((IOStream*)fd)->read(fd, buf, nbytes); if(r == 0) { return IO_EOF; + } else if(r < 0) { + ((IOStream*)fd)->io_errno = errno; + return IO_ERROR; } return r; } @@ -355,6 +432,7 @@ ssize_t net_write(SYS_NETFD fd, void *buf, size_t nbytes) { ssize_t r = ((IOStream*)fd)->write(fd, buf, nbytes); if(r < 0) { + ((IOStream*)fd)->io_errno = errno; return IO_ERROR; } return r; @@ -363,6 +441,7 @@ ssize_t net_writev(SYS_NETFD fd, struct iovec *iovec, int iovcnt) { ssize_t r = ((IOStream*)fd)->writev(fd, iovec, iovcnt); if(r < 0) { + ((IOStream*)fd)->io_errno = errno; return IO_ERROR; } return r; @@ -375,6 +454,9 @@ ssize_t r = net_write(fd, buf.ptr, buf.length); free(buf.ptr); va_end(arg); + if(r < 0) { + ((IOStream*)fd)->io_errno = errno; + } return r; } @@ -383,14 +465,15 @@ if(out->sendfile && sfd->fd && sfd->fd->fd != -1) { ssize_t r = out->sendfile(fd, sfd); if(r < 0) { + out->io_errno = errno; return IO_ERROR; } + return r; } else { // stream/file does not support sendfile // do regular copy return net_fallback_sendfile(out, sfd); } - return IO_ERROR; } // private @@ -418,12 +501,14 @@ hlen -= r; if(r <= 0) { free(buf); + fd->io_errno = errno; return IO_ERROR; } } if(system_lseek(sfd->fd, sfd->offset, SEEK_SET) == -1) { free(buf); + fd->io_errno = errno; return IO_ERROR; } @@ -442,6 +527,7 @@ } free(buf); if(length > 0) { + fd->io_errno = errno; return IO_ERROR; } @@ -450,6 +536,7 @@ trailer += r; tlen -= r; if(r <= 0) { + fd->io_errno = errno; return IO_ERROR; } } @@ -466,6 +553,17 @@ ((IOStream*)fd)->close(fd); } +int net_setnonblock(SYS_NETFD fd, int nonblock) { + ((IOStream*)fd)->setmode( + fd, + nonblock ? IO_MODE_NONBLOCKING : IO_MODE_BLOCKING); + return 0; +} + +int net_errno(SYS_NETFD fd) { + return ((IOStream*)fd)->io_errno; +} + // private void net_finish(SYS_NETFD fd) { ((IOStream*)fd)->finish(fd);
--- a/src/server/util/io.h Sat Feb 04 16:42:11 2017 +0100 +++ b/src/server/util/io.h Sat Feb 18 13:27:25 2017 +0100 @@ -48,6 +48,13 @@ #define SYS_SOCKET SOCKET #endif +#define IO_MODE_BLOCKING 0 +#define IO_MODE_NONBLOCKING 1 + +#define IO_POLL_NONE 0 +#define IO_POLL_IN 1 +#define IO_POLL_OUT 2 + typedef struct IOStream IOStream; typedef struct SysStream SysStream; typedef struct HttpStream HttpStream; @@ -58,6 +65,8 @@ typedef ssize_t(*io_sendfile_f)(IOStream *, sendfiledata *); typedef void(*io_close_f)(IOStream *); typedef void(*io_finish_f)(IOStream *); +typedef void(*io_setmode_f)(IOStream *, int); +typedef int(*io_poll_f)(IOStream *, EventHandler *, int, Event *); struct IOStream { io_write_f write; @@ -66,6 +75,9 @@ io_sendfile_f sendfile; io_close_f close; io_finish_f finish; + io_setmode_f setmode; + io_poll_f poll; + int io_errno; }; struct SysStream { @@ -102,6 +114,8 @@ ssize_t net_sys_read(SysStream *st, void *buf, size_t nbytes); ssize_t net_sys_sendfile(SysStream *st, sendfiledata *sfd); void net_sys_close(SysStream *st); +void net_sys_setmode(SysStream *st, int mode); +int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb); /* http stream */ IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd); @@ -112,6 +126,8 @@ ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd); void net_http_close(HttpStream *st); void net_http_finish(HttpStream *st); +void net_http_setmode(HttpStream *st, int mode); +int net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb); /* ssl stream */ IOStream* sslstream_new(pool_handle_t *pool, SSL *ssl); @@ -121,6 +137,8 @@ ssize_t net_ssl_read(SSLStream *st, void *buf, size_t nbytes); void net_ssl_close(SSLStream *st); void net_ssl_finish(SSLStream *st); +void net_ssl_setmode(SSLStream *st, int mode); +int net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb); /* net_ functions */ ssize_t net_fallback_sendfile(IOStream *fd, sendfiledata *sfd);