--- a/src/server/daemon/event_linux.c Tue Jan 09 15:02:24 2018 +0100 +++ b/src/server/daemon/event_linux.c Wed Jan 10 15:46:17 2018 +0100 @@ -30,65 +30,57 @@ #include <stdlib.h> #include <errno.h> #include <sys/epoll.h> +#include <sys/eventfd.h> #include "../util/systhr.h" #include "../util/atomic.h" +#include "../util/io.h" #include "event.h" #include "event_linux.h" -event_handler_t* evhandler_create(int numthreads) { - event_handler_t *ev = malloc(sizeof(event_handler_t)); - if(ev == NULL) { - return NULL; - } +EVHandler* evhandler_create(EventHandlerConfig *cfg) { + EVHandler *ev = malloc(sizeof(EVHandler)); + ev->current = 0; + ev->instances = calloc(cfg->nthreads, sizeof(void*)); + ev->numins = cfg->nthreads; - ev->ep = calloc(numthreads, sizeof(int)); - if(ev->ep == NULL) { - free(ev); - return NULL; - } - ev->nep = numthreads; - ev->lep = 0; - - /* create ports event threads */ - for(int i=0;i<numthreads;i++) { - /* create port */ - ev->ep[i] = epoll_create(64); - if(ev->ep[i] == 0) { - free(ev->ep); - free(ev); + for(int i=0;i<cfg->nthreads;i++) { + EventHandler *handler = malloc(sizeof(EventHandler)); + ev->instances[i] = handler; + + handler->ep = epoll_create(64); + if(handler->ep == 0) { + // TODO: error + return NULL; + } + handler->eventfd = eventfd(0, 0); + if(handler->eventfd == 0) { + return NULL; + } + struct epoll_event epev; + epev.events = EPOLLIN | EPOLLET; // input event, edge triggered + epev.data.ptr = NULL; + if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventfd, &epev)) { return NULL; } - /* - * start a new handler thread - * the thread needs the event port and a pointer to the event handler - */ - ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); - if(conf == NULL) { - free(ev->ep); - free(ev); - return NULL; - } - conf->handler = ev; - conf->ep = ev->ep[i]; - - systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); - /* TODO: error handling */ + SYS_THREAD t = systhread_start( + 0, + 0, + (thrstartfunc)ev_handle_events, + handler); + systhread_detach(t); } return ev; } -void ev_handle_events(ev_thr_conf_t *conf) { - event_handler_t *ev = conf->handler; - int ep = conf->ep; - - free(conf); +void ev_handle_events(EventHandler *ev) { + int ep = ev->ep; //port_event_t events[16]; struct epoll_event events[16]; @@ -102,9 +94,18 @@ } for(int i=0;i<ret;i++) { - event_t *event = events[i].data.ptr; - if(event->fn) { - int saved_ev = event->poll; + Event *event = events[i].data.ptr; + if(!event) { + ssize_t r = read(ev->eventfd, &event, sizeof(Event*)); + if(r == sizeof(Event*)) { + if(event->fn) { + if(!event->fn(ev, event) && event->finish) { + event->finish(ev, event); + } + } + } + } else if(event->fn) { + int saved_ev = event->events; if(!event->fn(ev, event)) { // event fn returned 0 -> remove event from epoll if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { @@ -121,21 +122,16 @@ event->finish(ev, event); } } else { - if(saved_ev != event->poll) { + if(saved_ev != event->events) { // event type changed struct epoll_event epev; epev.events = EPOLLET; epev.data.ptr = event; // adjust epoll events - if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { - epev.events |= EPOLLIN; - } - if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { - epev.events |= EPOLLOUT; - } + epev.events = ev_convert2sys_events(event->events); - if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, NULL)) { + if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { log_ereport( LOG_FAILURE, "epoll_wait failed: %s", @@ -148,40 +144,74 @@ } } -/* returns a event handler port */ -int ev_get_port(event_handler_t *h) { - int nps = h->nep; - if(nps == 1) { - return h->ep[0]; +int ev_convert2sys_events(int events) { + int e = EPOLLET; + if((events & EVENT_POLLIN) == EVENT_POLLIN) { + e |= EPOLLIN; } - - int cp = h->lep % nps; - ws_atomic_inc32(&h->lep); - - return h->ep[cp]; + if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { + e |= EPOLLOUT; + } + return e; } -int ev_pollin(event_handler_t *h, int fd, event_t *event) { +int ev_pollin(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; struct epoll_event epev; epev.events = EPOLLIN | EPOLLET; // input event, edge triggered epev.data.ptr = event; - return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); + return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); } -int ev_pollout(event_handler_t *h, int fd, event_t *event) { +int ev_pollout(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; struct epoll_event epev; epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered epev.data.ptr = event; - return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); + return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); +} + +int ev_remove_poll(EventHandler *h, int fd) { + return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL); +} + +int evt_send(EventHandler *h, Event *event) { + event->object = 0; + event->events = 0; + ssize_t r = write(h->eventfd, &event, sizeof(Event*)); + return r > 0 ? 0 : 1; } -int evt_send(event_handler_t *h, event_t *event) { - event->object = 0; - // TODO: implement using threadpool or eventfd - fprintf(stderr, "Warning: evt_send not implemented\n"); - return 0; +// TODO: remove this fake aio +int ev_aioread(int fd, aiocb_s *cb) { + ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); + cb->result = result; + if(result < 0) { + cb->result_errno = errno; + } + return evt_send(cb->evhandler, cb->event); } + +int ev_aiowrite(int fd, aiocb_s *cb) { + ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); + cb->result = result; + if(result < 0) { + cb->result_errno = errno; + } + return evt_send(cb->evhandler, cb->event); +} + + +int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); +} + +int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); +} + +int event_removepoll(EventHandler *ev, SYS_NETFD fd) { + return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); +}