# HG changeset patch # User Olaf Wintermann # Date 1515866460 -3600 # Node ID aa8393527b1edf16d6373c696365c20e5432b1dc # Parent f33974f0dce0369c161914674f6c92b25d4e3fd8# Parent 6a145e13d933a1b1ef76e088411de65636b40107 merges aio into default branch diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event.c --- a/src/server/daemon/event.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event.c Sat Jan 13 19:01:00 2018 +0100 @@ -26,16 +26,17 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include +#include "../../ucx/map.h" +#include "../util/atomic.h" #include "event.h" UcxMap *event_handler_map = NULL; int numevhandlers = 0; -event_handler_t *default_event_handler = NULL; +EVHandler *default_event_handler = NULL; -event_handler_t *last_handler_c = NULL; +EVHandler *last_handler_c = NULL; int create_event_handler(EventHandlerConfig *cfg) { if(event_handler_map == NULL) { @@ -50,7 +51,7 @@ } /* create new handler */ - event_handler_t *e = evhandler_create(cfg->nthreads); + EVHandler *e = evhandler_create(cfg); if(e == NULL) { return 1; } @@ -95,10 +96,21 @@ } -event_handler_t* get_default_event_handler() { +EVHandler* get_default_event_handler() { return default_event_handler; } -event_handler_t* get_event_handler(char *name) { +EVHandler* get_event_handler(char *name) { return ucx_map_cstr_get(event_handler_map, name); } + +EventHandler* ev_instance(EVHandler *ev) { + int nev = ev->numins; + if(nev == 1) { + return ev->instances[0]; + } + + int ins = ev->current & nev; + ws_atomic_inc32(&ev->current); + return ev->instances[ins]; +} diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event.h --- a/src/server/daemon/event.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event.h Sat Jan 13 19:01:00 2018 +0100 @@ -36,26 +36,11 @@ extern "C" { #endif -#define EVENT_POLLIN 0x1 -#define EVENT_POLLOUT 0x2 - -typedef struct event_handler event_handler_t; -typedef struct event event_t; - -typedef int(*event_func)(event_handler_t*, event_t*); - -struct event { - pblock *pb; - Session *sn; - Request *rq; - event_func fn; - event_func finish; - intptr_t object; - int events; - int poll; - void *cookie; - int error; -}; +typedef struct EVHandler { + EventHandler **instances; + uint32_t numins; + uint32_t current; +} EVHandler; typedef struct event_handler_conf { sstr_t name; @@ -63,27 +48,26 @@ int isdefault; } EventHandlerConfig; -typedef struct event_handler_object { - event_handler_t *handler; - int nthreads; -} EventHandlerObject; - int create_event_handler(EventHandlerConfig *cfg); int check_event_handler_cfg(); -event_handler_t* get_default_event_handler(); +EVHandler* get_default_event_handler(); -event_handler_t* get_event_handler(char *name); +EVHandler* get_event_handler(char *name); + +EventHandler* ev_instance(EVHandler *ev); /* implementation in event_$platform */ -event_handler_t* evhandler_create(int numthreads); - -int ev_pollin(event_handler_t *h, int fd, event_t *event); +EVHandler* evhandler_create(EventHandlerConfig *cfg); -int ev_pollout(event_handler_t *h, int fd, event_t *event); +int ev_pollin(EventHandler *h, int fd, Event *event); +int ev_pollout(EventHandler *h, int fd, Event *event); +int ev_remove_poll(EventHandler *h, int fd); +int ev_send(EventHandler *h, Event *event); -int evt_send(event_handler_t *h, event_t *event); +int ev_aioread(int fd, aiocb_s *cb); +int ev_aiowrite(int fd, aiocb_s *cb); #ifdef __cplusplus diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_bsd.c --- a/src/server/daemon/event_bsd.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_bsd.c Sat Jan 13 19:01:00 2018 +0100 @@ -33,81 +33,63 @@ #include "event_bsd.h" -event_handler_t* evhandler_create(int numthreads) { - event_handler_t *ev = malloc(sizeof(event_handler_t)); - if(ev == NULL) { - return NULL; - } +EVHandler* evhandler_create(EventHandlerConfig *cfg) { + EVHandler *ev = malloc(sizeof(EVHandler)); + ev->current = 0; + ev->instances = calloc(cfg->nthreads, sizeof(void*)); + ev->numins = cfg->nthreads; - ev->ports = calloc(numthreads, sizeof(int)); - if(ev->ports == NULL) { - free(ev); - return NULL; - } - ev->nports = numthreads; - ev->lp = 0; - - /* create ports event threads */ - for(int i=0;iports[i] = port_create(); - ev->ports[i] = kqueue(); - if(ev->ports[i] == 0) { - free(ev->ports); - free(ev); + for(int i=0;inthreads;i++) { + EventHandler *handler = malloc(sizeof(EventHandler)); + ev->instances[i] = handler; + + handler->kqueue = kqueue(); + if(handler->kqueue == 0) { + // TODO: error return NULL; } - /* - * start a new handler thread - * the thread needs the event port and a pointer to the event handler - */ - ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); - if(conf == NULL) { - free(ev->ports); - free(ev); - return NULL; - } - conf->handler = ev; - conf->port = ev->ports[i]; - - systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); - /* TODO: error handling */ + SYS_THREAD t = systhread_start( + 0, + 0, + (thrstartfunc)ev_handle_events, + handler); + systhread_detach(t); } return ev; } -void ev_handle_events(ev_thr_conf_t *conf) { - event_handler_t *ev = conf->handler; - int kq = conf->port; - - free(conf); - + +void ev_handle_events(EventHandler *ev) { struct timespec timeout; timeout.tv_nsec = 0; timeout.tv_sec = 600; - struct kevent events[16]; + struct kevent events[64]; + struct kevent changes[64]; + int numchanges = 0; for(;;) { // wait for events - int nev; - nev = kevent(kq, NULL, 0, events, 16, &timeout); + int nev = kevent(ev->kqueue, changes, numchanges, events, 64, &timeout); if(nev == -1) { // TODO: check for error perror("kevent"); continue; } + numchanges = 0; for(int i=0;ifn) { - int ep = event->poll; + int ep = event->events; if(event->fn(ev, event)) { - // TODO: reassociate? - // TODO: check ep and event->poll + if(event->events != ep) { + changes[numchanges++].filter = ev_convert2sys_events(ep); + } } else if(event->finish) { + changes[numchanges++].filter = ev_convert2sys_events(ep); event->finish(ev, event); } } @@ -115,33 +97,31 @@ } } -/* returns a event handler port */ -int ev_get_port(event_handler_t *h) { - int nps = h->nports; - if(nps == 1) { - return h->ports[0]; +int ev_convert2sys_events(int events) { + int e = 0; + if((events & EVENT_POLLIN) == EVENT_POLLIN) { + e |= EVFILT_READ; } - - int cp = h->lp % nps; - ws_atomic_inc32(&h->lp); - - return h->ports[cp]; + if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { + e |= EVFILT_WRITE; + } + return e; } -int ev_pollin(event_handler_t *h, int fd, event_t *event) { - event->poll = EVENT_POLLIN; +int ev_pollin(EventHandler *h, int fd, Event *event) { + event->events = EVENT_POLLIN; struct kevent kev; EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, event); - return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL); + return kevent(h->kqueue, &kev, 1, NULL, 0, NULL); } -int ev_pollout(event_handler_t *h, int fd, event_t *event) { - event->poll = EVENT_POLLOUT; +int ev_pollout(EventHandler *h, int fd, Event *event) { + event->events = EVENT_POLLOUT; struct kevent kev; EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, event); - return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL); + return kevent(h->kqueue, &kev, 1, NULL, 0, NULL); } -int evt_send(event_handler_t *h, event_t *event) { +int event_send(EventHandler *h, Event *event) { return 0; } diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_bsd.h --- a/src/server/daemon/event_bsd.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_bsd.h Sat Jan 13 19:01:00 2018 +0100 @@ -39,20 +39,13 @@ extern "C" { #endif -struct event_handler { - int *ports; - uint32_t nports; - uint32_t lp; +struct EventHandler { + int kqueue; }; -typedef struct ev_thr_conf { - event_handler_t *handler; - int port; -} ev_thr_conf_t; +void ev_handle_events(EventHandler *ev); -void ev_handle_events(ev_thr_conf_t *conf); - -int ev_get_port(event_handler_t *h); +int ev_convert2sys_events(int events); #ifdef __cplusplus } diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_linux.c --- a/src/server/daemon/event_linux.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_linux.c Sat Jan 13 19:01:00 2018 +0100 @@ -30,65 +30,61 @@ #include #include #include +#include #include "../util/systhr.h" #include "../util/atomic.h" +#include "../util/io.h" #include "event.h" #include "event_linux.h" -event_handler_t* evhandler_create(int numthreads) { - event_handler_t *ev = malloc(sizeof(event_handler_t)); - if(ev == NULL) { - return NULL; - } +EVHandler* evhandler_create(EventHandlerConfig *cfg) { + EVHandler *ev = malloc(sizeof(EVHandler)); + ev->current = 0; + ev->instances = calloc(cfg->nthreads, sizeof(void*)); + ev->numins = cfg->nthreads; - ev->ep = calloc(numthreads, sizeof(int)); - if(ev->ep == NULL) { - free(ev); - return NULL; - } - ev->nep = numthreads; - ev->lep = 0; - - /* create ports event threads */ - for(int i=0;iep[i] = epoll_create(64); - if(ev->ep[i] == 0) { - free(ev->ep); - free(ev); + for(int i=0;inthreads;i++) { + EventHandler *handler = malloc(sizeof(EventHandler)); + ev->instances[i] = handler; + + handler->ep = epoll_create(64); + if(handler->ep == 0) { + // TODO: error return NULL; } - /* - * start a new handler thread - * the thread needs the event port and a pointer to the event handler - */ - ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); - if(conf == NULL) { - free(ev->ep); - free(ev); + int eventpipe[2]; + if(pipe(eventpipe)) { return NULL; } - conf->handler = ev; - conf->ep = ev->ep[i]; + handler->eventin = eventpipe[0]; + handler->eventout = eventpipe[1]; - systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); - /* TODO: error handling */ + struct epoll_event epev; + epev.events = EPOLLIN | EPOLLET; // input event, edge triggered + epev.data.ptr = NULL; + if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { + return NULL; + } + + SYS_THREAD t = systhread_start( + 0, + 0, + (thrstartfunc)ev_handle_events, + handler); + systhread_detach(t); } return ev; } -void ev_handle_events(ev_thr_conf_t *conf) { - event_handler_t *ev = conf->handler; - int ep = conf->ep; - - free(conf); +void ev_handle_events(EventHandler *ev) { + int ep = ev->ep; //port_event_t events[16]; struct epoll_event events[16]; @@ -102,9 +98,29 @@ } for(int i=0;ifn) { - int saved_ev = event->poll; + Event *event = events[i].data.ptr; + if(!event) { + char ebuf[sizeof(Event*)]; + int ebufpos = 0; + char *b = ebuf; + while(ebufpos < sizeof(Event*)) { + ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); + if(r < 0) { + break; + } + ebufpos += r; + } + if(ebufpos == sizeof(Event*)) { + intptr_t *p = (intptr_t*)b; + *(&event) = (Event*)*p; + if(event->fn) { + if(!event->fn(ev, event) && event->finish) { + event->finish(ev, event); + } + } + } + } else if(event->fn) { + int saved_ev = event->events; if(!event->fn(ev, event)) { // event fn returned 0 -> remove event from epoll if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { @@ -121,21 +137,16 @@ event->finish(ev, event); } } else { - if(saved_ev != event->poll) { + if(saved_ev != event->events) { // event type changed struct epoll_event epev; epev.events = EPOLLET; epev.data.ptr = event; // adjust epoll events - if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { - epev.events |= EPOLLIN; - } - if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { - epev.events |= EPOLLOUT; - } + epev.events = ev_convert2sys_events(event->events); - if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, NULL)) { + if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { log_ereport( LOG_FAILURE, "epoll_wait failed: %s", @@ -148,40 +159,77 @@ } } -/* returns a event handler port */ -int ev_get_port(event_handler_t *h) { - int nps = h->nep; - if(nps == 1) { - return h->ep[0]; +int ev_convert2sys_events(int events) { + int e = EPOLLET; + if((events & EVENT_POLLIN) == EVENT_POLLIN) { + e |= EPOLLIN; } - - int cp = h->lep % nps; - ws_atomic_inc32(&h->lep); - - return h->ep[cp]; + if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { + e |= EPOLLOUT; + } + return e; } -int ev_pollin(event_handler_t *h, int fd, event_t *event) { +int ev_pollin(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; struct epoll_event epev; epev.events = EPOLLIN | EPOLLET; // input event, edge triggered epev.data.ptr = event; - return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); + return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); } -int ev_pollout(event_handler_t *h, int fd, event_t *event) { +int ev_pollout(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; struct epoll_event epev; epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered epev.data.ptr = event; - return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); + return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); +} + +int ev_remove_poll(EventHandler *h, int fd) { + return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL); +} + +int event_send(EventHandler *h, Event *event) { + event->object = 0; + event->events = 0; + ssize_t r = write(h->eventout, &event, sizeof(Event*)); + if(r < sizeof(Event*)) { + log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); + } + return r > 0 ? 0 : 1; } -int evt_send(event_handler_t *h, event_t *event) { - event->object = 0; - // TODO: implement using threadpool or eventfd - fprintf(stderr, "Warning: evt_send not implemented\n"); - return 0; +// TODO: remove this fake aio +int ev_aioread(int fd, aiocb_s *cb) { + ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); + cb->result = result; + if(result < 0) { + cb->result_errno = errno; + } + return event_send(cb->evhandler, cb->event); } + +int ev_aiowrite(int fd, aiocb_s *cb) { + ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); + cb->result = result; + if(result < 0) { + cb->result_errno = errno; + } + return event_send(cb->evhandler, cb->event); +} + + +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); +} + +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); +} + +int event_removepoll(EventHandler *ev, SYS_NETFD fd) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); +} diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_linux.h --- a/src/server/daemon/event_linux.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_linux.h Sat Jan 13 19:01:00 2018 +0100 @@ -36,21 +36,24 @@ extern "C" { #endif -struct event_handler { - int *ep; // epoll fds - uint32_t nep; // number of epoll fds - uint32_t lep; // last fd - // TODO: message queue/thread pool +struct EventHandler { + /* + * epoll fd + */ + int ep; + /* + * pipe read fd + */ + int eventin; + /* + * pipe write fd + */ + int eventout; }; -typedef struct ev_thr_conf { - event_handler_t *handler; - int ep; -} ev_thr_conf_t; +void ev_handle_events(EventHandler *ev); -void ev_handle_events(ev_thr_conf_t *conf); - -int ev_get_port(event_handler_t *h); +int ev_convert2sys_events(int events); #ifdef __cplusplus } diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_solaris.c --- a/src/server/daemon/event_solaris.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_solaris.c Sat Jan 13 19:01:00 2018 +0100 @@ -30,59 +30,39 @@ #include #include +#include "../util/io.h" + #include "event_solaris.h" -event_handler_t* evhandler_create(int numthreads) { - event_handler_t *ev = malloc(sizeof(event_handler_t)); - if(ev == NULL) { - return NULL; - } +EVHandler* evhandler_create(EventHandlerConfig *cfg) { + EVHandler *ev = malloc(sizeof(EVHandler)); + ev->current = 0; + ev->instances = calloc(cfg->nthreads, sizeof(void*)); + ev->numins = cfg->nthreads; - ev->ports = calloc(numthreads, sizeof(int)); - if(ev->ports == NULL) { - free(ev); - return NULL; - } - ev->nports = numthreads; - ev->lp = 0; - - /* create ports event threads */ - for(int i=0;iports[i] = port_create(); - if(ev->ports[i] == 0) { - free(ev->ports); - free(ev); + for(int i=0;inthreads;i++) { + EventHandler *handler = malloc(sizeof(EventHandler)); + ev->instances[i] = handler; + + handler->port = port_create(); + if(handler->port == 0) { + // TODO: error return NULL; } - /* - * start a new handler thread - * the thread needs the event port and a pointer to the event handler - */ - ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); - if(conf == NULL) { - free(ev->ports); - free(ev); - return NULL; - } - conf->handler = ev; - conf->port = ev->ports[i]; - - systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); - // TODO: error handling + SYS_THREAD t = systhread_start( + 0, + 0, + (thrstartfunc)ev_handle_events, + handler); + systhread_detach(t); } return ev; } -void ev_handle_events(ev_thr_conf_t *conf) { - event_handler_t *ev = conf->handler; - int port = conf->port; - - free(conf); - - port_event_t events[16]; +void ev_handle_events(EventHandler *ev) { + port_event_t events[64]; struct timespec timeout; timeout.tv_nsec = 0; timeout.tv_sec = 600; @@ -90,7 +70,7 @@ for(;;) { // wait for events uint_t nev = 1; - int ret = port_getn(port, events, 16, &nev, &timeout); + int ret = port_getn(ev->port, events, 64, &nev, &timeout); if(ret == -1) { // TODO: check for error perror("port_getn"); @@ -98,85 +78,141 @@ } for(int i=0;ifn) { - int saved_ev = event->poll; - if(event->fn(ev, event)) { - /* - * on solaris we have to reassociate the fd after - * each event - * we do this if the event function returns 1 - */ - - if(event->poll != saved_ev) { - // event type changed - int ne = 0; - if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { - ne |= POLLIN; - } - if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { - ne |= POLLOUT; + Event *event = events[i].portev_user; + if(events[i].portev_source == PORT_SOURCE_AIO) { + aiocb_t *aiocb = (aiocb_t*)events[i].portev_object; + if(event) { + aiocb_s *aio = (aiocb_s*)event->object; + aio->result = aiocb->aio_resultp.aio_return; + aio->result_errno = aiocb->aio_resultp.aio_errno; + if(event->fn) { + if(!event->fn(ev, event) && event->finish) { + event->finish(ev, event); } } - - if(ev_poll(ev, event)) { - perror("port_associate"); - } - } else if(event->finish) { - event->finish(ev, event); } - } + free(aiocb); + } else { + if(event->fn) { + if(event->fn(ev, event)) { + /* + * on solaris we have to reassociate the fd after + * each event + * we do this if the event function returns 1 + */ + if(port_associate( + ev->port, + PORT_SOURCE_FD, + (uintptr_t)event->object, + ev_convert2sys_events(event->events), + event)) + { + perror("port_associate"); + } + } else if(event->finish) { + event->finish(ev, event); + } + } + } } } } -// returns a event handler port -int ev_get_port(event_handler_t *h) { - int nps = h->nports; - if(nps == 1) { - return h->ports[0]; +int ev_convert2sys_events(int events) { + int e = 0; + if((events & EVENT_POLLIN) == EVENT_POLLIN) { + e |= POLLIN; } - - int cp = h->lp % nps; - atomic_inc_32(&h->lp); - - return h->ports[cp]; + if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { + e |= POLLOUT; + } + return e; } -int ev_pollin(event_handler_t *h, int fd, event_t *event) { + +int ev_pollin(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->events = POLLIN; - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return port_associate( - ev_get_port(h), + h->port, PORT_SOURCE_FD, (uintptr_t)fd, POLLIN, event); } -int ev_pollout(event_handler_t *h, int fd, event_t *event) { +int ev_pollout(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->events = POLLOUT; - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; return port_associate( - ev_get_port(h), + h->port, PORT_SOURCE_FD, (uintptr_t)fd, POLLOUT, event); } -int ev_poll(event_handler_t *h, event_t *event) { - return port_associate( - ev_get_port(h), - PORT_SOURCE_FD, - event->object, - event->events, - event); +int ev_remove_poll(EventHandler *h, int fd) { + return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd); +} + +int event_send(EventHandler *h, Event *event) { + event->object = 0; + event->events = 0; + return port_send(h->port, 0, event); } -int evt_send(event_handler_t *h, event_t *event) { - event->object = 0; - return port_send(ev_get_port(h), 0, event); +static int ev_aio(int fd, aiocb_s *cb, WSBool read) { + EventHandler *ev = cb->evhandler; + if(!ev) { + return -1; + } + + aiocb_t *aiocb = malloc(sizeof(aiocb_t)); + if(!aiocb) { + return -1; + } + ZERO(aiocb, sizeof(aiocb_t)); + + aiocb->aio_fildes = fd; + aiocb->aio_buf = cb->buf; + aiocb->aio_nbytes = cb->nbytes; + aiocb->aio_offset = cb->offset; + + port_notify_t *portnotify = malloc(sizeof(port_notify_t)); + if(!portnotify) { + free(aiocb); + return -1; + } + portnotify->portnfy_port = ev->port; + portnotify->portnfy_user = cb->event; + aiocb->aio_sigevent.sigev_notify = SIGEV_PORT; + aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify; + + if(read) { + return aio_read(aiocb); + } else { + return aio_write(aiocb); + } } + +int ev_aioread(int fd, aiocb_s *cb) { + return ev_aio(fd, cb, TRUE); +} + +int ev_aiowrite(int fd, aiocb_s *cb) { + return ev_aio(fd, cb, FALSE); +} + + +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); +} + +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); +} + +int event_removepoll(EventHandler *ev, SYS_NETFD fd) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); +} diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/event_solaris.h --- a/src/server/daemon/event_solaris.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/event_solaris.h Sat Jan 13 19:01:00 2018 +0100 @@ -39,22 +39,13 @@ extern "C" { #endif -struct event_handler { - int *ports; - uint32_t nports; - uint32_t lp; +struct EventHandler { + int port; }; -typedef struct ev_thr_conf { - event_handler_t *handler; - int port; -} ev_thr_conf_t; +int ev_convert2sys_events(int events); -void ev_handle_events(ev_thr_conf_t *conf); - -int ev_get_port(event_handler_t *h); - -int ev_poll(event_handler_t *h, event_t *event); +void ev_handle_events(EventHandler *ev); #ifdef __cplusplus } diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/httplistener.c --- a/src/server/daemon/httplistener.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/httplistener.c Sat Jan 13 19:01:00 2018 +0100 @@ -403,33 +403,24 @@ conn->fd = clientfd; conn->listener = ls; if(ls->ssl) { + // SSL connections are always non-blocking + // set socket non blocking + int flags; + if((flags = fcntl(conn->fd, F_GETFL, 0)) == -1) { + flags = 0; + } + if(fcntl(conn->fd, F_SETFL, flags | O_NONBLOCK)) { + perror("Error: acceptor_thread: fcntl"); + // TODO: error + } + SSL *ssl = SSL_new(ls->ssl->sslctx); SSL_set_fd(ssl, clientfd); - int ssl_ar = SSL_accept(ssl); - if(ssl_ar <= 0) { - int error = SSL_get_error(ssl, ssl_ar); - char *errstr; - switch(error) { - default: errstr = "unknown"; break; - case SSL_ERROR_ZERO_RETURN: errstr = "SSL_ERROR_ZERO_RETURN"; break; - case SSL_ERROR_WANT_READ: errstr = "SSL_ERROR_WANT_READ"; break; - case SSL_ERROR_WANT_WRITE: errstr = "SSL_ERROR_WANT_WRITE"; break; - case SSL_ERROR_WANT_CONNECT: errstr = "SSL_ERROR_WANT_CONNECT"; break; - case SSL_ERROR_WANT_ACCEPT: errstr = "SSL_ERROR_WANT_ACCEPT"; break; - case SSL_ERROR_WANT_X509_LOOKUP: errstr = "SSL_ERROR_WANT_X509_LOOKUP"; break; - case SSL_ERROR_SYSCALL: errstr = "SSL_ERROR_SYSCALL"; break; - case SSL_ERROR_SSL: errstr = "SSL_ERROR_SSL"; break; - } - log_ereport(LOG_VERBOSE, "SSL accept error[%d]: %s", error, errstr); - free(conn); - conn = NULL; - system_close(clientfd); - } else { - conn->ssl = ssl; - conn->read = connection_ssl_read; - conn->write = connection_ssl_write; - conn->close = connection_ssl_close; - } + + conn->ssl = ssl; + conn->read = connection_ssl_read; + conn->write = connection_ssl_write; + conn->close = connection_ssl_close; } else { conn->ssl = NULL; conn->read = connection_read; diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/httprequest.c --- a/src/server/daemon/httprequest.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/httprequest.c Sat Jan 13 19:01:00 2018 +0100 @@ -40,6 +40,7 @@ #include "httprequest.h" #include "config.h" #include "vserver.h" +#include "event.h" #include "httplistener.h" #include "func.h" #include "error.h" @@ -86,7 +87,7 @@ return S("/"); } -int handle_request(HTTPRequest *request, threadpool_t *thrpool) { +int handle_request(HTTPRequest *request, threadpool_t *thrpool, EventHandler *ev) { // handle nsapi request // create pool @@ -110,23 +111,22 @@ sn->connection = request->connection; sn->netbuf = request->netbuf; sn->sn.pool = pool; - //sn->sn.csd = stream_new_from_fd(pool, request->connection->fd); - //sn->sn.csd = net_stream_from_fd(pool, request->connection->fd); - IOStream *io; - if(request->connection->ssl) { - io = sslstream_new(pool, request->connection->ssl); - sn->sn.ssl = 1; - } else { - io = sysstream_new(pool, request->connection->fd); - } + SessionHandler *sh = request->connection->session_handler; + WSBool ssl; + IOStream *io = sh->create_iostream(sh, request->connection, pool, &ssl); sn->sn.csd = httpstream_new(pool, io); - + sn->sn.ssl = ssl; sn->sn.client = pblock_create_pool(sn->sn.pool, 8); sn->sn.next = NULL; sn->sn.fill = 1; sn->sn.subject = NULL; + if(!ev) { + ev = ev_instance(get_default_event_handler()); + } + sn->sn.ev = ev; + // the session needs the current server configuration sn->config = request->connection->listener->cfg; diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/httprequest.h --- a/src/server/daemon/httprequest.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/httprequest.h Sat Jan 13 19:01:00 2018 +0100 @@ -79,7 +79,7 @@ * request: request object * pool: current thread pool or NULL */ -int handle_request(HTTPRequest *request, threadpool_t *pool); +int handle_request(HTTPRequest *request, threadpool_t *pool, EventHandler *ev); @@ -105,7 +105,7 @@ NSAPIRequest *rq, threadpool_t *pool); -void nsapi_function_return(Session *sn, Request *rq, int ret); +//void nsapi_function_return(Session *sn, Request *rq, int ret); void nsapi_change_threadpool( NSAPISession *sn, diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/sessionhandler.c --- a/src/server/daemon/sessionhandler.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/sessionhandler.c Sat Jan 13 19:01:00 2018 +0100 @@ -40,9 +40,9 @@ #include "httplistener.h" typedef struct _event_http_io { - HTTPRequest *request; - HttpParser *parser; - int error; + HTTPRequest *request; + HttpParser *parser; + int error; } EventHttpIO; @@ -105,11 +105,30 @@ free(conn); } +IOStream* create_connection_iostream( + SessionHandler *sh, + Connection *conn, + pool_handle_t *pool, + WSBool *ssl) +{ + IOStream *io = NULL; + if(conn->ssl) { + io = sslstream_new(pool, conn->ssl); + *ssl = 1; + } else { + io = sysstream_new(pool, conn->fd); + *ssl = 0; + } + return io; +} + + SessionHandler* create_basic_session_handler() { BasicSessionHandler *handler = malloc(sizeof(BasicSessionHandler)); handler->threadpool = threadpool_new(4, 8); handler->sh.enqueue_connection = basic_enq_conn; handler->sh.keep_alive = basic_keep_alive; + handler->sh.create_iostream = create_connection_iostream; return (SessionHandler*)handler; } @@ -170,7 +189,7 @@ } // process request - r = handle_request(&request, NULL); // TODO: use correct thread pool + r = handle_request(&request, NULL, NULL); // TODO: use correct thread pool // TODO: free, see evt_request_finish @@ -189,6 +208,7 @@ handler->eventhandler = get_default_event_handler(); handler->sh.enqueue_connection = evt_enq_conn; handler->sh.keep_alive = evt_keep_alive; + handler->sh.create_iostream = create_connection_iostream; return (SessionHandler*)handler; } @@ -200,7 +220,7 @@ // set socket non blocking int flags; - if (-1 == (flags = fcntl(conn->fd, F_GETFL, 0))) { + if ((flags = fcntl(conn->fd, F_GETFL, 0)) == -1) { flags = 0; } if (fcntl(conn->fd, F_SETFL, flags | O_NONBLOCK) != 0) { @@ -238,14 +258,14 @@ * evt_enq_conn() --> event handler --> handle_request() */ - event_handler_t *ev = ((EventSessionHandler*)handler)->eventhandler; - - event_t *event = malloc(sizeof(event_t)); - ZERO(event, sizeof(event_t)); - event->fn = evt_request_input; + Event *event = malloc(sizeof(Event)); + ZERO(event, sizeof(Event)); + event->fn = conn->ssl ? evt_request_ssl_accept : evt_request_input; event->finish = evt_request_finish; event->cookie = io; + EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler); + if(ev_pollin(ev, conn->fd, event) != 0) { // TODO: ev_pollin should log, intercept some errors here log_ereport(LOG_FAILURE, "Cannot enqueue connection"); @@ -254,7 +274,44 @@ } } -int evt_request_input(event_handler_t *handler, event_t *event) { +int evt_request_ssl_accept(EventHandler *handler, Event *event) { + EventHttpIO *io = event->cookie; + Connection *conn = io->request->connection; + + int ret = SSL_accept(conn->ssl); + if(ret <= 0) { + int error = SSL_get_error(conn->ssl, ret); + char *errstr; + switch(error) { + default: errstr = "unknown"; break; + case SSL_ERROR_WANT_READ: { + event->events = EVENT_POLLIN; + return 1; + } + case SSL_ERROR_WANT_WRITE: { + event->events = EVENT_POLLOUT; + return 1; + } + case SSL_ERROR_ZERO_RETURN: errstr = "SSL_ERROR_ZERO_RETURN"; break; + case SSL_ERROR_WANT_CONNECT: errstr = "SSL_ERROR_WANT_CONNECT"; break; + case SSL_ERROR_WANT_ACCEPT: errstr = "SSL_ERROR_WANT_ACCEPT"; break; + case SSL_ERROR_WANT_X509_LOOKUP: errstr = "SSL_ERROR_WANT_X509_LOOKUP"; break; + case SSL_ERROR_SYSCALL: errstr = "SSL_ERROR_SYSCALL"; break; + case SSL_ERROR_SSL: errstr = "SSL_ERROR_SSL"; break; + + log_ereport(LOG_VERBOSE, "SSL accept error[%d]: %s", error, errstr); + event->finish = evt_request_error; + io->error = 1; + return 0; + } + } + + // SSL_accept successful, start request input now + event->fn = evt_request_input; + return evt_request_input(handler, event); +} + +int evt_request_input(EventHandler *handler, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; @@ -272,11 +329,11 @@ // SSL specific error handling switch(conn->ssl_error) { case SSL_ERROR_WANT_READ: { - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return 1; } case SSL_ERROR_WANT_WRITE: { - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; return 1; } } @@ -302,7 +359,7 @@ * we need more data -> return 1 to tell the event handler to * continue polling */ - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return 1; } @@ -336,12 +393,12 @@ return 0; } -int evt_request_finish(event_handler_t *h, event_t *event) { +int evt_request_finish(EventHandler *h, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; - int r = handle_request(request, NULL); + int r = handle_request(request, NULL, h); if(r != 0) { // TODO: error message connection_destroy(request->connection); @@ -362,7 +419,7 @@ return 0; } -int evt_request_error(event_handler_t *h, event_t *event) { +int evt_request_error(EventHandler *h, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/sessionhandler.h --- a/src/server/daemon/sessionhandler.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/sessionhandler.h Sat Jan 13 19:01:00 2018 +0100 @@ -31,6 +31,7 @@ #include "../util/thrpool.h" #include "../public/nsapi.h" +#include "../util/io.h" #include "event.h" #include @@ -72,6 +73,11 @@ * available */ void(*keep_alive)(SessionHandler*, Connection *conn); + + /* + * Creates an IOStream object for the connection + */ + IOStream*(*create_iostream)(SessionHandler *sh, Connection *conn, pool_handle_t *pool, WSBool *ssl); }; /* @@ -93,7 +99,7 @@ */ typedef struct _event_session_handler { SessionHandler sh; - event_handler_t *eventhandler; + EVHandler *eventhandler; } EventSessionHandler; /* @@ -111,6 +117,16 @@ void connection_destroy(Connection *conn); +/* + * generic create_iostream function for BasicSessionHandler + * and EventSessionHandler + */ +IOStream* create_connection_iostream( + SessionHandler *sh, + Connection *conn, + pool_handle_t *pool, + WSBool *ssl); + SessionHandler* create_basic_session_handler(); @@ -125,9 +141,10 @@ void evt_enq_conn(SessionHandler *handler, Connection *conn); -int evt_request_input(event_handler_t *h, event_t *event); -int evt_request_finish(event_handler_t *h, event_t *event); -int evt_request_error(event_handler_t *h, event_t *event); +int evt_request_ssl_accept(EventHandler *handler, Event *event); +int evt_request_input(EventHandler *h, Event *event); +int evt_request_finish(EventHandler *h, Event *event); +int evt_request_error(EventHandler *h, Event *event); void evt_keep_alive(SessionHandler *handler, Connection *conn); diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/threadpools.c --- a/src/server/daemon/threadpools.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/threadpools.c Sat Jan 13 19:01:00 2018 +0100 @@ -34,12 +34,16 @@ #include "threadpools.h" -UcxMap *thread_pool_map = NULL; -int numthrpools = 0; +static UcxMap *thread_pool_map; +static int num_thrpools; +static UcxMap *io_pool_map; +static int num_iopools; -threadpool_t *default_thread_pool = NULL; +static threadpool_t *default_thread_pool; +static threadpool_t *last_thrpool_c; -threadpool_t *last_thrpool_c = NULL; +static threadpool_t *default_io_pool; +static threadpool_t *last_io_pool; int create_threadpool(sstr_t name, ThreadPoolConfig *cfg) { if(thread_pool_map == NULL) { @@ -62,8 +66,37 @@ int ret = ucx_map_sstr_put(thread_pool_map, name, tp); if(ret == 0) { - numthrpools++; + num_thrpools++; last_thrpool_c = tp; + if(!default_thread_pool) { + default_thread_pool = tp; + } + } + + return ret; + } +} + +int create_io_pool(sstr_t name, int numthreads) { + if(io_pool_map == NULL) { + io_pool_map = ucx_map_new(4); + } + threadpool_t *pool = ucx_map_sstr_get(io_pool_map, name); + if(pool) { + pool->min_threads = numthreads; + pool->max_threads = numthreads; + return 0; + } else { + threadpool_t *tp = threadpool_new(numthreads, numthreads); + + int ret = ucx_map_sstr_put(io_pool_map, name, tp); + + if(ret == 0) { + num_iopools++; + last_io_pool = tp; + if(!default_io_pool) { + default_io_pool = tp; + } } return ret; @@ -71,22 +104,24 @@ } int check_thread_pool_cfg() { - if(numthrpools > 0 ) { - if(default_thread_pool) { - return 0; - } else { - default_thread_pool = last_thrpool_c; - return 0; + if(num_thrpools == 0) { + ThreadPoolConfig cfg; + cfg.min_threads = 4; + cfg.max_threads = 8; + cfg.queue_size = 64; + cfg.stack_size = 262144; + if(create_threadpool(sstr("default"), &cfg)) { + return 1; } } - ThreadPoolConfig cfg; - cfg.min_threads = 4; - cfg.max_threads = 8; - cfg.queue_size = 64; - cfg.stack_size = 262144; + if(num_iopools == 0) { + if(create_io_pool(sstr("default"), 8)) { + return 1; + } + } - return create_threadpool(sstr("default"), &cfg); + return 0; } threadpool_t* get_default_threadpool() { @@ -96,3 +131,11 @@ threadpool_t* get_threadpool(sstr_t name) { return ucx_map_sstr_get(thread_pool_map, name); } + +threadpool_t* get_default_iopool() { + return default_io_pool; +} + +threadpool_t* get_iopool(sstr_t name) { + return ucx_map_sstr_get(io_pool_map, name); +} diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/threadpools.h --- a/src/server/daemon/threadpools.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/threadpools.h Sat Jan 13 19:01:00 2018 +0100 @@ -51,6 +51,8 @@ threadpool_t* get_default_threadpool(); threadpool_t* get_threadpool(sstr_t name); +threadpool_t* get_default_iopool(); +threadpool_t* get_iopool(sstr_t name); #ifdef __cplusplus } diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/vfs.c --- a/src/server/daemon/vfs.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/vfs.c Sat Jan 13 19:01:00 2018 +0100 @@ -32,20 +32,42 @@ #include #include #include +#include #include #include "../util/pool.h" #include "netsite.h" #include "acl.h" #include "vfs.h" +#include "threadpools.h" +#include "event.h" + +#define VFS_MALLOC(pool, size) pool ? pool_malloc(pool, size) : malloc(size) +#define VFS_FREE(pool, ptr) pool ? pool_free(pool, ptr) : free(ptr) static UcxMap *vfs_map; +static VFS sys_vfs = { + sys_vfs_open, + sys_vfs_stat, + sys_vfs_fstat, + sys_vfs_opendir, + sys_vfs_mkdir, + sys_vfs_unlink, + VFS_CHECKS_ACL +}; + static VFS_IO sys_file_io = { sys_file_read, sys_file_write, + sys_file_pread, + sys_file_pwrite, sys_file_seek, - sys_file_close + sys_file_close, + //sys_file_aioread, + //sys_file_aiowrite, + NULL, + NULL }; static VFS_DIRIO sys_dir_io = { @@ -77,7 +99,7 @@ VFSContext *ctx = pool_malloc(sn->pool, sizeof(VFSContext)); ctx->sn = sn; ctx->rq = rq; - ctx->vfs = rq->vfs; + ctx->vfs = rq->vfs ? rq->vfs : &sys_vfs; ctx->user = acllist_getuser(sn, rq, rq->acllist); ctx->acllist = rq->acllist; ctx->aclreqaccess = rq->aclreqaccess; @@ -87,36 +109,164 @@ } SYS_FILE vfs_open(VFSContext *ctx, char *path, int oflags) { + WS_ASSERT(ctx); + WS_ASSERT(path); + + uint32_t access_mask = ctx->aclreqaccess | acl_oflag2mask(oflags); + + // ctx->aclreqaccess should be the complete access mask + uint32_t m = ctx->aclreqaccess; // save original access mask + ctx->aclreqaccess = access_mask; // set mask for vfs->open call + if((ctx->vfs->flags & VFS_CHECKS_ACL) != VFS_CHECKS_ACL) { + // VFS does not evaluates the ACL itself, so we have to do it here + SysACL sysacl; + if(sys_acl_check(ctx, access_mask, &sysacl)) { + return NULL; + } + } + SYS_FILE file = ctx->vfs->open(ctx, path, oflags); + ctx->aclreqaccess = m; // restore original access mask + return file; +} + +SYS_FILE vfs_openRO(VFSContext *ctx, char *path) { + return vfs_open(ctx, path, O_RDONLY); +} + +SYS_FILE vfs_openWO(VFSContext *ctx, char *path) { + return vfs_open(ctx, path, O_WRONLY | O_CREAT); +} + +SYS_FILE vfs_openRW(VFSContext *ctx, char *path) { + return vfs_open(ctx, path, O_RDONLY | O_WRONLY | O_CREAT); +} + +int vfs_stat(VFSContext *ctx, char *path, struct stat *buf) { + WS_ASSERT(ctx); + WS_ASSERT(path); + + uint32_t access_mask = ctx->aclreqaccess | ACL_READ_ATTRIBUTES; + + // ctx->aclreqaccess should be the complete access mask + uint32_t m = ctx->aclreqaccess; // save original access mask + ctx->aclreqaccess = access_mask; // set mask for vfs->open call + if((ctx->vfs->flags & VFS_CHECKS_ACL) != VFS_CHECKS_ACL) { + // VFS does not evaluates the ACL itself, so we have to do it here + SysACL sysacl; + if(sys_acl_check(ctx, access_mask, &sysacl)) { + return -1; + } + } + int ret = ctx->vfs->stat(ctx, path, buf); + ctx->aclreqaccess = m; // restore original access mask + return ret; +} + +int vfs_fstat(VFSContext *ctx, SYS_FILE fd, struct stat *buf) { + WS_ASSERT(ctx); + WS_ASSERT(fd); + WS_ASSERT(buf); + + return ctx->vfs->fstat(ctx, fd, buf); +} + +void vfs_close(SYS_FILE fd) { + WS_ASSERT(fd); + + fd->io->close(fd); + if(fd->ctx) { + pool_free(fd->ctx->pool, fd); + } else { + free(fd); + } +} + +VFS_DIR vfs_opendir(VFSContext *ctx, char *path) { + WS_ASSERT(ctx); WS_ASSERT(path); - Session *sn; - Request *rq; - pool_handle_t *pool; - uint32_t access_mask; + uint32_t access_mask = ctx->aclreqaccess | ACL_LIST; - if(ctx) { - access_mask = ctx->aclreqaccess; - access_mask |= acl_oflag2mask(oflags); - if(!ctx->pool) { - // TODO: log warning - // broken VFSContext + // ctx->aclreqaccess should be the complete access mask + uint32_t m = ctx->aclreqaccess; // save original access mask + ctx->aclreqaccess = access_mask; // set mask for vfs->open call + if((ctx->vfs->flags & VFS_CHECKS_ACL) != VFS_CHECKS_ACL) { + // VFS does not evaluates the ACL itself, so we have to do it here + SysACL sysacl; + if(sys_acl_check(ctx, access_mask, &sysacl)) { + return NULL; } - if(ctx->vfs) { - // ctx->aclreqaccess should be the complete access mask - uint32_t m = ctx->aclreqaccess; // save original access mask - ctx->aclreqaccess = access_mask; // set mask for vfs->open call - SYS_FILE file = ctx->vfs->open(ctx, path, oflags); - ctx->aclreqaccess = m; // restore original access mask - return file; - } else { - pool = ctx->pool; + } + VFS_DIR dir = ctx->vfs->opendir(ctx, path); + ctx->aclreqaccess = m; // restore original access mask + return dir; +} + +int vfs_readdir(VFS_DIR dir, VFS_ENTRY *entry) { + WS_ASSERT(dir); + WS_ASSERT(entry); + + return dir->io->readdir(dir, entry, 0); +} + +int vfs_readdir_stat(VFS_DIR dir, VFS_ENTRY *entry) { + WS_ASSERT(dir); + WS_ASSERT(entry); + + return dir->io->readdir(dir, entry, 1); +} + +void vfs_closedir(VFS_DIR dir) { + WS_ASSERT(dir); + + dir->io->close(dir); + if(dir->ctx) { + VFS_FREE(dir->ctx->pool, dir); + } else { + free(dir); + } +} + +int vfs_mkdir(VFSContext *ctx, char *path) { + WS_ASSERT(ctx); + WS_ASSERT(path); + + return vfs_path_op(ctx, path, ctx->vfs->mkdir, ACL_ADD_FILE); +} + +int vfs_unlink(VFSContext *ctx, char *path) { + WS_ASSERT(ctx); + WS_ASSERT(path); + + return vfs_path_op(ctx, path, ctx->vfs->unlink, ACL_DELETE); +} + + +// private +int vfs_path_op(VFSContext *ctx, char *path, vfs_op_f op, uint32_t access) { + uint32_t access_mask = ctx->aclreqaccess; + access_mask |= access; + + // ctx->aclreqaccess should be the complete access mask + uint32_t m = ctx->aclreqaccess; // save original access mask + ctx->aclreqaccess = access_mask; // set mask for vfs function call + if((ctx->vfs->flags & VFS_CHECKS_ACL) != VFS_CHECKS_ACL) { + // VFS does not evaluates the ACL itself, so we have to do it here + SysACL sysacl; + if(sys_acl_check(ctx, access_mask, &sysacl)) { + return -1; } - } else { - sn = NULL; - rq = NULL; - pool = NULL; - access_mask = acl_oflag2mask(oflags); } + int ret = op(ctx, path); + ctx->aclreqaccess = m; // restore original access mask + return ret; +} + +/* system vfs implementation */ + +SYS_FILE sys_vfs_open(VFSContext *ctx, char *path, int oflags) { + uint32_t access_mask = ctx->aclreqaccess; + pool_handle_t *pool = ctx->pool; // check ACLs SysACL sysacl; @@ -151,9 +301,7 @@ } } - - VFSFile *file = pool ? - pool_malloc(pool, sizeof(VFSFile)) : malloc(sizeof(VFSFile)); + VFSFile *file = VFS_MALLOC(pool, sizeof(VFSFile)); if(!file) { system_close(fd); return NULL; @@ -165,43 +313,8 @@ return file; } -SYS_FILE vfs_openRO(VFSContext *ctx, char *path) { - return vfs_open(ctx, path, O_RDONLY); -} - -SYS_FILE vfs_openWO(VFSContext *ctx, char *path) { - return vfs_open(ctx, path, O_WRONLY | O_CREAT); -} - -SYS_FILE vfs_openRW(VFSContext *ctx, char *path) { - return vfs_open(ctx, path, O_RDONLY | O_WRONLY | O_CREAT); -} - -int vfs_stat(VFSContext *ctx, char *path, struct stat *buf) { - Session *sn; - Request *rq; - uint32_t access_mask; - - if(ctx) { - access_mask = ctx->aclreqaccess; - access_mask |= ACL_READ_ATTRIBUTES; - if(!ctx->pool) { - // TODO: log warning - // broken VFSContext - } - if(ctx->vfs) { - // ctx->aclreqaccess should be the complete access mask - uint32_t m = ctx->aclreqaccess; // save original access mask - ctx->aclreqaccess = access_mask; // set mask for vfs->fstat call - int ret = ctx->vfs->stat(ctx, path, buf); - ctx->aclreqaccess = m; // restore original access mask - return ret; - } - } else { - sn = NULL; - rq = NULL; - access_mask = ACL_READ_ATTRIBUTES; - } +int sys_vfs_stat(VFSContext *ctx, char *path, struct stat *buf) { + uint32_t access_mask = ctx->aclreqaccess; // check ACLs SysACL sysacl; @@ -228,17 +341,7 @@ return 0; } -int vfs_fstat(VFSContext *ctx, SYS_FILE fd, struct stat *buf) { - if(ctx) { - if(!ctx->pool) { - // TODO: log warning - // broken VFSContext - } - if(ctx->vfs) { - return ctx->vfs->fstat(ctx, fd, buf); - } - } - +int sys_vfs_fstat(VFSContext *ctx, SYS_FILE fd, struct stat *buf) { // stat if(fstat(fd->fd, buf)) { if(ctx) { @@ -250,46 +353,9 @@ return 0; } -void vfs_close(SYS_FILE fd) { - fd->io->close(fd); - if(fd->ctx) { - pool_free(fd->ctx->pool, fd); - } else { - free(fd); - } -} - -VFS_DIR vfs_opendir(VFSContext *ctx, char *path) { - WS_ASSERT(path); - - Session *sn; - Request *rq; - pool_handle_t *pool; - uint32_t access_mask; - - if(ctx) { - access_mask = ctx->aclreqaccess; - access_mask |= ACL_LIST; - if(!ctx->pool) { - // TODO: log warning - // broken VFSContext - } - if(ctx->vfs) { - // ctx->aclreqaccess should be the complete access mask - uint32_t m = ctx->aclreqaccess; // save original access mask - ctx->aclreqaccess = access_mask; // set mask for vfs->opendir call - VFS_DIR dir = ctx->vfs->opendir(ctx, path); - ctx->aclreqaccess = m; // restore original access mask - return dir; - } else { - pool = ctx->pool; - } - } else { - sn = NULL; - rq = NULL; - pool = NULL; - access_mask = ACL_LIST; - } +VFS_DIR sys_vfs_opendir(VFSContext *ctx, char *path) { + uint32_t access_mask = ctx->aclreqaccess; + pool_handle_t *pool = ctx->pool; // check ACLs SysACL sysacl; @@ -327,26 +393,26 @@ return NULL; } - SysVFSDir *dir_data = pool ? - pool_malloc(pool, sizeof(SysVFSDir)) : malloc(sizeof(SysVFSDir)); + SysVFSDir *dir_data = VFS_MALLOC(pool, sizeof(SysVFSDir)); if(!dir_data) { closedir(sys_dir); return NULL; } long maxfilelen = fpathconf(dir_fd, _PC_NAME_MAX); size_t entry_len = offsetof(struct dirent, d_name) + maxfilelen + 1; - dir_data->cur = pool ? - pool_malloc(pool, entry_len) : malloc(entry_len); + dir_data->cur = VFS_MALLOC(pool, entry_len); if(!dir_data->cur) { closedir(sys_dir); + VFS_FREE(pool, dir_data); return NULL; } dir_data->dir = sys_dir; - VFSDir *dir = pool ? - pool_malloc(pool, sizeof(VFSDir)) : malloc(sizeof(VFSDir)); + VFSDir *dir = VFS_MALLOC(pool, sizeof(VFSDir)); if(!dir) { closedir(sys_dir); + VFS_FREE(pool, dir_data->cur); + VFS_FREE(pool, dir_data); return NULL; } dir->ctx = ctx; @@ -356,74 +422,26 @@ return dir; } -int vfs_readdir(VFS_DIR dir, VFS_ENTRY *entry) { - return dir->io->readdir(dir, entry, 0); -} - -int vfs_readdir_stat(VFS_DIR dir, VFS_ENTRY *entry) { - return dir->io->readdir(dir, entry, 1); +int sys_vfs_mkdir(VFSContext *ctx, char *path) { + return sys_path_op(ctx, path, sys_mkdir); } -void vfs_closedir(VFS_DIR dir) { - dir->io->close(dir); - if(dir->ctx) { - pool_free(dir->ctx->pool, dir); - } else { - free(dir); - } -} - -int vfs_mkdir(VFSContext *ctx, char *path) { - if(ctx && ctx->vfs) { - return vfs_path_op(ctx, path, ctx->vfs->mkdir, ACL_ADD_FILE); - } else { - return sys_path_op(ctx, path, sys_mkdir, ACL_ADD_FILE); - } -} - -int vfs_unlink(VFSContext *ctx, char *path) { - if(ctx && ctx->vfs) { - return vfs_path_op(ctx, path, ctx->vfs->unlink, ACL_DELETE); - } else { - return sys_path_op(ctx, path, sys_unlink, ACL_DELETE); - } +int sys_vfs_unlink(VFSContext *ctx, char *path) { + return sys_path_op(ctx, path, sys_unlink); } -// private -int vfs_path_op(VFSContext *ctx, char *path, vfs_op_f op, uint32_t access) { - Session *sn; - Request *rq; - +int sys_path_op(VFSContext *ctx, char *path, sys_op_f op) { uint32_t access_mask = ctx->aclreqaccess; - access_mask |= access; - if(!ctx->pool) { - // TODO: log warning - // broken VFSContext - return -1; - } - - // ctx->aclreqaccess should be the complete access mask - uint32_t m = ctx->aclreqaccess; // save original access mask - ctx->aclreqaccess = access_mask; // set mask for vfs function call - int ret = op(ctx, path); - ctx->aclreqaccess = m; // restore original access mask - return ret; -} - -int sys_path_op(VFSContext *ctx, char *path, sys_op_f op, uint32_t access) { - if(ctx) { - access |= ctx->aclreqaccess; - } // check ACLs SysACL sysacl; - if(sys_acl_check(ctx, access, &sysacl)) { + if(sys_acl_check(ctx, access_mask, &sysacl)) { return -1; } if(sysacl.acl) { - if(!fs_acl_check(&sysacl, ctx->user, path, access)) { + if(!fs_acl_check(&sysacl, ctx->user, path, access_mask)) { acl_set_error_status(ctx->sn, ctx->rq, sysacl.acl, ctx->user); return -1; } @@ -432,10 +450,8 @@ // do path operation if(op(ctx, path, &sysacl)) { // error - if(ctx) { - ctx->vfs_errno = errno; - sys_set_error_status(ctx); - } + ctx->vfs_errno = errno; + sys_set_error_status(ctx); return -1; } @@ -482,6 +498,14 @@ return write(fd->fd, buf, nbyte); } +ssize_t sys_file_pread(SYS_FILE fd, void *buf, size_t nbyte, off_t offset) { + return pread(fd->fd, buf, nbyte, offset); +} + +ssize_t sys_file_pwrite(SYS_FILE fd, const void *buf, size_t nbyte, off_t offset) { + return pwrite(fd->fd, buf, nbyte, offset); +} + off_t sys_file_seek(SYS_FILE fd, off_t offset, int whence) { return lseek(fd->fd, offset, whence); } @@ -490,6 +514,19 @@ system_close(fd->fd); } +int sys_file_aioread(aiocb_s *aiocb) { + WS_ASSERT(aiocb->buf); + WS_ASSERT(aiocb->nbytes > 0); + return ev_aioread(aiocb->filedes->fd, aiocb); +} + +int sys_file_aiowrite(aiocb_s *aiocb) { + WS_ASSERT(aiocb->buf); + WS_ASSERT(aiocb->nbytes > 0); + return ev_aiowrite(aiocb->filedes->fd, aiocb); +} + + int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat) { SysVFSDir *dirdata = dir->data; struct dirent *result = NULL; @@ -500,19 +537,13 @@ return sys_dir_read(dir, entry, getstat); } else { entry->name = name; -#ifndef OSX - /* TODO: - * implement alternative for fstat for OS X and other crappy - * Unices - */ if(getstat) { // TODO: check ACLs again for new path if(fstatat(dir->fd, result->d_name, &entry->stat, 0)) { entry->stat_errno = errno; } entry->stat_extra = NULL; - } -#endif + } return 1; } } else { @@ -563,6 +594,14 @@ return fd->io->write(fd, buf, nbyte); } +NSAPI_PUBLIC int system_pread(SYS_FILE fd, void *buf, int nbyte, off_t offset) { + return fd->io->pread(fd, buf, nbyte, offset); +} + +NSAPI_PUBLIC int system_pwrite(SYS_FILE fd, const void *buf, int nbyte, off_t offset) { + return fd->io->pwrite(fd, buf, nbyte, offset); +} + NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence) { return fd->io->seek(fd, offset, whence); } @@ -571,3 +610,64 @@ vfs_close(fd); return 0; } + +// AIO API + +NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb) { + if(!aiocb->event || !aiocb->evhandler) { + return -1; + } + + SYS_FILE file = aiocb->filedes; + aiocb->event->object = (intptr_t)aiocb; + if(file->io->opt_aioread) { + return file->io->opt_aioread(aiocb); + } else { + vfs_queue_aio(aiocb, VFS_AIO_READ); + return 0; + } +} + +NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb) { + if(!aiocb->event || !aiocb->evhandler) { + return -1; + } + + SYS_FILE file = aiocb->filedes; + aiocb->event->object = (intptr_t)aiocb; + if(file->io->opt_aiowrite) { + return file->io->opt_aiowrite(aiocb); + } else { + vfs_queue_aio(aiocb, VFS_AIO_WRITE); + return 0; + } +} + +static void* vfs_aio_read(aiocb_s *aiocb) { + int result = system_pread(aiocb->filedes, aiocb->buf, aiocb->nbytes, aiocb->offset); + aiocb->result = result; + if(result < 0) { + aiocb->result_errno = errno; + } + event_send(aiocb->evhandler, aiocb->event); + return NULL; +} + +static void* vfs_aio_write(aiocb_s *aiocb) { + int result = system_pwrite(aiocb->filedes, aiocb->buf, aiocb->nbytes, aiocb->offset); + aiocb->result = result; + if(result < 0) { + aiocb->result_errno = errno; + } + event_send(aiocb->evhandler, aiocb->event); + return NULL; +} + +void vfs_queue_aio(aiocb_s *aiocb, VFSAioOp op) { + threadpool_t *pool = get_default_iopool(); // TODO: use specific IOPool + if(op == VFS_AIO_READ) { + threadpool_run(pool, (job_callback_f)vfs_aio_read, aiocb); + } else if(VFS_AIO_WRITE) { + threadpool_run(pool, (job_callback_f)vfs_aio_write, aiocb); + } +} diff -r f33974f0dce0 -r aa8393527b1e src/server/daemon/vfs.h --- a/src/server/daemon/vfs.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/daemon/vfs.h Sat Jan 13 19:01:00 2018 +0100 @@ -40,25 +40,47 @@ DIR *dir; struct dirent *cur; } SysVFSDir; + +enum VFSAioOp { + VFS_AIO_READ = 0, + VFS_AIO_WRITE +}; +typedef enum VFSAioOp VFSAioOp; int vfs_init(); typedef int(*vfs_op_f)(VFSContext *, char *); typedef int(*sys_op_f)(VFSContext *, char *, SysACL *); int vfs_path_op(VFSContext *ctx, char *path, vfs_op_f op, uint32_t access); -int sys_path_op(VFSContext *ctx, char *path, sys_op_f op, uint32_t access); +SYS_FILE sys_vfs_open(VFSContext *ctx, char *path, int oflags); +int sys_vfs_stat(VFSContext *ctx, char *path, struct stat *buf); +int sys_vfs_fstat(VFSContext *ctx, SYS_FILE fd, struct stat *buf); +VFS_DIR sys_vfs_opendir(VFSContext *ctx, char *path); +int sys_vfs_mkdir(VFSContext *ctx, char *path); +int sys_vfs_unlink(VFSContext *ctx, char *path); + +int sys_path_op(VFSContext *ctx, char *path, sys_op_f op); int sys_acl_check(VFSContext *ctx, uint32_t access_mask, SysACL *externacl); void sys_set_error_status(VFSContext *ctx); + ssize_t sys_file_read(SYS_FILE fd, void *buf, size_t nbyte); ssize_t sys_file_write(SYS_FILE fd, const void *buf, size_t nbyte); +ssize_t sys_file_pread(SYS_FILE fd, void *buf, size_t nbyte, off_t offset); +ssize_t sys_file_pwrite(SYS_FILE fd, const void *buf, size_t nbyte, off_t offset); off_t sys_file_seek(SYS_FILE fd, off_t offset, int whence); void sys_file_close(SYS_FILE fd); +int sys_file_aioread(aiocb_s *aiocb); +int sys_file_aiowrite(aiocb_s *aiocb); + int sys_dir_read(VFS_DIR dir, VFS_ENTRY *entry, int getstat); void sys_dir_close(VFS_DIR dir); + int sys_mkdir(VFSContext *ctx, char *path, SysACL *sysacl); int sys_unlink(VFSContext *ctx, char *path, SysACL *sysacl); +void vfs_queue_aio(aiocb_s *aiocb, VFSAioOp op); + #ifdef __cplusplus } #endif diff -r f33974f0dce0 -r aa8393527b1e src/server/public/nsapi.h --- a/src/server/public/nsapi.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/public/nsapi.h Sat Jan 13 19:01:00 2018 +0100 @@ -605,6 +605,28 @@ }; +// nsapi ext begin + +#define EVENT_POLLIN 0x1 +#define EVENT_POLLOUT 0x2 + +typedef struct EventHandler EventHandler; +typedef struct Event Event; + +typedef int(*eventfunc)(EventHandler*, Event*); + +struct Event { + eventfunc fn; + eventfunc finish; + void *cookie; + intptr_t object; + int events; + int error; +}; + +// nsapi ext end + + typedef void* CONDVAR; typedef void *COUNTING_SEMAPHORE; typedef void* CRITICAL; @@ -684,6 +706,8 @@ struct in_addr iaddr; pool_handle_t *pool; + + EventHandler *ev; /* event handler instance (new) */ void *clauth; /* v2 ACL client authentication information */ struct Session *next; @@ -1117,7 +1141,19 @@ #define ISMOPTIONS(r) ((r)->method_num == METHOD_OPTIONS) -// new type +// new types +typedef struct aiocb_s { + SYS_FILE filedes; + void *buf; + size_t nbytes; + off_t offset; + ssize_t result; + int result_errno; + Event *event; + EventHandler *evhandler; +} aiocb_s; + + typedef struct _thread_pool threadpool_t; typedef struct _threadpool_job threadpool_job; typedef void*(*job_callback_f)(void *data); @@ -1298,6 +1334,8 @@ // NSAPI extension ssize_t net_printf(SYS_NETFD fd, char *format, ...); +int net_setnonblock(SYS_NETFD fd, int nonblock); +int net_errno(SYS_NETFD fd); NSAPI_PUBLIC pb_param *INTparam_create(const char *name, const char *value); @@ -1503,9 +1541,14 @@ /* file */ NSAPI_PUBLIC int system_fread(SYS_FILE fd, void *buf, int nbyte); NSAPI_PUBLIC int system_fwrite(SYS_FILE fd, const void *buf, int nbyte); +NSAPI_PUBLIC int system_pread(SYS_FILE fd, void *buf, int nbyte, off_t offset); +NSAPI_PUBLIC int system_pwrite(SYS_FILE fd, const void *buf, int nbyte, off_t offset); NSAPI_PUBLIC off_t system_lseek(SYS_FILE fd, off_t offset, int whence); NSAPI_PUBLIC int system_fclose(SYS_FILE fd); +NSAPI_PUBLIC int system_aio_read(aiocb_s *aiocb); +NSAPI_PUBLIC int system_aio_write(aiocb_s *aiocb); + int log_ereport(int degree, const char *format, ...); int log_ereport_v(int degree, const char *format, va_list args); @@ -1521,6 +1564,7 @@ NSAPI_PUBLIC pblock* util_parse_param(pool_handle_t *pool, char *query); #define util_parse_param util_parse_param +void nsapi_function_return(Session *sn, Request *rq, int ret); // threadpool threadpool_t* threadpool_new(int min, int max); @@ -1528,6 +1572,10 @@ threadpool_job* threadpool_get_job(threadpool_t *pool); void threadpool_run(threadpool_t *pool, job_callback_f func, void *data); +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event); +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event); +int event_removepoll(EventHandler *ev, SYS_NETFD fd); +int event_send(EventHandler *ev, Event *event); // assert void ws_log_assert(const char *file, const char *func, int line); diff -r f33974f0dce0 -r aa8393527b1e src/server/public/vfs.h --- a/src/server/public/vfs.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/public/vfs.h Sat Jan 13 19:01:00 2018 +0100 @@ -36,6 +36,8 @@ extern "C" { #endif +#define VFS_CHECKS_ACL 0x0001 + typedef struct VFS_IO VFS_IO; typedef struct VFS_DIRIO VFS_DIRIO; typedef struct VFSFile VFSFile; @@ -52,6 +54,7 @@ VFS_DIR (*opendir)(VFSContext *ctx, char *path); int (*mkdir)(VFSContext *ctx, char *path); int (*unlink)(VFSContext *ctx, char *path); + uint32_t flags; }; struct VFSContext { @@ -89,8 +92,12 @@ struct VFS_IO { ssize_t (*read)(SYS_FILE fd, void *buf, size_t nbyte); ssize_t (*write)(SYS_FILE fd, const void *buf, size_t nbyte); + ssize_t (*pread)(SYS_FILE fd, void *buf, size_t nbyte, off_t offset); + ssize_t (*pwrite)(SYS_FILE fd, const void *buf, size_t nbyte, off_t offset); off_t (*seek)(SYS_FILE fd, off_t offset, int whence); void (*close)(SYS_FILE fd); + int (*opt_aioread)(aiocb_s *aiocb); + int (*opt_aiowrite)(aiocb_s *aiocb); }; struct VFS_DIRIO { diff -r f33974f0dce0 -r aa8393527b1e src/server/safs/service.c --- a/src/server/safs/service.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/safs/service.c Sat Jan 13 19:01:00 2018 +0100 @@ -43,7 +43,6 @@ #include - /* * prepares servicing a file * @@ -277,6 +276,223 @@ return 0; } + +static void send_range_cleanup(AsyncSendRange *asr) { + WSBool error = asr->error; + Session *sn = asr->sn; + Request *rq = asr->rq; + + pool_handle_t *pool = asr->sn->pool; + vfs_close(asr->in); + pool_free(pool, asr->aio->buf); + pool_free(pool, asr->aio); + pool_free(pool, asr->readev); + pool_free(pool, asr->writeev); + pool_free(pool, asr); + + int ret = REQ_PROCEED; + if(error) { + rq->rq_attr.keep_alive = 0; + ret = REQ_ABORTED; + } + // return to nsapi loop + nsapi_function_return(sn, rq, ret); +} + +static int send_buf( + SYS_NETFD out, + char *restrict buf, + size_t len, + size_t *restrict pos) +{ + while(*pos < len) { + ssize_t w = net_write(out, buf + *pos, len - *pos); + if(w <= 0) { + return -1; + } + *pos += w; + } + return 0; +} + +static int send_bytes(AsyncSendRange *asr, WSBool *completed) { + *completed = FALSE; + if(asr->header) { + if(send_buf(asr->out, asr->header, asr->headerlen, &asr->headerpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + if(asr->headerpos >= asr->headerlen) { + asr->header = NULL; + } + } + + if(send_buf(asr->out, asr->aio->buf, asr->aio->result, &asr->wpos)) { + if(net_errno(asr->out) == EAGAIN) { + return 0; + } else { + asr->error = TRUE; + return 1; + } + } + + if(!asr->read_complete) { + // write completed => new asynchronous read + asr->aio->offset += asr->aio->result; + size_t length = asr->end - asr->offset; + asr->aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length; + asr->read_inprogress = TRUE; + if(system_aio_read(asr->aio)) { + asr->error = TRUE; + return 1; + } + } + *completed = TRUE; + return 0; +} + +static int send_range_readevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + asr->read_inprogress = FALSE; + asr->wpos = 0; + asr->offset += asr->aio->result; + if(asr->error || asr->aio->result < 0) { + return 0; + } + + int ret = 1; + if(asr->aio->result == 0 || asr->offset >= asr->end) { + asr->read_complete = TRUE; + ret = 0; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 0; + } + if(!completed && !asr->write_inprogress) { + asr->write_inprogress = TRUE; + if(event_pollout(ev, asr->out, asr->writeev)) { + asr->error = TRUE; + return 0; + } + } + + return ret; +} + +static int send_range_writeevent(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(asr->error) { + return 1; + } + + WSBool completed; + if(send_bytes(asr, &completed)) { + return 1; + } + + if(completed) { + return 0; + } + + return 1; +} + +static int send_range_aio_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->write_inprogress) { + send_range_cleanup(asr); + } + asr->read_inprogress = FALSE; + return 0; +} + +static int send_range_poll_finish(EventHandler *ev, Event *event) { + AsyncSendRange *asr = event->cookie; + if(!asr->read_inprogress) { + send_range_cleanup(asr); + } + asr->write_inprogress = FALSE; + return 0; +} + +static int send_range_aio(Session *sn, Request *rq, SYS_FILE fd, off_t offset, off_t length, char *header, int headerlen) { + net_setnonblock(sn->csd, TRUE); + + // try to send the header + ssize_t hw = net_write(sn->csd, header, headerlen); + if(hw < 0) { + if(net_errno(sn->csd) == EAGAIN) { + hw = 0; + } else { + return REQ_ABORTED; + } + } + + AsyncSendRange *asr = pool_malloc(sn->pool, sizeof(AsyncSendRange)); + asr->sn = sn; + asr->rq = rq; + asr->in = fd; + asr->out = sn->csd; + asr->offset = offset; + asr->end = offset + length; + //asr->length = length; + asr->pos = offset; + asr->read_complete = FALSE; + asr->read_inprogress = FALSE; + asr->write_inprogress = FALSE; + asr->error = FALSE; + if(hw == headerlen) { + asr->header = NULL; + asr->headerlen = 0; + asr->headerpos = 0; + } else { + asr->header = header; + asr->headerlen = headerlen; + asr->headerpos = hw; + } + + Event *readev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(readev, sizeof(Event)); + readev->cookie = asr; + readev->fn = send_range_readevent; + readev->finish = send_range_aio_finish; + + Event *writeev = pool_malloc(sn->pool, sizeof(Event)); + ZERO(writeev, sizeof(Event)); + writeev->cookie = asr; + writeev->fn = send_range_writeevent; + writeev->finish = send_range_poll_finish; + + asr->readev = readev; + asr->writeev = writeev; + + aiocb_s *aio = pool_malloc(sn->pool, sizeof(aiocb_s)); + aio->buf = pool_malloc(sn->pool, AIO_BUF_SIZE); + aio->nbytes = AIO_BUF_SIZE < length ? AIO_BUF_SIZE : length; + aio->filedes = fd; + aio->offset = offset; + aio->evhandler = sn->ev; + aio->event = readev; + + asr->aio = aio; + asr->wpos = 0; + + asr->read_inprogress = TRUE; + if(system_aio_read(aio)) { + send_range_cleanup(asr); + return REQ_ABORTED; + } + asr->read_inprogress = TRUE; + + return REQ_PROCESSING; +} + struct multi_range_elm { sstr_t header; off_t offset; @@ -416,24 +632,30 @@ length = s.st_size; } + int ret = REQ_NOACTION; if(single_range) { // send response header http_start_response(sn, rq); // send content + ret = send_range_aio(sn, rq, fd, offset, length, NULL, 0); + if(ret == REQ_PROCESSING) { + return ret; + } +/* if(send_range(sn, fd, offset, length, NULL, 0)) { // TODO: error } +//*/ } else { - if(send_multi_range(sn, rq, fd, s.st_size, range)) { - // TODO: error - } + ret = send_multi_range(sn, rq, fd, s.st_size, range); + // TODO: error } // cleanup vfs_close(fd); free_range(sn, range); - return REQ_PROCEED; + return ret; } diff -r f33974f0dce0 -r aa8393527b1e src/server/safs/service.h --- a/src/server/safs/service.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/safs/service.h Sat Jan 13 19:01:00 2018 +0100 @@ -30,11 +30,14 @@ #define SERVICE_H #include "../public/nsapi.h" +#include "../util/systems.h" #ifdef __cplusplus extern "C" { #endif +#define AIO_BUF_SIZE 16384 + typedef struct HttpRange HttpRange; struct HttpRange { @@ -42,7 +45,28 @@ off_t end; HttpRange *next; }; - + +typedef struct AsyncSendRange { + Session *sn; + Request *rq; + SYS_FILE in; + SYS_NETFD out; + off_t offset; + off_t end; + //off_t length; + off_t pos; + char *header; + int headerlen; + size_t headerpos; + Event *readev; + Event *writeev; + aiocb_s *aio; + size_t wpos; + WSBool read_complete; + WSBool read_inprogress; + WSBool write_inprogress; + WSBool error; +} AsyncSendRange; int send_file(pblock *pb, Session *sn, Request *rq); diff -r f33974f0dce0 -r aa8393527b1e src/server/util/io.c --- a/src/server/util/io.c Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/util/io.c Sat Jan 13 19:01:00 2018 +0100 @@ -65,6 +65,7 @@ #include "io.h" #include "pool.h" #include "../daemon/netsite.h" +#include "../daemon/event.h" #include "ucx/utils.h" IOStream native_io_funcs = { @@ -73,7 +74,10 @@ (io_read_f)net_sys_read, (io_sendfile_f)NET_SYS_SENDFILE, (io_close_f)net_sys_close, - NULL + NULL, + (io_setmode_f)net_sys_setmode, + (io_poll_f)net_sys_poll, + 0 }; IOStream http_io_funcs = { @@ -82,7 +86,10 @@ (io_read_f)net_http_read, (io_sendfile_f)net_http_sendfile, (io_close_f)net_http_close, - (io_finish_f)net_http_finish + (io_finish_f)net_http_finish, + (io_setmode_f)net_http_setmode, + (io_poll_f)net_http_poll, + 0 }; IOStream ssl_io_funcs = { @@ -91,7 +98,10 @@ (io_read_f)net_ssl_read, NULL, (io_close_f)net_ssl_close, - (io_finish_f)net_ssl_finish + (io_finish_f)net_ssl_finish, + (io_setmode_f)net_ssl_setmode, + (io_poll_f)net_ssl_poll, + 0 }; @@ -171,6 +181,34 @@ system_close(st->fd); } +void net_sys_setmode(SysStream *st, int mode) { + int flags; + if (-1 == (flags = fcntl(st->fd, F_GETFL, 0))) { + flags = 0; + } + if(mode == IO_MODE_BLOCKING) { + if (fcntl(st->fd, F_SETFL, flags & ~O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } else if(mode == IO_MODE_NONBLOCKING) { + if (fcntl(st->fd, F_SETFL, flags | O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } +} + +int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb) { + switch(events) { + default: return -1; + case IO_POLL_NONE: return ev_remove_poll(ev, st->fd); + case IO_POLL_IN: return ev_pollin(ev, st->fd, cb); + case IO_POLL_OUT: return ev_pollout(ev, st->fd, cb); + case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement + } +} + #elif defined(XP_WIN32) ssize_t net_sys_write(SysStream *st, void *buf, size_t nbytes) { @@ -288,6 +326,14 @@ } } +void net_http_setmode(HttpStream *st, int mode) { + st->fd->setmode(st->fd, mode); +} + +int net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb) { + return st->fd->poll(st->fd, ev, events, cb); +} + /* * SSLStream implementation @@ -342,6 +388,34 @@ } +void net_ssl_setmode(SSLStream *st, int mode) { + int flags; + if (-1 == (flags = fcntl(SSL_get_fd(st->ssl), F_GETFL, 0))) { + flags = 0; + } + if(mode == IO_MODE_BLOCKING) { + if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags & ~O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } else if(mode == IO_MODE_NONBLOCKING) { + if (fcntl(SSL_get_fd(st->ssl), F_SETFL, flags | O_NONBLOCK) != 0) { + perror("fcntl"); + // TODO: error + } + } +} + +int net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb) { + int fd = SSL_get_fd(st->ssl); + switch(events) { + default: return -1; + case IO_POLL_NONE: return ev_remove_poll(ev, fd); + case IO_POLL_IN: return ev_pollin(ev, fd, cb); + case IO_POLL_OUT: return ev_pollout(ev, fd, cb); + case IO_POLL_IN | IO_POLL_OUT: return -1; // TODO: implement + } +} /* -------------------- public nsapi network functions -------------------- */ @@ -349,6 +423,9 @@ ssize_t r = ((IOStream*)fd)->read(fd, buf, nbytes); if(r == 0) { return IO_EOF; + } else if(r < 0) { + ((IOStream*)fd)->io_errno = errno; + return IO_ERROR; } return r; } @@ -356,6 +433,7 @@ ssize_t net_write(SYS_NETFD fd, void *buf, size_t nbytes) { ssize_t r = ((IOStream*)fd)->write(fd, buf, nbytes); if(r < 0) { + ((IOStream*)fd)->io_errno = errno; return IO_ERROR; } return r; @@ -364,6 +442,7 @@ ssize_t net_writev(SYS_NETFD fd, struct iovec *iovec, int iovcnt) { ssize_t r = ((IOStream*)fd)->writev(fd, iovec, iovcnt); if(r < 0) { + ((IOStream*)fd)->io_errno = errno; return IO_ERROR; } return r; @@ -376,6 +455,9 @@ ssize_t r = net_write(fd, buf.ptr, buf.length); free(buf.ptr); va_end(arg); + if(r < 0) { + ((IOStream*)fd)->io_errno = errno; + } return r; } @@ -384,14 +466,15 @@ if(out->sendfile && sfd->fd && sfd->fd->fd != -1) { ssize_t r = out->sendfile(fd, sfd); if(r < 0) { + out->io_errno = errno; return IO_ERROR; } + return r; } else { // stream/file does not support sendfile // do regular copy return net_fallback_sendfile(out, sfd); } - return IO_ERROR; } // private @@ -419,12 +502,14 @@ hlen -= r; if(r <= 0) { free(buf); + fd->io_errno = errno; return IO_ERROR; } } if(system_lseek(sfd->fd, sfd->offset, SEEK_SET) == -1) { free(buf); + fd->io_errno = errno; return IO_ERROR; } @@ -443,6 +528,7 @@ } free(buf); if(length > 0) { + fd->io_errno = errno; return IO_ERROR; } @@ -451,6 +537,7 @@ trailer += r; tlen -= r; if(r <= 0) { + fd->io_errno = errno; return IO_ERROR; } } @@ -467,6 +554,17 @@ ((IOStream*)fd)->close(fd); } +int net_setnonblock(SYS_NETFD fd, int nonblock) { + ((IOStream*)fd)->setmode( + fd, + nonblock ? IO_MODE_NONBLOCKING : IO_MODE_BLOCKING); + return 0; +} + +int net_errno(SYS_NETFD fd) { + return ((IOStream*)fd)->io_errno; +} + // private void net_finish(SYS_NETFD fd) { ((IOStream*)fd)->finish(fd); diff -r f33974f0dce0 -r aa8393527b1e src/server/util/io.h --- a/src/server/util/io.h Thu Aug 31 16:29:49 2017 +0200 +++ b/src/server/util/io.h Sat Jan 13 19:01:00 2018 +0100 @@ -48,6 +48,13 @@ #define SYS_SOCKET SOCKET #endif +#define IO_MODE_BLOCKING 0 +#define IO_MODE_NONBLOCKING 1 + +#define IO_POLL_NONE 0 +#define IO_POLL_IN 1 +#define IO_POLL_OUT 2 + typedef struct IOStream IOStream; typedef struct SysStream SysStream; typedef struct HttpStream HttpStream; @@ -58,6 +65,8 @@ typedef ssize_t(*io_sendfile_f)(IOStream *, sendfiledata *); typedef void(*io_close_f)(IOStream *); typedef void(*io_finish_f)(IOStream *); +typedef void(*io_setmode_f)(IOStream *, int); +typedef int(*io_poll_f)(IOStream *, EventHandler *, int, Event *); struct IOStream { io_write_f write; @@ -66,6 +75,9 @@ io_sendfile_f sendfile; io_close_f close; io_finish_f finish; + io_setmode_f setmode; + io_poll_f poll; + int io_errno; }; struct SysStream { @@ -102,6 +114,8 @@ ssize_t net_sys_read(SysStream *st, void *buf, size_t nbytes); ssize_t net_sys_sendfile(SysStream *st, sendfiledata *sfd); void net_sys_close(SysStream *st); +void net_sys_setmode(SysStream *st, int mode); +int net_sys_poll(SysStream *st, EventHandler *ev, int events, Event *cb); /* http stream */ IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd); @@ -112,6 +126,8 @@ ssize_t net_http_sendfile(HttpStream *st, sendfiledata *sfd); void net_http_close(HttpStream *st); void net_http_finish(HttpStream *st); +void net_http_setmode(HttpStream *st, int mode); +int net_http_poll(HttpStream *st, EventHandler *ev, int events, Event *cb); /* ssl stream */ IOStream* sslstream_new(pool_handle_t *pool, SSL *ssl); @@ -121,6 +137,8 @@ ssize_t net_ssl_read(SSLStream *st, void *buf, size_t nbytes); void net_ssl_close(SSLStream *st); void net_ssl_finish(SSLStream *st); +void net_ssl_setmode(SSLStream *st, int mode); +int net_ssl_poll(SSLStream *st, EventHandler *ev, int events, Event *cb); /* net_ functions */ ssize_t net_fallback_sendfile(IOStream *fd, sendfiledata *sfd);